李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
SparkCore之累加器
Leefs
2021-11-09 PM
1011℃
0条
[TOC] #### 前言 本篇将先从一个案例入手,对Driver端和Executer端执行过程进行一个简单了解,在深入讲解累加器。 ### 一、累加操作案例 #### 案例 **需求** > 将1,2,3,4进行累加求和操作 **代码** ```scala import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * @author lilinchao * @date 2021/11/9 * @description 1.0 **/ object Spark_WordCount { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("Spark_WordCount").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) val dataRdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) var sum = 0 //foreach:将传入的值进行挨个遍历(属于分布式循环遍历) dataRdd.foreach( num => { sum += num } ) println("sum=" + sum) sc.stop() } } ``` **运行结果** ``` sum=0 ``` #### 结果分析 ![21.SparkCore之累加器01.jpg](https://lilinchao.com/usr/uploads/2021/11/1225231202.jpg) **说明** + Spark在Driver端会进行初始化操作使`sum=0`; + foreach是进行分布式执行,会将Driver端数据传输到多个Executor共同执行累加操作(数据传输的过程Spark中称为闭包检测); + 当Executor将自身的计算任务完成之后,**并不能够将计算后的结果返回到Driver端**; + Driver端传输到Executor端的数据会进行累加操作,但是Executor端并不能将计算好的结果返回给Driver端,导致Driver端的sum并没有发送变化; + 在Driver端输出sum的时候还是初始值0。 **Executor并不能将计算后的结果返回给Driver端** *注:Executor和Executor之间是独立的,不能互相读取数据* #### 结构改进 ![21.SparkCore之累加器02.jpg](https://lilinchao.com/usr/uploads/2021/11/3775771022.jpg) + Executor端将计算好的结果返回给Driver端; + Driver端再将各个Executor端返回的结果进行一个合并操作,得到最终结果。 *上方描述的也是累加器的主要功能。* ### 二、累加器概念 #### 2.1 定义 + 累加器:分布式共享只写变量。 #### 2.2 实现原理 累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。 #### 2.3 累加器分类 + 系统累加器 + 自定义累加器 ### 三、系统累加器 #### 3.1 使用步骤 (1)声明累加器 ```scala val sum: LongAccumulator = sc.longAccumulator("sum") ``` (2)使用累加器 ```scala sum.add(count) ``` (3)获取累加结果 ```scala sum.value ``` #### 3.2 代码示例 **需求** > 对("a", 1), ("a", 2), ("a", 3), ("a", 4)出现次数进行打印 ```scala import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * @author lilinchao * @date 2021/11/8 * @description 系统累加器 **/ object accumulator01_system { /** * 需求: * 对("a", 1), ("a", 2), ("a", 3), ("a", 4)出现次数进行打印 * @param args */ def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local[*]").setAppName("accumulator01_system") val sc = new SparkContext(sparConf) //创建RDD val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4))) //方式一:通过reduceByKey输出单词出现的次数,代码执行过程中存在shuffle操作,效率较低 // dataRDD.reduceByKey(_ + _).collect().foreach(println) //方式二:通过foreach循环 /*var sum = 0 //输出是在Executor端 dataRDD.foreach{ case (a,count) =>{ sum = sum + count println("sum="+sum) } } //输出是在Driver端 println("a"+sum)*/ //方式三:使用系统自带累加器 val sum = sc.longAccumulator("sum") dataRDD.foreach{ case (a,count) =>{ //调用累加器add()方法 sum.add(count) } } //获取累加结果 println("a="+sum.value) //关闭连接 sc.stop() } } ``` **运行结果** ``` a=10 ``` #### 3.3 累加器放在行动算子中 对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动操作中。转化操作中累加器可能会发生不止一次更新。 **代码示例** ```scala import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator /** * @author lilinchao * @date 2021/11/8 * @description 1.0 **/ object accumulator02_updateCount { def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local[*]").setAppName("accumulator02_updateCount") val sc = new SparkContext(sparConf) //创建RDD val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4))) //定义累加器 val sum: LongAccumulator = sc.longAccumulator("sum") //统计累加器执行累加次数 val value: RDD[(String, Int)] = dataRDD.map(t => { //累加器添加数据 sum.add(1) t }) //调用两次行动算子,map执行两次,导致最终累加器的值翻倍 value.foreach(println) println("a1:"+sum.value) //收集操作 value.collect() //输出累加器累加次数 println("a2:"+sum.value) //关闭连接 sc.stop() } } ``` **运行结果** ``` (a,3) (a,4) (a,1) (a,2) a1:4 a2:8 ``` **说明** 从结果可以很明显的看出a1被调用了4次,多加了一个收集操作后到a2直接翻了一倍。 ### 四、自定义累加器 #### 4.1 版本 自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。 #### 4.2 实现步骤 ``` (1)继承AccumulatorV2,设定输入、输出泛型 (2)重写方法 ``` #### 4.3 案例 **需求** > 自定义累加器,统计RDD中首字母为“H”的单词以及出现的次数。 **代码** ```scala import org.apache.spark.rdd.RDD import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable /** * @author lilinchao * @date 2021/11/8 * @description 自定义累加器 **/ object accumulator03_define { /** * 需求: * 自定义累加器,统计RDD中首字母为“H”的单词以及出现的次数 * List(“Hello”, “Hello”, “Hello”, “Hello”, “Hello”, “Spark”, “Spark”) * @param args */ def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local[*]").setAppName("accumulator03_define") val sc = new SparkContext(sparConf) //创建RDD val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "Hello", "Hello", "Spark", "Spark"), 2) //创建累加器 val acc: MyAccumulator = new MyAccumulator() //注册累加器 sc.register(acc,"wordCount") //调用累加器 rdd.foreach( word => { acc.add(word) } ) //获取累加器的累加结果 println(acc.value) //关闭连接 sc.stop() } } /** * 声明累加器: * 1.继承AccumulatorV2,设定输入、输出泛型 * 2.重写方法 */ class MyAccumulator extends AccumulatorV2[String,mutable.Map[String,Long]] { //定义输出数据集合 var map = mutable.Map[String,Long]() //是否为初始化状态 //定义如果集合中数据为空,即为初始化状态 override def isZero: Boolean = map.isEmpty //复制累加器 override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = { new MyAccumulator() } //重置累加器 override def reset(): Unit = map.clear() //增加数据 override def add(v: String): Unit = { //业务逻辑操作 if(v.startsWith("H")){ map(v) = map.getOrElse(v,0L)+1L } } //合并累加器 override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = { other.value.foreach{ case (word,count) => { map(word) = map.getOrElse(word,0L) + count } } } //累加器的返回结果 override def value: mutable.Map[String, Long] = map } ``` **运行结果** ``` Map(Hello -> 4) ``` *附参考文章链接:* *https://blog.csdn.net/weixin_42796403/article/details/111938041*
标签:
Spark
,
Spark Core
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1617.html
上一篇
Spark Core案例实操(十)
下一篇
SparkCore之广播变量
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
43
标签云
Spring
二叉树
稀疏数组
散列
Flume
链表
Http
并发编程
Typora
ajax
Scala
Spark Streaming
正则表达式
HDFS
Java
队列
字符串
JavaScript
Spark SQL
Python
高并发
Docker
BurpSuite
Ubuntu
栈
Hadoop
Kafka
序列化和反序列化
SpringBoot
Filter
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞