李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
26.Flink状态编程操作示例
Leefs
2022-01-27 PM
1009℃
0条
[TOC]

### 一、键控状态的使用

```scala
import java.util

import com.lilinchao.flink.window.SensorReading
import org.apache.flink.api.common.functions.{ReduceFunction, RichReduceFunction}
import org.apache.flink.api.common.state._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

/**
 * @author lilinchao
 * @date 2022/1/27
 * @description Keyed state demo
 **/
object KeyedStateDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.readTextFile("datas/sensor.txt")

    // Parse each line "id,timestamp,temperature" into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val array: Array[String] = data.split(",")
      SensorReading(array(0), array(1).toLong, array(2).toDouble)
    })

    // Keyed state is scoped per key, so the stream must be keyed first
    val resultData = dataStream.keyBy(_.id)
      .reduce(new MyKeyedState)

    resultData.print()
    env.execute("KeyedStateDemo")
  }
}

/**
 * Shows the two ways of declaring keyed state in a rich function and the basic
 * read/write operations of ValueState, ListState, MapState and ReducingState.
 */
class MyKeyedState extends RichReduceFunction[SensorReading] {
  // Way 1: declare the field here and obtain the handle in open().
  // The runtime context is only usable once the function is running (open() onward),
  // so the handle cannot be created in the field initializer.
  var valueState: ValueState[Double] = _

  // Way 2: lazy vals. A lazy val is not evaluated when the object is constructed,
  // only on first access (here: inside reduce(), at runtime). Only vals can be lazy.

  // Value state
  lazy val myValueState: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("myValue", classOf[Double]))

  // List state
  lazy val myListState: ListState[String] =
    getRuntimeContext.getListState(new ListStateDescriptor[String]("myList", classOf[String]))

  // Map state
  lazy val myMapState: MapState[String, Double] =
    getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Double]("myMap", classOf[String], classOf[Double]))

  // Reducing state: values passed to add() are combined with MyReduceFunction
  lazy val myReducingState: ReducingState[SensorReading] =
    getRuntimeContext.getReducingState(
      new ReducingStateDescriptor[SensorReading]("myReduce", new MyReduceFunction, classOf[SensorReading]))

  override def open(parameters: Configuration): Unit = {
    // Way 1: the handle is obtained here, once the runtime context is available
    valueState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate", classOf[Double]))
  }

  override def reduce(t: SensorReading, t1: SensorReading): SensorReading = {
    // --- value state ---
    val myValue: Double = myValueState.value() // read
    valueState.update(t.temperature)          // write

    // --- list state ---
    myListState.add("aaa")
    val list = new util.ArrayList[String]()
    list.add("bbb")
    list.add("ccc")
    myListState.addAll(list)
    myListState.update(list) // replaces every element of the list with `list`
    myListState.get()        // the whole list, as an Iterable

    // --- map state ---
    myMapState.contains("sensor_1")  // is the key present?
    myMapState.get("sensor_1")       // value for the key
    myMapState.put("sensor_1", 1.3)  // insert / overwrite
    myMapState.remove("sensor_1")    // delete the entry

    // --- reducing state ---
    myReducingState.get()
    myReducingState.add(t) // combined with the current state via MyReduceFunction

    t1
  }
}

/** Combines two readings: keeps value1's id, value2's timestamp and the lower temperature. */
class MyReduceFunction extends ReduceFunction[SensorReading] {
  override def reduce(value1: SensorReading, value2: SensorReading): SensorReading =
    SensorReading(value1.id, value2.timestamp, value1.temperature.min(value2.temperature))
}
```

**说明**

**声明一个键控状态方式:**

+ **进行全局定义状态**
  + 在类中声明状态变量,并在`open`方法中通过`getRuntimeContext`获取状态句柄,因为运行时上下文只有在open生命周期开始后才可用
+ **通过lazy惰性变量定义**
  + lazy 定义的惰性变量会实现延迟加载,即在对象初始化时并不会执行,而是在第一次访问时才执行;
  + 惰性变量只能是不可变变量,且只有在调用惰性变量时,才会实例化这个变量。

### 二、案例实操

#### 2.1 需求

> 对于温度传感器温度值跳变,超过10度,报警

#### 2.2 数据准备

```text
sensor_1,1547718199,35.8
sensor_1,1547718206,32.3
sensor_1,1547718208,36.2
sensor_1,1547718210,20.3
sensor_1,1547718213,22.5
sensor_1,1547718215,34.6
sensor_1,1547718218,30.9
```

#### 2.3 示例代码

+ **方式一:通过自定义RichFlatmapFunction实现**

```scala
class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
  // Last temperature seen for the current key
  lazy val lastTempSate: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))
  // Whether lastTempSate already holds a real reading for the current key
  lazy val flagState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("flag", classOf[Boolean]))

  override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
    // Previous temperature (reads as 0 before the first update)
    val lastTemp = lastTempSate.value()
    val diff = (value.temperature - lastTemp).abs

    // Only compare once a previous reading exists; an unset flag reads as false,
    // which prevents a false alert against the default 0 on the first reading.
    if (flagState.value() && diff > threshold)
      out.collect((value.id, lastTemp, value.temperature))

    // Remember the current reading for the next comparison
    lastTempSate.update(value.temperature)
    flagState.update(true)
  }
}
```

第一次输入温度值时,状态中还没有上次的温度值,读取到的是默认值0。如果直接比较,当初始温度不在`-10~10`度之间时就会出现误判。因此这里借助`flagState`标记该key是否已经保存过温度值,只有标记为true时才进行比较。

+ **方式二:使用`FlatMap with keyed ValueState` 的快捷方式 `flatMapWithState`实现**

```scala
.flatMapWithState[(String, Double, Double), Double] {
  // First reading for this key: nothing to compare against, just store it
  case (data: SensorReading, None) => (List.empty, Some(data.temperature))
  // Compare the new reading with the stored one
  case (data: SensorReading, Some(lastTemp)) =>
    val diff = (data.temperature - lastTemp).abs
    if (diff > 10.0) (List((data.id, lastTemp, data.temperature)), Some(data.temperature))
    else (List.empty, Some(data.temperature))
}
```

`flatMapWithState`的状态是`Option`类型,第一条数据到来时为`None`,因此天然避免了方式一中第一次比较时的误判问题。

#### 2.4 完整代码

```scala
import com.lilinchao.flink.window.SensorReading
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * @author lilinchao
 * @date 2022/1/27
 * @description Requirement: raise an alert when a sensor's temperature jumps by more than 10 degrees
 **/
object TempChangeAlertDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("192.168.159.139", 8888)

    // Convert each line "id,timestamp,temperature" into the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val array = data.split(",")
      SensorReading(array(0), array(1).toLong, array(2).toDouble)
    })

    val resultData: DataStream[(String, Double, Double)] = dataStream
      .keyBy(_.id)
      // .flatMap(new TempChangeAlert(10.0))
      .flatMapWithState[(String, Double, Double), Double] {
        // First reading for this key: nothing to compare against, just store it
        case (data: SensorReading, None) => (List.empty, Some(data.temperature))
        // Compare the new reading with the stored one
        case (data: SensorReading, Some(lastTemp)) =>
          val diff = (data.temperature - lastTemp).abs
          if (diff > 10.0) (List((data.id, lastTemp, data.temperature)), Some(data.temperature))
          else (List.empty, Some(data.temperature))
      }

    resultData.print()
    env.execute("TempChangeAlertDemo")
  }
}

/**
 * Custom RichFlatMapFunction that emits (id, previous temperature, current temperature)
 * whenever the temperature of a key changes by more than `threshold`.
 *
 * @param threshold temperature jump that triggers an alert (strictly greater than)
 */
class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
  // Last temperature seen for the current key
  lazy val lastTempSate: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))
  // Whether lastTempSate already holds a real reading for the current key
  lazy val flagState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("flag", classOf[Boolean]))

  override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
    // Previous temperature (reads as 0 before the first update)
    val lastTemp = lastTempSate.value()
    val diff = (value.temperature - lastTemp).abs

    // Only compare once a previous reading exists; an unset flag reads as false,
    // which prevents a false alert against the default 0 on the first reading.
    if (flagState.value() && diff > threshold)
      out.collect((value.id, lastTemp, value.temperature))

    // Remember the current reading for the next comparison
    lastTempSate.update(value.temperature)
    flagState.update(true)
  }
}
```
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1879.html
上一篇
25.Flink状态管理介绍
下一篇
27.【转载】Flink ProcessFunction API全解析及实战
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
微服务
NIO
锁
稀疏数组
Jquery
Golang基础
JavaSE
MyBatisX
Flume
Flink
MyBatis
Zookeeper
Sentinel
人工智能
Java
MySQL
工具
Ubuntu
DataWarehouse
nginx
Scala
Hive
JavaWEB项目搭建
Git
JVM
Hbase
ClickHouse
序列化和反序列化
Stream流
正则表达式
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞