李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
Spark Core案例实操(二)
Leefs
2021-10-30 PM
939℃
0条
### 前言 本篇根据**Spark Core案例实操(一)**中需求继续对代码进行优化,减少shuffle,提高性能。 ### 五、实现方案三 在方案一和二中,reduceByKey算子使用过多,因为reduceByKey在进行聚合时也会存在shuffle,影响代码的整体性能。 #### 5.1 分析 + 在读取数据之后直接转换成如下结构: ``` 点击的场合 : ( 品类ID,( 1, 0, 0 ) ) 下单的场合 : ( 品类ID,( 0, 1, 0 ) ) 支付的场合 : ( 品类ID,( 0, 0, 1 ) ) ``` + 再将相同的品类ID的数据进行分组聚合 ``` ( 品类ID,( 点击数量, 下单数量, 支付数量 ) ) ``` 这样可以简化步骤,同时减少许多reduceByKey的使用。 #### 5.2 代码实现 ```scala import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * @author lilinchao * @date 2021/10/30 * @description Top10热门品类 **/ object HotCategoryTop10Analysis03 { def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis") val sc = new SparkContext(sparConf) // 1. 读取原始日志数据 val actionRDD = sc.textFile("datas/user_visit_action.txt") // 2. 将数据转换结构 val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap( action => { val datas = action.split("_") if(datas(6) != "-1"){ //点击的场合 List((datas(6),(1,0,0))) }else if(datas(8) != "null") { //下单的场合 val ids = datas(8).split(",") ids.map(id => (id,(0,1,0))) }else if(datas(10) !="null"){ //支付的场合 val ids = datas(10).split(",") ids.map(id => (id,(0,0,1))) }else{ Nil } } ) // 3. 将相同的品类ID的数据进行分组聚合 val analysisRDD = flatRDD.reduceByKey( (t1, t2) => { ( t1._1+t2._1, t1._2 + t2._2, t1._3 + t2._3 ) } ) // 4. 将统计结果根据数量进行降序处理,取前10名 val resultRDD = analysisRDD.sortBy(_._2, false).take(10) // 5. 将结果采集到控制台打印出来 resultRDD.foreach(println) sc.stop() } } ``` **运行结果** ``` (15,(6120,1672,1259)) (2,(6119,1767,1196)) (20,(6098,1776,1244)) (12,(6095,1740,1218)) (11,(6093,1781,1202)) (17,(6079,1752,1231)) (7,(6074,1796,1252)) (9,(6045,1736,1230)) (19,(6044,1722,1158)) (13,(6036,1781,1161)) ``` ### 六、实现方案四 通过自定义累加器实现,该过程没有shuffle,性能高。 ```scala package com.llc.spark.core import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable /** * @author lilinchao * @date 2021/10/30 * @description Top10热门品类 **/ object HotCategoryTop10Analysis04 { def main(args: Array[String]): Unit = { val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis") val sc = new SparkContext(sparConf) // 1. 读取原始日志数据 val actionRDD = sc.textFile("datas/user_visit_action.txt") val acc = new HotCategoryAccumulator sc.register(acc, "hotCategory") // 2. 将数据转换结构 actionRDD.foreach( action => { val datas = action.split("_") if (datas(6) != "-1") { // 点击的场合 acc.add((datas(6), "click")) } else if (datas(8) != "null") { // 下单的场合 val ids = datas(8).split(",") ids.foreach( id => { acc.add( (id, "order") ) } ) } else if (datas(10) != "null") { // 支付的场合 val ids = datas(10).split(",") ids.foreach( id => { acc.add( (id, "pay") ) } ) } } ) val accVal: mutable.Map[String, HotCategory] = acc.value val categories: mutable.Iterable[HotCategory] = accVal.map(_._2) //3. 排序 val sort = categories.toList.sortWith( (left, right) => { if ( left.clickCnt > right.clickCnt ) { true } else if (left.clickCnt == right.clickCnt) { if ( left.orderCnt > right.orderCnt ) { true } else if (left.orderCnt == right.orderCnt) { left.payCnt > right.payCnt } else { false } } else { false } } ) // 4. 将结果采集到控制台打印出来 sort.take(10).foreach(println) sc.stop() } case class HotCategory(cid:String,var clickCnt:Int,var orderCnt:Int,var payCnt:Int) /** * 自定义累加器 * 1. 继承AccumulatorV2,定义泛型 * IN : ( 品类ID, 行为类型 ) * OUT : mutable.Map[String, HotCategory] * 2. 重写方法(6) */ class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]{ private val hcMap = mutable.Map[String, HotCategory]() // 累加器是否为初始状态 override def isZero: Boolean = { hcMap.isEmpty } // 复制累加器 override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = { new HotCategoryAccumulator() } // 重置累加器 override def reset(): Unit = { hcMap.clear() } // 向累加器中增加数据 (In) override def add(v: (String, String)): Unit = { val cid = v._1 val actionType = v._2 val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0,0,0)) if(actionType == "click"){ category.clickCnt += 1 }else if(actionType == "order"){ category.orderCnt += 1 }else if(actionType == "pay"){ category.payCnt += 1 } hcMap.update(cid,category) } // 合并累加器 override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = { val map1 = this.hcMap val map2 = other.value map2.foreach{ case ( cid, hc ) => { val category: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0,0,0)) category.clickCnt += hc.clickCnt category.orderCnt += hc.orderCnt category.payCnt += hc.payCnt map1.update(cid, category) } } } // 返回累加器的结果 (Out) override def value: mutable.Map[String, HotCategory] = hcMap } } ``` 运行结果 ``` HotCategory(15,6120,1672,1259) HotCategory(2,6119,1767,1196) HotCategory(20,6098,1776,1244) HotCategory(12,6095,1740,1218) HotCategory(11,6093,1781,1202) HotCategory(17,6079,1752,1231) HotCategory(7,6074,1796,1252) HotCategory(9,6045,1736,1230) HotCategory(19,6044,1722,1158) HotCategory(13,6036,1781,1161) ``` **说明** 累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后, 传回 Driver 端进行 merge。 ### 结尾 因为本篇使用的示例数据`user_visit_action.txt`文件由于数据量较大将不在下方贴出。 直接在微信公众号【Java和大数据进阶】回复:**sparkdata**,即可获取。
标签:
Spark
,
Spark Core
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1596.html
上一篇
Spark Core案例实操(一)
下一篇
Spark Core案例实操(三)
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Golang基础
DataX
SQL练习题
Livy
排序
二叉树
Hbase
Scala
JavaWeb
容器深入研究
查找
HDFS
Redis
Filter
高并发
Flink
VUE
SpringCloudAlibaba
序列化和反序列化
算法
Java
Eclipse
BurpSuite
微服务
JavaScript
JVM
Netty
JavaWEB项目搭建
Azkaban
FastDFS
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞