李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
23.Flink之Watermark使用详解
Leefs
2022-01-25 PM
1576℃
0条
[TOC] ### 前言 **版本:** + Flink 1.10.1 **示例数据** ```basic sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9 sensor_1,1547718215,32.9 sensor_1,1547718218,31.8 sensor_1,1547718225,37.2 ``` ### 一、引入Watermark(使用已有的类) **方式一:**对于一个没有乱序,时间为升序的流设置引入Watermark ```java .assignAscendingTimestamps(_.timestamp * 1000L) //升序数据提取时间戳 ``` **方式二:当流中存在时间乱序问题,引入watermark,并设置延迟时间** ```java .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) { override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L }) ``` **说明** 1、`BoundedOutOfOrdernessTimestampExtractor`中的泛型为流中数据的类型; 2、传入的参数为 watermark 的最大延迟时间(即允许数据迟到的时间); 3、重写的extractTimestamp方法返回的是设置数据中EventTime的字段,单位为毫秒,需要将时间转换成Long(最近时间为13位的长整形)才能返回; 4、当我们能大约估计到流中的最大乱序时,建议使用此方式,比较方便。 Event Time的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用 Processing Time 了)。 如果要使用 EventTime,那么需要引入 EventTime 的时间属性,引入方式如下所示: ```java val env = StreamExecutionEnvironment.getExecutionEnvironment // 必须指定 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) ``` **完整代码** ```java import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time /** * @author lilinchao * @date 2022/1/25 * @description Watermark引入 **/ object WaterMarkDemo { def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //从调用时刻开始给env创建的每一个stream追加时间特性 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //设置生成watermark的时间间隔,系统默认为200毫秒,一般使用系统默认即可 env.getConfig.setAutoWatermarkInterval(5000) val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) //引入Watermark(使用已有的类) val waterMarkData = dataStream // 给一个没有乱序,时间为升序的流设置一个EventTime // .assignAscendingTimestamps(_.timestamp * 1000L) //升序数据提取时间戳 /** * 当流中存在时间乱序问题,引入watermark,并设置延迟时间 * 1、BoundedOutOfOrdernessTimestampExtractor中的泛型为流中数据的类型 * 2、传入的参数为 watermark 的最大延迟时间(即允许数据迟到的时间) * 3、重写的extractTimestamp方法返回的是设置数据中EventTime的字段,单位为毫秒,需要将时间转换成Long(最近时间为13位的长整形)才能返回 * 4、当我们能大约估计到流中的最大乱序时,建议使用此中方式,比较方便 */ .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) { override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L }) val resultData: DataStream[(String, Double, Long)] = waterMarkData.map(data => (data.id, data.temperature, data.timestamp)) .keyBy(_._1) .timeWindow(Time.seconds(10)) //滚动时间窗口 参数:滚动时长 .reduce((curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3)) resultData.print() env.execute("WaterMarkDemo") } } // 定义样例类,传感器 id,时间戳,温度 case class SensorReading(id: String, timestamp: Long, temperature: Double) ``` ### 二、自定义WaterMark生成机制 我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的其实就是分配时间戳的接口。 Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。 **自定义WaterMark两种方式**: + AssignerWithPeriodicWatermarks + AssignerWithPunctuatedWatermarks 以上两个接口都继承自TimestampAssigner。 #### 2.1 定期水位线(Assigner with periodic watermarks) **根据从事件数据中去获取时间戳设置水位线存在的问题:** 没有达到水位线时不管现实中的时间推进了多久都不会触发关窗。 ##### 概念 **定期水位线(Periodic Watermark)**:按照固定时间间隔生成新的水位线。 **优势** 不管是否有新的消息抵达,水位线提升的时间间隔是由用户设置的,在两次水位线提升时隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。 最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。 **代码示例** 应用定期水位线需要**实现AssignerWithPeriodicWatermarks API** ```java import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time /** * @author lilinchao * @date 2022/1/25 * @description 定期水位线Demo **/ object PeriodicWaterMarkDemo { def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //从调用时刻开始给env创建的每一个stream追加时间特性 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //设置生成watermark的时间间隔,系统默认为200毫秒,一般使用系统默认即可 env.getConfig.setAutoWatermarkInterval(5000) val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) //使用 TimestampAssigner 引入 Watermark /** * Assigner with periodic watermarks(周期性引入watermark) * 1、系统会周期性的将watermark插入到流中,默认周期是200毫秒,可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置,单位为毫秒 * 2、产生watermark的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法,如果大于流中最大watermark就插入,小于就不插入 * 3、如下,可以自定义一个周期性的时间戳抽取(需要实现 AssignerWithPeriodicWatermarks 接口) */ val waterMarkData = dataStream.assignTimestampsAndWatermarks(new MyPeriodicAssigner(10)) val resultData: DataStream[(String, Double, Long)] = waterMarkData.map(data => (data.id, data.temperature, data.timestamp)) .keyBy(_._1) .timeWindow(Time.seconds(10)) //滚动时间窗口 参数:滚动时长 .reduce((curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3)) resultData.print() env.execute("PeriodicWaterMarkDemo") } } /** * 自定义一个周期生成watermark的类 * @param bound watermark的延时时间(毫秒) */ class MyPeriodicAssigner(bound:Long) extends AssignerWithPeriodicWatermarks[SensorReading]{ // 当前为止的最大时间戳(毫秒) var maxTs: Long = Long.MinValue /** * 获取当前的watermark(默认200毫秒获取一次,可以通过 env.getConfig.setAutoWatermarkInterval(5000) 来设置) * @return 当前watermark,当前最大时间戳 - 延时时间 */ override def getCurrentWatermark: Watermark = { new Watermark(maxTs - bound) } /** * 指定eventTime对应的字段(流中每条数据都会调用一次此方法) * @param element 流中的每条数据 * @param previousElementTimestamp 无 * @return 当前流的eventTime(单位:毫秒) */ override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = { // 每条数据都获取其中的时间戳,跟最大时间戳取大,并重新赋值给最大时间戳 maxTs = maxTs.max(element.timestamp * 1000) element.timestamp * 1000 } } ``` 其中**extractTimestamp用于从消息中提取事件时间**,而**getCurrentWatermark**用于生成新的水位线,新的水位线只有大于当前水位线才是有效的。每个窗口都会有该类的一个实例,因此可以利用实例的成员变量保存状态,比如上例中的当前最大时间戳。 **注:**周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时 #### 2.2 标点水位线(Assigner with punctuated watermarks) ##### 概念 标点水位线(Punctuated Watermark)通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。 ##### 代码示例 应用标点水位线需要实现`AssignerWithPunctuatedWatermarks API` ```java import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time /** * @author lilinchao * @date 2022/1/25 * @description 标点水位线Demo **/ object PunctuatedWaterMarkDemo { def main(args: Array[String]): Unit = { //构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //从调用时刻开始给env创建的每一个stream追加时间特性 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //设置生成watermark的时间间隔,系统默认为200毫秒,一般使用系统默认即可 env.getConfig.setAutoWatermarkInterval(5000) val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139",8888) //样例类转换 val dataStream: DataStream[SensorReading] = inputStream.map(data => { val array = data.split(",") SensorReading(array(0), array(1).toLong, array(2).toDouble) }) //使用 TimestampAssigner 引入 Watermark /** * Assigner with punctuated watermarks(标点水位线) * 1、通过数据流中某些特殊标记事件来触发新水位线的生成。 * 2、种方式下窗口的触发与时间无关,而是决定于何时收到标记事件 */ val waterMarkData = dataStream.assignTimestampsAndWatermarks(new MyPunctuatedAssigner(10)) val resultData: DataStream[(String, Double, Long)] = waterMarkData.map(data => (data.id, data.temperature, data.timestamp)) .keyBy(_._1) .timeWindow(Time.seconds(10)) //滚动时间窗口 参数:滚动时长 .reduce((curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3)) resultData.print() env.execute("PeriodicWaterMarkDemo") } } /** * 自定义一个周期生成watermark的类 * @param bound watermark的延时时间(毫秒) */ class MyPunctuatedAssigner(bound:Long) extends AssignerWithPunctuatedWatermarks[SensorReading]{ //checkAndGetNextWatermark用于检查事件是否标点事件,若是则生成新的水位线 override def checkAndGetNextWatermark(t: SensorReading, extractedTS: Long): Watermark = { if(t.id == "sensor_1"){ new Watermark(extractedTS - bound) }else{ null } } //extractTimestamp用于从消息中提取事件时间 override def extractTimestamp(element: SensorReading, previousTS: Long): Long = { element.timestamp * 1000 } } ``` **说明** + **extractTimestamp**:用于从消息中提取事件时间; + **checkAndGetNextWatermark**:用于检查事件是否标点事件,若是则生成新的水位线。 不同于定期水位线定时调用getCurrentWatermark,标点水位线是每接受一个事件就需要调用checkAndGetNextWatermark,若返回值非 null 且新水位线大于当前水位线,则触发窗口计算。 **注:**数据流中每一个递增的EventTime都会产生一个Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。 #### 2.3 迟到事件 ##### 概念 虽说水位线表明着早于它的事件不应该再出现,但是如上文所讲,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。 实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。 ##### 处理迟到数据的方法 迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种: - 重新激活已经关闭的窗口并重新计算以修正结果。 - 将迟到事件收集起来另外处理。 - 将迟到事件视为错误消息并丢弃。 Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用**Side Output**和**Allowed Lateness**。 - Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。 - Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。 ### 三、Watermark的设定 + 在 Flink 中,watermark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解; + 如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果; + 而如果watermark到达得太早,则可能收到错误结果,不过Flink处理迟到数据的机制可以解决这个问题。
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1872.html
上一篇
22.Flink之Watermark基本概念
下一篇
24.EvnetTime在window中的使用
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Jenkins
容器深入研究
Flink
Kibana
Typora
数学
Eclipse
Linux
Sentinel
Golang
并发编程
Zookeeper
国产数据库改造
MySQL
Stream流
数据结构
设计模式
Redis
DataX
查找
正则表达式
随笔
Flume
gorm
FileBeat
Spark Core
DataWarehouse
数据结构和算法
Hive
GET和POST
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞