李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
03.DStream创建
Leefs
2021-10-20 PM
1010℃
0条
[TOC] ### 一、RDD队列 #### 1.1 用法及说明 测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到 这个队列中的 RDD,都会作为一个 DStream 处理。 ### 1.2 案例实操 ##### 需求 > 循环创建几个RDD,将 RDD放入队列。通过SparkStream创建Dstream,计算WordCount。 ##### 实现代码 ```scala import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable /** * @author lilinchao * @date 2021/8/13 * @description 1.0 **/ object RDDStream { def main(args: Array[String]) { //1.初始化 Spark 配置信息 val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream") //2.初始化 SparkStreamingContext val ssc = new StreamingContext(conf, Seconds(4)) //3.创建 RDD 队列 val rddQueue = new mutable.Queue[RDD[Int]]() //4.创建 QueueInputDStream val inputStream = ssc.queueStream(rddQueue,oneAtATime = false) //5.处理队列中的 RDD 数据 val mappedStream = inputStream.map((_,1)) val reducedStream = mappedStream.reduceByKey(_ + _) //6.打印结果 reducedStream.print() //7.启动任务 ssc.start() //8.循环创建并向 RDD 队列中放入 RDD for (i <- 1 to 5) { rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10) Thread.sleep(2000) } ssc.awaitTermination() } } ``` **运行结果** ![03.DStream创建01.jpg](https://lilinchao.com/usr/uploads/2021/10/1942316752.jpg) ### 二、自定义数据源 #### 2.1 用法及说明 首先要实现一个Receiver。自定义接收方必须通过实现两个方法扩展此抽象类Receiver - onStart(): 开始接收数据要做的事情 - onStop():停止接收数据要做的事情 不能无限期地阻塞 onStart ()和 onStop ()。通常,onStart ()将启动负责接收数据的线程,而 onStop ()将确保这些接收数据的线程停止。接收线程还可以使用接收方法 isStopped ()来检查它们是否应该停止接收数据。 一旦接收到数据,数据就可以通过调用 store (data)存储在 Spark 内部,这是由 Receiver 类提供的一种方法。Store ()有许多种类型,允许一次存储接收到的数据记录或作为对象/序列化字节的整个集合存储。 应该正确捕获和处理接收线程中的任何异常,以避免接收方的静默故障。 + Restart (< exception >)将通过异步调用 onStop ()并在延迟后调用 onStart ()来重新启动接收器。 + Stop (< exception >)将调用 onStop ()并终止接收器。 + reportError (< error >)会向驱动程序报告错误消息(在日志和 UI 中可见) ,而不会停止/重新启动接收器。 #### 2.2 案例实操 **需求** > 自定义数据源,实现监控某个端口号,获取该端口号内容。 **(1)自定义数据源** ```scala import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import java.nio.charset.StandardCharsets import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver /** * @author lilinchao * @date 2021/8/13 * @description 1.0 **/ class CustomerReceiver (host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) { //最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Spark override def onStart(): Unit = { new Thread("Socket Receiver") { override def run() { receive() } }.start() } //读数据并将数据发送给Spark def receive(): Unit = { //创建一个Socket var socket: Socket = new Socket(host, port) //定义一个变量,用来接收端口传过来的数据 var input: String = null //创建一个BufferedReader用于读取端口传来的数据 val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8)) //读取数据 input = reader.readLine() //当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark while (!isStopped() && input != null) { store(input) input = reader.readLine() } //跳出循环则关闭资源 reader.close() socket.close() //重启任务 restart("restart") } override def onStop(): Unit = {} } ``` **(2)使用自定义的数据源采集数据** ```scala import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @author lilinchao * @date 2021/8/13 * @description 1.0 **/ object FileStream { def main(args: Array[String]): Unit = { //1.初始化 Spark 配置信息 val sparkConf = new SparkConf().setMaster("local[*]") .setAppName("StreamWordCount") //2.初始化 SparkStreamingContext val ssc = new StreamingContext(sparkConf, Seconds(5)) //3.创建自定义 receiver 的 Streaming val lineStream = ssc.receiverStream(new CustomerReceiver("192.168.10.7", 9999)) //4.将每一行数据做切分,形成一个个单词 val wordStream = lineStream.flatMap(_.split("\t")) //5.将单词映射成元组(word,1) val wordAndOneStream = wordStream.map((_, 1)) //6.将相同的单词次数做统计 val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _) //7.打印 wordAndCountStream.print() //8.启动 SparkStreamingContext ssc.start() sc.awaitTermination() } } ``` **运行结果** ![03.DStream创建02.jpg](https://lilinchao.com/usr/uploads/2021/10/321462242.jpg)
标签:
Spark
,
Spark Streaming
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1581.html
上一篇
02.DStream入门
下一篇
04.SparkStreaming之Kafka数据源
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Shiro
国产数据库改造
JavaWeb
Kibana
持有对象
Java工具类
MyBatis-Plus
JVM
Jenkins
Kafka
MyBatis
MyBatisX
ClickHouse
HDFS
Spark Core
算法
人工智能
gorm
Redis
Jquery
数据结构
JavaSE
SpringCloudAlibaba
递归
队列
Elastisearch
FastDFS
MySQL
序列化和反序列化
Hive
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞