17. Flink Streaming API: Sink
Leefs
2022-01-17 PM
[TOC]

### Preface

When processing data with Flink, data enters through a Data Source, passes through a series of Transformations, and the computed results are finally emitted through a Sink. Flink Data Sinks define where a data stream is ultimately written.

### 1. Overview

Flink has no counterpart to Spark's `foreach` method that lets users iterate over results directly; all output to external systems must go through a Sink. The final output step of a job is wired up like this:

```scala
stream.addSink(new MySink(xxxx))
```

Flink ships with sinks for a number of frameworks out of the box; beyond those, users implement their own sinks.

![17.Flink流处理API之Sink01.jpg](https://lilinchao.com/usr/uploads/2022/01/3405132150.jpg)

### 2. Writing to Files

```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

/**
  * Created by lilinchao
  * Date 2022/1/17
  * Description Writing to files
  */
object SinkToFileTest {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // read data from a collection
    val dataList = List("10001,Thyee,60","10002,Jeyoo,100","10001,Leefs,90","10001,Thsue,58","10001,Hyee,56")
    val studentStream: DataStream[String] = env.fromCollection(dataList)

    // map the raw strings into the case class
    val studentData: DataStream[Student] = studentStream.map(data => {
      val array: Array[String] = data.split(",")
      Student(array(0).toInt, array(1), array(2).toDouble)
    })

    // these two methods are deprecated
    // write straight to a file
    // studentData.writeAsText("")
    // studentData.writeAsCsv("")

    // the new way of writing to files
    // files produced this way carry timestamps in their names
    studentData.addSink(StreamingFileSink.forRowFormat[Student](
      new Path("E:\\data\\flink"),
      new SimpleStringEncoder[Student]("UTF-8")
    ).build())

    env.execute("sink to file test")
  }
}

// case class definition
case class Student(id:Int,name:String,score:Double)
```

**Result**

![17.Flink流处理API之Sink02.jpg](https://lilinchao.com/usr/uploads/2022/01/3928896107.jpg)

### 3. Writing to a Socket

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization._

/**
  * Created by lilinchao
  * Date 2022/1/17
  * Description Writing to a socket
  */
object SinkToSocketTest {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // read data from a collection
    val dataList = List("10001,Thyee,60","10002,Jeyoo,100","10001,Leefs,90","10001,Thsue,58","10001,Hyee,56")
    val studentStream: DataStream[String] = env.fromCollection(dataList)

    // map the raw strings into the case class
    val studentData: DataStream[Student] = studentStream.map(data => {
      val array: Array[String] = data.split(",")
      Student(array(0).toInt, array(1), array(2).toDouble)
    })

    // write to the socket according to a SerializationSchema
    studentData.writeToSocket("192.168.61.129",7777,new SerializationSchema[Student]{
      override def serialize(t: Student): Array[Byte] = {
        (t + "\n").getBytes()
      }
    })

    env.execute("sink to socket test")
  }
}

// case class definition
case class Student(id:Int,name:String,score:Double)
```

**Result**

![17.Flink流处理API之Sink03.jpg](https://lilinchao.com/usr/uploads/2022/01/497614082.jpg)
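For a quick test of the socket sink, a plain TCP listener can be started on the target host before the job is submitted, for example `nc -lk 7777` (assuming netcat is available on 192.168.61.129); `writeToSocket` needs a reachable listener, otherwise the job fails when it tries to connect.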
### 4. Writing to Kafka

#### 4.1 Adding the Dependency

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
```

The artifact name encodes the connector's Kafka version (0.11) and Scala version (2.12); the dependency version matches the Flink release (1.10.1).
#### 4.2 Implementation

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._

/**
  * Created by lilinchao
  * Date 2022/1/17
  * Description Writing to Kafka
  */
object SinkToKafkaTest {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // read data from a collection
    val dataList = List("10001,Thyee,60","10002,Jeyoo,100","10001,Leefs,90","10001,Thsue,58","10001,Hyee,56")
    val studentStream: DataStream[String] = env.fromCollection(dataList)

    // map the raw strings into the case class
    val studentData: DataStream[String] = studentStream.map(data => {
      val array: Array[String] = data.split(",")
      // use toString to turn the case class back into a String
      Student(array(0).toInt, array(1), array(2).toDouble).toString
    })

    studentData.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "sinktest", new SimpleStringSchema()))

    env.execute("sink to kafka test")
  }
}

// case class definition
case class Student(id:Int,name:String,score:Double)
```
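Once the job has run, the written records can be checked with Kafka's console consumer, for example `bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest --from-beginning` (assuming the `sinktest` topic exists on the broker at localhost:9092).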
### 5. Custom JDBC Sink

#### 5.1 Preparing the Data

+ **sensor.txt**

```
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
```

#### 5.2 Adding the MySQL Connector Dependency

```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
```
#### 5.3 Implementation

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

/**
  * Created by lilinchao
  * Date 2022/1/17
  * Description Custom sink writing to MySQL
  */
object SinkToJDBC {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputDStream: DataStream[String] = env.readTextFile("datas/sensor.txt")

    val dataDstream: DataStream[SensorReading] = inputDStream.map( data => {
      val dataArray: Array[String] = data.split(",")
      SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
    })

    // hook up the custom JDBC sink
    dataDstream.addSink( MyJdbcSink() )
    dataDstream.print("mysql")

    env.execute("sink jdbc test")
  }
}

case class MyJdbcSink() extends RichSinkFunction[SensorReading]{
  // declare the connection and statement variables
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    // create the connection and the prepared statements
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456")
    insertStmt = conn.prepareStatement("insert into sensor_temp(id,temperature) values(?,?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temperature = ? where id = ?")
  }

  // called once per incoming record: run the SQL over the open connection
  override def invoke(value: SensorReading): Unit = {
    // try the update first; if no row was updated, run the insert
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    if(updateStmt.getUpdateCount == 0){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

// case class for sensor records: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)
```
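Note that this sink assumes a `sensor_temp` table with `id` and `temperature` columns already exists in the `test` database, and the update-then-insert pattern in `invoke()` keeps one row per sensor id. The same `open()`/`invoke()`/`close()` lifecycle carries over to any external system, which is what the `stream.addSink(new MySink(xxxx))` line in the overview hints at. A minimal skeleton, using the hypothetical name `MyPrintSink` (illustrative only, not from this post):

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Minimal custom sink skeleton (illustrative sketch, not part of the original post)
class MyPrintSink extends RichSinkFunction[String] {
  // open(): acquire connections/clients once per parallel sink instance
  override def open(parameters: Configuration): Unit = {}

  // invoke(): write a single record to the external system
  override def invoke(value: String): Unit = {
    println(s"sinking: $value")
  }

  // close(): release resources when the job stops
  override def close(): Unit = {}
}
```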
Tags: Flink

Unless otherwise stated, all articles on this blog are the blogger's original work.
If you repost, please credit the source:
https://www.lilinchao.com/archives/1856.html