李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
14.Flink流处理API之Transform转换算子
Leefs
2022-01-13 PM
1011℃
0条
[TOC] #### 1、Map + **作用** 将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素。 ![14.Flink流处理API之Transform转换算子01.png](https://lilinchao.com/usr/uploads/2022/01/1201784901.png) + **示例** > 需求:使用Map将数据转换成样例类 **代码** ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 使用Map将数据转换成样例类 */ object StreamMapTest { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.使用fromElements val dataStream: DataStream[String] = env.fromElements("张三,10","李四,12","王五,14") //3.使用Map将数据封装成样例类 val userData: DataStream[User] = dataStream.map(m => { User(m.split(",")(0), m.split(",")(1)) }) //4.打印输出 userData.print() env.execute() } case class User(name:String,age:String) } ``` #### 2、flatMap + **作用** 消费一个元素并产生零个或多个元素 + **示例** > 需求:分别将以下数据,转换成 国家 、省份 、城市三个维度的数据。 ```scala 将"张三,中国,江西省,南昌市","李四,中国,河北省,石家庄市","Tom,America,NewYork,Manhattan" 转换为: 张三,中国 张三,中国江西省 张三,中国江西省南昌市 ``` **代码** ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 分别将以下数据,转换成 国家 、省份 、城市 三个维度的数据。 */ object StreamFlatMapTest { /** * 1.构建批处理运行环境 * 2.构建本地集合数据源 * 3.使用 flatMap 将一条数据转换为三条数据 * a. 使用逗号分隔字段 * b. 分别构建国家、国家省份、国家省份城市三个元组 * 4.打印输出 */ def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.使用fromCollection构建数据集 val dataStream = env.fromCollection(List("张三,中国,江西省,南昌市", "李四,中国,河北省,石家庄市", "Tom,America,NewYork,Manhattan")) val flatMapData: DataStream[(String, String)] = dataStream.flatMap(line => { val arr = line.split(",") List( (arr(0), arr(1)), (arr(0), arr(1) + arr(2)), (arr(0), arr(1) + arr(2) + arr(3)) ) }) //3.输出结果 flatMapData.print() env.execute("stream flatmap test") } } ``` #### 3、Filter + **作用** 根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃 ![14.Flink流处理API之Transform转换算子02.png](https://lilinchao.com/usr/uploads/2022/01/501101129.png) + **示例** > 需求:使用filter过滤掉大于10的数字 **代码** ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 需求:使用filter过滤掉大于10的数字 */ object StreamFilterTest { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.构建数据集 val dataStream = env.fromElements(1,2,3,4,10,12,11,14,5,6) //3.使用 filter 操作执行过滤 val filter = dataStream.filter(_ < 10) //4.结果输出 filter.print() env.execute("stream filter test") } } ``` #### 4、KeyBy + **作用** 把流中的数据分到不同的分区(并行度)中.具有`相同key的元素`会分到同一个分区中.一个分区中可以有多重不同的key。 在内部是使用的是key的`hash分区`来实现的。 ![14.Flink流处理API之Transform转换算子03.png](https://lilinchao.com/usr/uploads/2022/01/3032307013.png) + **说明** + 对数据分组主要是为了进行后续的聚合操作,即对同组数据进行聚合分析。 + keyBy会将一个DataStream转化为一个KeyedStream,聚合操作会将KeyedStream转化为DataStream。 + 如果聚合前每个元素数据类型是T,聚合后的数据类型仍为T。 ![14.Flink流处理API之Transform转换算子04.png](https://lilinchao.com/usr/uploads/2022/01/2673194641.png) + **使用** > dataStream.keyby(param) **param:**数据字段下标 默认从0开始,也可以输入id的字段,就会按照id分流 + **示例** > 需求:使用keyBy对集合数据进行转换,最后通过reduce进行聚合操作 **代码** ```java import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 使用keyBy对集合数据进行转换,最后通过reduce进行聚合操作 */ object StreamKeyByTest { def main(args: Array[String]): Unit = { //1.构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.使用fromCollection val dataStream: DataStream[String] =env.fromCollection(Array("hello1 flink","hello1 world1","hello2 world1","hello3 world2")) //3.通过keyBy将DataStream转换成KeyedStream val keyByedData: KeyedStream[(String, String), Tuple] = dataStream.map(r => { val array: Array[String] = r.split(" ") (array(0), array(1)) }).keyBy(0) //4.通过reduce聚合操作,将KeyedStream转换成DataStream //一个分组数据流的聚合操作,合并当前的元素 //和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果 val reduceData: DataStream[(String, String)] = keyByedData.reduce((v1, v2) => (v1._1 + "---" + v2._1, v1._2 + "---" + v2._2)) reduceData.print("stream") env.execute("keyby test") } } ``` #### 5、滚动聚合算子(Rolling Aggregation) + **作用** + sum :求和 + max : 选择每条流的最大值 + min : 选择每条流的最小值 + minby : 针对 keyedStream中的某个字段数据进行选择最小值 + maxby : 针对 keyedStream中的某个字段数据进行选择最大值 + **示例** > 需求:对集合中的数据进行分区后,再进行滚动聚合操作 **代码** ```java import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 1.0 */ object StreamRollingAggregation { def main(args: Array[String]): Unit = { //1.构建运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.使用fromCollection val arrayData: DataStream[String] = env.fromCollection(Array("spark,2","spark,1","scala,1","scala,2","flink,3")) //3.通过map对数据进行类型转换,再使用keyBy对数据重分区 val keyByedData: KeyedStream[(String, Int), Tuple] = arrayData.map(r => { val array = r.split(",") (array(0), array(1).toInt) }).keyBy(0) //4.对重分区好的数据进行滚动聚合操作 val sumedDS: DataStream[(String, Int)] = keyByedData.sum(1) val minedDS: DataStream[(String, Int)] = keyByedData.min(1) val maxedDS: DataStream[(String, Int)] = keyByedData.max(1) val minByedDS: DataStream[(String, Int)] = keyByedData.minBy(1) val maxByedDS: DataStream[(String, Int)] = keyByedData.maxBy(1) //5.输出结果 sumedDS.print("stream1") minedDS.print("stream2") maxedDS.print("stream3") minByedDS.print("stream4") maxByedDS.print("stream5") env.execute() } } ``` #### 6、Reduce + **作用** 一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。 为什么还要把中间值也保存下来? 考虑流式数据的特点: 没有终点, 也就没有最终的概念了. 任何一个中间的聚合结果都是值! + **示例** > 需求:获得各学生成绩总和 **代码** ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 获得各学生成绩总和 */ object StreamReduceTest { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.使用fromElements val dataStream: DataStream[String] = env.fromElements("张三,10","李四,12","王五,14","张三,12","李四,8") //3.根据name对成绩通过reduce进行累加操作 val studentData: DataStream[Student] = dataStream.map(line => { val data = line.split(",") Student(data(0), data(1).toInt) }).keyBy("name").reduce((x,y) => Student(x.name,x.score+y.score)) //4.输出结果 studentData.print() env.execute("stream reduce test") } case class Student(name:String,score:Int) } ``` #### 7、Split和Select(已废弃) + **作用** **split** DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。 ![14.Flink流处理API之Transform转换算子05.png](https://lilinchao.com/usr/uploads/2022/01/2558627232.png) **Select** SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。 ![14.Flink流处理API之Transform转换算子06.png](https://lilinchao.com/usr/uploads/2022/01/3033594272.png) + **示例** ```java import org.apache.flink.streaming.api.scala._ object StreamSplitAndSelect { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val arr: Array[(String, Int)] = Array(("scala", 1),("spark", 2), ("flink", 3), ("scala", 4)) val ds: DataStream[(String, Int)] = env.fromCollection(arr) val splitedSS: SplitStream[(String, Int)] = ds.split(r => { if (r._2 > 2) Seq("big") else Seq("small") }) val bigDS: DataStream[(String, Int)] = splitedSS.select("big") val smallDS: DataStream[(String, Int)] = splitedSS.select("small") val allDS: DataStream[(String, Int)] = splitedSS.select("big", "small") bigDS.print("bigDS") smallDS.print("smallDS") allDS.print("allDS") env.execute() } } ``` split在fink 1.13版本之后因为性能等原因,被SideOutput函数给替换,在后面会对SideOutput做详细介绍,本篇将不再过多叙述。 #### 8、Connect和CoMap + **作用** **connect** **DataStream,DataStream → ConnectedStreams:**连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。 ![14.Flink流处理API之Transform转换算子07.png](https://lilinchao.com/usr/uploads/2022/01/54086602.png) **CoMap,CoFlatMap** **ConnectedStreams → DataStream:**作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一 个Stream分别进行map和flatMap处理。 ![14.Flink流处理API之Transform转换算子08.png](https://lilinchao.com/usr/uploads/2022/01/2200907536.png) + 注意 1. 两个流中存储的数据类型`可以不同`; 2. 只是机械的合并在一起, 内部仍然是分离的2个流; 3. 只能2个流进行connect, `不能`有第3个参与; 4. 把两个流连接在一起,输入类型可以不同,但是返回的类型必须是一致。 + 示例 > 需求:将两个数据集合并到一个流当中,对数值类型进行加减操作 ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 1.0 */ object StreamConnectAndComap { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.构建两个数据集 val arrayData1: DataStream[(String, Int)] = env.fromCollection(Array(("flink",1),("spark",2),("scala",3),("java",4))) val arrayData2: DataStream[(Int, String)] = env.fromCollection(Array((1,"flink"),(2,"spark"),(3,"scala"))) //3.通过connect函数对两个数据集进行关联 val connectStream: ConnectedStreams[(String, Int), (Int, String)] = arrayData1.connect(arrayData2) //4.分别对两个数据集进行相应操作 val coMap: DataStream[(Any, Any)] = connectStream.map(r1 => (r1._1, r1._2 - 5), r2 => (r2._1 + 5, r2._2)) //5.输出结果 coMap.print() env.execute() } } ``` #### 9、Union + **作用** 对`两个或者两个以上`的DataStream进行union操作,产生一个包含所有DataStream元素的`新DataStream` ![14.Flink流处理API之Transform转换算子09.png](https://lilinchao.com/usr/uploads/2022/01/3170407162.png) + **示例** > 需求:合并两个数据集 ```java import org.apache.flink.streaming.api.scala._ /** * Created by lilinchao * Date 2022/1/13 * Description 1.0 */ object StreamUnion { def main(args: Array[String]): Unit = { //1.构建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.构建两个数据集 val arrayData1: DataStream[(String, Int)] = env.fromCollection(Array(("flink",1),("spark",2))) val arrayData2: DataStream[(String, Int)] = env.fromCollection(Array(("scala",3),("java",4))) //3.合并两个数据集 val unionData: DataStream[(String, Int)] = arrayData1.union(arrayData2) //4.输出结果 unionData.print("stream") env.execute() } } ``` **Connect与Union 区别:** 1. Union之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap 中再去调整成为一样的。 2. Connect只能操作两个流,Union 可以操作多个。
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1841.html
上一篇
13.Flink流处理API之Source
下一篇
15【转载】Flink数据类型和序列化
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
43
标签云
Flink
Docker
Jenkins
Filter
随笔
机器学习
链表
Spark Core
Beego
SpringCloud
Linux
Sentinel
HDFS
Spark Streaming
递归
Elastisearch
JavaWEB项目搭建
FileBeat
算法
Python
SQL练习题
GET和POST
国产数据库改造
Java编程思想
Flume
MyBatis
高并发
Azkaban
ajax
JavaSE
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞