李林超博客
28. Flink ProcessFunction Application Examples
Leefs
2022-01-30 AM
[TOC]

### Preface

**Versions:**

+ Flink 1.10.1
+ Scala 2.12

**Sample data:**

```text
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
sensor_1,1547718215,32.9
sensor_1,1547718218,33.6
sensor_1,1547718225,35.8
```

### 1. Timer Example

#### 1.1 Requirement

> Raise an alert if a sensor's temperature keeps rising for 15 consecutive seconds.

#### 1.2 Sample Code

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * @author lilinchao
 * @date 2022/1/30
 * @description Raise an alert if the temperature keeps rising for 15 consecutive seconds
 **/
object ProcessFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the input from a socket
    val inputStream = env.socketTextStream("192.168.159.139", 8888)

    // Convert each line into the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val array = data.split(",")
        SensorReading(array(0), array(1).toLong, array(2).toDouble)
      }
    )

    val warningStream = dataStream
      .keyBy(_.id)
      .process(new TempIncreWarning(15000L))

    warningStream.print()

    env.execute("ProcessFunctionTest")
  }
}

// Case class for a sensor reading: sensor id, timestamp, temperature.
// It is defined in this file, so com.lilinchao.flink.window.SensorReading is not imported as well.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Custom KeyedProcessFunction; interval is the alert window in milliseconds
class TempIncreWarning(interval: Long) extends KeyedProcessFunction[String, SensorReading, String] {
  // State: the previous temperature, used for comparison
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))
  // State: the timestamp of the registered timer, needed to delete it later
  lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts", classOf[Long]))

  // Called once for every incoming element
  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
    // Fetch the previous temperature and the timer timestamp first.
    // Unset state reads as 0.0 / 0 here, so a first reading above 0 also registers a timer.
    val lastTemp = lastTempState.value()
    val timerTs = timerTsState.value()

    // Update the stored temperature
    lastTempState.update(value.temperature)

    // Compare the current temperature with the previous one.
    // timerTs == 0 means that no timer is currently registered.
    if (value.temperature > lastTemp && timerTs == 0) {
      // Temperature rose and no timer exists: register one `interval` ms (15 s here) from now
      val ts = ctx.timerService().currentProcessingTime() + interval
      ctx.timerService().registerProcessingTimeTimer(ts)
      timerTsState.update(ts)
    } else if (value.temperature < lastTemp) {
      // Temperature dropped: delete the timer and clear its state
      ctx.timerService().deleteProcessingTimeTimer(timerTs)
      timerTsState.clear()
    }
  }

  // Fires only if the timer survived the whole interval, i.e. no drop was seen
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    out.collect("Sensor " + ctx.getCurrentKey + ": temperature has risen continuously for " + interval / 1000 + " seconds")
    timerTsState.clear()
  }
}
```

### 2. Side Output Example

#### 2.1 Requirement

> Emit readings with a temperature above 30 to the main stream and all other readings to a side output, splitting the stream in two.

#### 2.2 Sample Code

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * @author lilinchao
 * @date 2022/1/30
 * @description Emit readings above 30 to the main stream and the rest to a side output
 **/
object SideOutputTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the input from a socket
    val inputStream = env.socketTextStream("192.168.159.139", 8888)

    // Convert each line into the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val array = data.split(",")
        SensorReading(array(0), array(1).toLong, array(2).toDouble)
      }
    )

    val highTempStream: DataStream[SensorReading] = dataStream.process(new SplitTempProcessor(30.0))

    highTempStream.print("high")
    // The tag must match the one used in SplitTempProcessor (same id and element type)
    highTempStream.getSideOutput(new OutputTag[(String, Long, Double)]("low")).print("low")

    env.execute("SideOutputTest")
  }
}

// Case class for a sensor reading: sensor id, timestamp, temperature.
// It is defined in this file, so com.lilinchao.flink.window.SensorReading is not imported as well.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Custom ProcessFunction that uses a side output to split the stream
/**
 * @param threshold temperature threshold
 * ProcessFunction takes two type parameters: the input type and the output type; both can be custom types
 */
class SplitTempProcessor(threshold: Double) extends ProcessFunction[SensorReading, SensorReading] {
  override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
    if (value.temperature > threshold) {
      // Above the threshold (30): emit to the main stream
      out.collect(value)
    } else {
      // At or below the threshold: emit to the side output
      ctx.output(new OutputTag[(String, Long, Double)]("low"), (value.id, value.timestamp, value.temperature))
    }
  }
}
```
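
Returning to the timer example in section 1: the register/delete bookkeeping in `processElement` can be traced by hand without a running Flink job. The following is only a rough plain-Scala sketch, not Flink code. `TimerSim`, `Reading`, `alarms` and `endTs` are hypothetical names introduced for illustration, and it assumes the record timestamp (in seconds) stands in for the processing-time clock, which in the real job is the wall-clock arrival time.

```scala
// Plain-Scala sketch of the timer bookkeeping in TempIncreWarning.
// No Flink dependency; all names here are hypothetical.
object TimerSim {
  final case class Reading(id: String, ts: Long, temp: Double)

  // The sensor_1 rows from the sample data
  val sensor1: Seq[Reading] = Seq(
    Reading("sensor_1", 1547718199L, 35.8),
    Reading("sensor_1", 1547718206L, 32.0),
    Reading("sensor_1", 1547718208L, 36.2),
    Reading("sensor_1", 1547718210L, 29.7),
    Reading("sensor_1", 1547718213L, 30.9),
    Reading("sensor_1", 1547718215L, 32.9),
    Reading("sensor_1", 1547718218L, 33.6),
    Reading("sensor_1", 1547718225L, 35.8)
  )

  /** Timestamps at which an alarm would fire for one key, with the clock running until endTs. */
  def alarms(readings: Seq[Reading], intervalSec: Long, endTs: Long): List[Long] = {
    var lastTemp = 0.0 // mirrors lastTempState (unset state reads as 0.0)
    var timerTs = 0L   // mirrors timerTsState (0 means no timer registered)
    val fired = scala.collection.mutable.ListBuffer.empty[Long]

    for (r <- readings) {
      // A pending timer whose time has passed fires before this element is handled
      if (timerTs != 0L && r.ts >= timerTs) { fired += timerTs; timerTs = 0L }

      if (r.temp > lastTemp && timerTs == 0L) {
        timerTs = r.ts + intervalSec // temperature rose, no timer yet: register one
      } else if (r.temp < lastTemp) {
        timerTs = 0L                 // temperature dropped: delete the timer
      }
      lastTemp = r.temp
    }
    // The clock keeps running after the last element
    if (timerTs != 0L && endTs >= timerTs) fired += timerTs
    fired.toList
  }

  def main(args: Array[String]): Unit =
    println(alarms(sensor1, 15L, 1547718230L)) // prints List(1547718228)
}
```

Under these assumptions the drops at 1547718206 and 1547718210 cancel the first two timers, and the rise that starts at 1547718213 is the only one that lasts 15 seconds, so a single alarm is produced. With real processing time, the outcome depends on how fast the lines are typed into the socket, not on the timestamps in the data.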
Tags:
Flink
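Unless otherwise noted, all articles on this blog are the author's original work.
If you repost this article, please credit the source: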
https://www.lilinchao.com/archives/1885.html