李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
05.【转载】Spark RDD转换算子
Leefs
2021-06-29 AM
1481℃
0条
[TOC] ### 前言 ![05.Spark RDD转换算子.png](https://lilinchao.com/usr/uploads/2021/06/659710740.png) ## 转换算子 RDD 根据数据处理方式的不同,将算子整体上分为 Value 类型、双 Value 类型 和 Key-Value类型。 ### Value类型 #### map ```scala def map(f: T => U): RDD[U] ``` 说明:将RDD中类型为T的元素,**一对一地映射**为类型为U的元素,这里的转换可以是**类型的转换**,也可以是**值的转换** ```scala val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) // 值的转换 ==> List(2, 4, 6, 8) val mapRDD: RDD[Int] = rdd.map( _ * 2 ) mapRDD.collect().foreach(println) // 类型的转换 ==> List("1", "2", "3", "4") val mapRDD1: RDD[String] = rdd.map( _ + "" ) mapRDD1.collect().foreach(println) ``` + 技巧:当map转换复杂的数据类型时,通过 **模式匹配** 简洁表达 ```scala val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4))) val mapRdd: RDD[String] = rdd.map( (tuple: (String, Int)) => { tuple._1 + tuple._2 // "a1","b2"... } ) val mapRdd1: RDD[String] = rdd.map { //最外层是 { } case (str, num) => { str + num } } ``` #### mapPartitions ```scala def mapPartitions( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] ``` 说明:将待处理的数据 **以分区为单位** 发送到计算节点进行处理,输入参数为RDD中每一个分区的迭代器。参数二`preservesPartitioning`是否保留父RDD的分区信息。 示例:获取每个数据分区的最大值 ```scala val rdd = sc.makeRDD(List(1, 2, 3, 4), 2) // 传入 f: Iterator => Iterator val mpRDD: RDD[Int] = rdd.mapPartitions( iterator => { List(iterator.max).iterator } ) mpRDD.collect().foreach(println) output: 2 4 ``` 思考:**map 和 mapPartitions 的区别?** - 数据处理角度 Map 算子是读一个record计算一个record,类似于**串行**操作。而 mapPartitions 算子是**以分区为单位进行批处理**操作。 - 功能的角度 Map 算子主要目的将数据源中的数据进行转换,不会减少或增多数据,**映射前后维度不变**。 MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,**可以增加或减少数据**。 - 性能的角度 Map 算子因为类似于串行操作,所以性能比较低,而 mapPartitions 算子类似于批处理,所以性能较高。 但是 mapPartitions 算子会将整个分区的数据加载到内存进行引用,那么这样会导致内存可能不够用,出现内存溢出的错误。**所以在内存有限的情况下,不推荐使用MapPartitions**。 思考:**mapPartitions 使用场景?** 如果在映射过程中需要频繁创建额外的对象,mapPartitions 可以使 RDD中各个分区可以共享同一个对象以提高性能。 思考:如何理解Map 算子类似于串行,而mapPartitions 算子是以分区为单位进行批处理操作呢? + map ```scala val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 1) //单个分区 val mapRDD1: RDD[Int] = rdd.map( num => { println(">>>>>>>> " + num) num } ) val mapRDD2: RDD[Int] = mapRDD1.map( num => { println("-------- " + num) num } ) mapRDD2.collect() ``` output: ```scala >>>>>>>> 1 -------- 1 >>>>>>>> 2 -------- 2 >>>>>>>> 3 -------- 3 >>>>>>>> 4 -------- 4 ``` 当分区个数为1时,只有当前面一个record全部的逻辑执行完毕后,才会执行下一个数据(串行)。分区内数据的执行是有序的。 当分区格式为2时,不同分区之间是并行执行的,无先后顺序;而同一分区内的数据,满足有序性,逐个执行(串行)。 + mapPartitions ```scala val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2) // 传入 f: Iterator => Iterator val mpRDD: RDD[Int] = rdd.mapPartitions( // iterator代表一个分区的迭代器 iterator => { println(">>>>>>>>>>") iterator.map(_ * 2) //一次性加载整个分区,然后对该分区进行map转换,类似于批处理 } ) mpRDD.collect() output: ">>>>>>>>>>" ">>>>>>>>>>" ``` - 小结: map的实现:同一分区内的数据,必须等待全部的逻辑执行完毕,才会加载下一个数据,这就是串行; mapPartitions的实现: 会先将分区内的全部数据加载到内存中,然后执行逻辑。 ![05.Spark RDD转换算子02.png](https://lilinchao.com/usr/uploads/2021/06/3547201911.png) #### mapPartitionsWithIndex ```scala def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] ``` 说明:将待处理的数据 **以分区为单位** 发送到计算节点进行处理,在处理时同时可以**获取当前分区索引**。 示例:获取第二个数据分区的数据 ```scala val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3) //分区结果 ==> 【1】,【2,3】,【4,5】 //获取第2个分区的数据 val mpiRDD = rdd.mapPartitionsWithIndex( (index, iterator) => { if (index == 1) { iterator } else { Nil.iterator } } ) mpiRDD.collect().foreach(println)// 【2,3】 ``` #### flatMap ```scala def flatMap(f: T => TraversableOnce[U]): RDD[U] ``` 说明:将RDD中的每一个元素进行一对多转换,然后扁平化 强调:`f: T => TraversableOnce[U]`的返回值**必须是可遍历集合,不能是标量**。 ```scala // 先映射后打散,只需传入映射逻辑 val rdd1: RDD[String] = sc.makeRDD(List("hello spark", "hello scala")) // "hello spark" ==> Array["hello", "spark"] ==> "hello", "spark" val flatRDD1: RDD[String] = rdd1.flatMap( s => s.split(" ") ) flatRDD1.collect().foreach(println) // hello // spark // hello // scala ``` 示例:将 `List(List(1,2),3,List(4,5))` 进行扁平化操作 ```scala val rdd: RDD[Any] = sc.makeRDD(List( List(1, 2), 3, List(4, 5) )) // List中元素类型不同,需模式匹配 val flatRDD = rdd.flatMap{ case list: List[_] => list case a: Int => List(a) } flatRDD.collect().foreach(println) // 1 2 3 4 5 ``` #### glom ```scala def glom(): RDD[Array[T]] ``` 说明:将 **同一个分区** 的数据直接转换为相同类型的内存**数组**进行处理,分区不变,每个分区只有一个数组元素。 示例: 计算所有分区最大值求和(分区内取最大值,分区间求和) ```scala val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5), 2) // 【1,2】,【3,4,5】 // List(1,2,3,4) => RDD(Array(1,2), Array(3,4,5)) val glomRDD: RDD[Array[Int]] = rdd.glom() // Array(1,2) -> 2; Array(3,4,5) -> 5 val maxRDD: RDD[Int] = glomRDD.map( arr => arr.max ) // RDD(2, 5) => 归约(相加) val res: Int = maxRDD.reduce(_ + _) println(res) // res = 7 ``` + 补充:使用行动算子`aggregate`一步实现 ```scala val res: Int = rdd.aggregate(0)(math.max(_, _), _ + _) ``` + 思考:如何理解分区不变性? ```scala // 将RDD保存到目录下,以观察分区情况 val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4), 2) rdd.saveAsTextFile("output1") val mapRDD: RDD[Int] = rdd.map(_ * 2) mapRDD.saveAsTextFile("output2") /* 原始rdd的每条record会有分区号,经过map操作后,依然在相同的分区中 output1 part-00000 1 2 part-00001 3 4 output2 part-00000 2 4 part-00001 6 8 */ ``` ![05.Spark RDD转换算子03.png](https://lilinchao.com/usr/uploads/2021/06/3123702281.png) 与分区不变性相对立的是shuffle,下面介绍的算子groupBy涉及到shuffle过程。 #### groupBy ```scala def groupBy (f: T => K): RDD[(K, Iterable[T])] ``` 说明: ```scala /** * groupBy(f: T => K ) 将数据源中的元素映射到key上 * T是数据源元素的类型,K为任意类型 * * groupBy将数据源中的每一个数据进行f映射,根据返回的分组key进行分组 * 相同的key值的数据会放置在一个可迭代的集合中,即Iterable()中 */ ``` 示例1:按奇偶分组 ```scala val intRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2) // 按奇偶分组 val groupRDD1: RDD[(Int, Iterable[Int])] = intRdd.groupBy( (num: Int) => { num % 2 } ) groupRDD1.collect().foreach(println) // (0,CompactBuffer(2, 4, 6)) // (1,CompactBuffer(1, 3, 5)) ``` 示例2:按单词首字母分组 ```scala val strRdd: RDD[String] = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2) //按首字母分组 val groupRDD2: RDD[(Char, Iterable[String])] = strRdd.groupBy( word => word.charAt(0) ) groupRDD2.collect().foreach(println) // (H,CompactBuffer(Hello, Hadoop)) // (S,CompactBuffer(Spark, Scala)) ``` + 思考:分组和分区有什么关系? **分组后,一个组的数据会在一个分区中,但是并不是说一个分区中只有一个组,一句话:分组和分区没有必然的关系。** ```scala val intRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2) // 原集合分区 intRdd.saveAsTextFile("output1") val groupRDD: RDD[(Int, Iterable[Int])] = intRdd.groupBy( (num: Int) => { num % 2 } ) // 分组后 groupRDD.saveAsTextFile("output2") ``` 结果: ```scala output1: part-00000 1 2 3 part-00001 4 5 6 output2: part-00000 (0,CompactBuffer(2, 4, 6)) part-00001 (1,CompactBuffer(1, 3, 5)) ``` ![05.Spark RDD转换算子04.png](https://lilinchao.com/usr/uploads/2021/06/233924564.png) #### filter ```scala def filter(f: T => Boolean): RDD[T] ``` 说明:将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃(返回True的保留,False丢弃) 示例:过滤,只保留偶数 ```scala val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6)) val filterRdd: RDD[Int] = rdd.filter(num => num % 2 == 0) filterRdd.collect().foreach(println) // 2 4 6 ``` **当数据进行筛选过滤后,分区不变**,但是分区内的数据可能不均衡,即数据倾斜。 ```scala val rdd: RDD[Int] = sc.makeRDD(List(2, 4, 6, 8, 1, 3, 5, 8), 2) val filterRdd: RDD[Int] = rdd.filter(num => num % 2 == 0) // 原分区: 【2,4,6,8】 【1,3,5,8】 // 过滤分区不变:【2,4,6,8】 【8】 <== 不同分区的数据不均衡 ``` #### distinct ```scala def distinct(): RDD[T] def distinct(numPartitions: Int): RDD[T] ``` 说明:将数据集中重复的数据去重 ```scala /** * 空参 distinct() 调用的实际是 distinct(partitions.length) * 其中,distinct(numPartitions: Int) 去重原理为 * map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1) */ ``` 示例:去重 ```scala val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4)) rdd.distinct() .collect().foreach(println) // 1 2 3 4 ``` #### coalesce ```scala def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] ``` 说明:根据数据量**增减分区**,用于大数据集过滤后,提高小数据集的执行效率。 当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩分区,减少分区的个数,减小任务调度成本。 - 缩小分区(N > M)且 N 和 M 相差不多的两种形式 ```scala /* 1. coalesce方法默认情况下不会将分区的数据打乱重新组合 例如元素3和4原本同一分区,那么缩减后仍会处于同一分区(窄依赖) 这种情况下的缩减分区可能会导致数据倾斜 */ val rdd = sc.makeRDD(List(1,2,3,4,5,6), 3) val newRDD: RDD[Int] = rdd.coalesce(2) newRDD.saveAsTextFile("output") // 产生两个分区:分别为【1,2】、【3,4,5,6】 ``` ![05.Spark RDD转换算子05.png](https://lilinchao.com/usr/uploads/2021/06/2228380108.png) ```scala // 2. 如果想要让数据均衡,可以进行shuffle处理,第二个参数为True(宽依赖) val newRDD: RDD[Int] = rdd.coalesce(2, true) newRDD.saveAsTextFile("output") ``` ![05.Spark RDD转换算子06.png](https://lilinchao.com/usr/uploads/2021/06/3587483232.png) - 缩小分区( N > M)且N和M差距悬殊(比如N=1000,M=1) 如果不进行shuffle,由于父子RDD是窄依赖,他们同处于一个Stage中,就可能造成Spark程序运行的并行度不够(Task个数由Stage的最后一个RDD的分区个数决定)。比如M=1时,由于只有一个分区,所以只会有一个Task运行,为了使coalesce之前的操作有更好的并行度,可以将shuffle参数设为true。 ![05.Spark RDD转换算子07.png](https://lilinchao.com/usr/uploads/2021/06/688231220.png) - 我想要扩大分区(N < M),怎么办? 一般情况下N个分区由于数据分布不均,利用HashPartitioner函数将数据重新分区为M个,这时必须将shuffle参数设为为True。 **扩大分区个数,如果不进行shuffle操作,是没有意义的,无法改变RDD分区数目**: ```scala val rdd = sc.makeRDD(List(1,2,3,4,5,6), 2) val newRDD: RDD[Int] = rdd.coalesce(3, shuffle = true) ``` spark提供了一个简化的操作`repartition`,专门用于扩大分区, 底层代码调用的就是coalesce,而且采用shuffle。 #### repartition ```scala def repartition(numPartitions: Int): RDD[T] = coalesce(numPartitions, shuffle = true) ``` 示例: ```scala val rdd = sc.makeRDD(List(1,2,3,4,5,6), 2) rdd.repartition(3) .saveAsTextFile("output") // 【1,6】 【2,5】 【3,4】 ``` - 思考:**coalesce 和 repartition 区别?** coalesce 和 repartition 本质是相同的,后者底层代码调用的就是coalesce,且一定要经过shuffle。 习惯上**减少分区使用coalesce, 扩大分区使用repartition 。** #### sortBy ```scala def sortBy( f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length): RDD[T] ``` 说明:该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列,第二个参数为False为降序。 默认排序前后 RDD 的**分区数一致**,**中间存在 shuffle 的过程**。 示例1: ```scala val rdd: RDD[Int] = sc.makeRDD(List(2, 1, 6, 5, 4, 3), 2) val newRDD: RDD[Int] = rdd.sortBy(num=>num) newRDD.saveAsTextFile("output") // 两个分区为 【1,2,3】 【4,5,6】,所以经历了shuffle ``` 示例2: ```scala val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2) // 按元组的第一个元素,降序 val sortRDD: RDD[(String, Int)] = rdd.sortBy(t => t._1, false) sortRDD.collect().foreach(println) // ("2",3) ("11",2) ("1",1) ``` #### sample* ```scala def sample( withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] ``` 说明:根据指定的规则从数据集中抽取数据 - 不放回 ```scala val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) /* 抽取数据不放回(伯努利算法) 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要 第一个参数:抽取的数据是否放回,false:不放回 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取; 第三个参数:随机数种子,种子相同随机结果也是相同的,不传递的话默认值为当前系统时间 */ println(rdd.sample( false, 0.3 ).collect().mkString(",")) // 7,8,9 ``` - 放回 ```scala /* 抽取数据放回(泊松算法) 第一个参数:抽取的数据是否放回,true:放回 第二个参数:表示数据源中的每条数据被抽取的可能次数 第三个参数:随机数种子 */ println(rdd.sample( true, 2 ).collect().mkString(",")) // 1,1,2,2,4,6,6,6,6,6,7,8,8,9,10 ``` - 思考:抽样函数有什么用呢? 对发生数据倾斜的分区数据集,进行多次抽样,从样本中分析数据的分布。 ### 双 Value 类型 方法签名: ```scala // 交集 def intersection(other: RDD[T]): RDD[T] // 并集 def union(other: RDD[T]): RDD[T] // 差集 def subtract(other: RDD[T]): RDD[T] // 拉链,形成元组 def zip(other: RDD[U]): RDD[(T, U)] ``` 示例: ```scala val rdd1: RDD[Int] = sc.makeRDD(List(1,2,3,4)) val rdd2: RDD[Int] = sc.makeRDD(List(3,4,5,6)) // 交集 : 【3,4】,会去重 val rdd3: RDD[Int] = rdd1.intersection(rdd2) println(rdd3.collect().mkString(",")) // 并集 : 【1,2,3,4,3,4,5,6】,不会去重 val rdd4: RDD[Int] = rdd1.union(rdd2) println(rdd4.collect().mkString(",")) // 差集 : 【1,2】 val rdd5: RDD[Int] = rdd1.subtract(rdd2) println(rdd5.collect().mkString(",")) // 拉链 : (1,3),(2,4),(3,5),(4,6) val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2) println(rdd6.collect().mkString(",")) ``` 特点: - **交集、并集 和 差集要求两个数据源数据类型一致** - 拉链操作:两个数据源的类型**可以不一致** ```scala val rdd7 = sc.makeRDD(List("a","b","c","d")) val rdd8 = rdd1.zip(rdd7) println(rdd8.collect().mkString(",")) // (1,a),(2,b),(3,c),(4,d) ``` - 拉链操作:两个RDD要求**分区数量要保持一致,分区中数据量保持一致** ```scala val rdd1 = sc.makeRDD(List(1,2,3,4),2) val rdd2 = sc.makeRDD(List(3,4,5,6),3) // val rdd3: RDD[(Int, Int)] = rdd1.zip(rdd2) 分区数量不一致,异常 val rdd4 = sc.makeRDD(List(3,4,5,6,7,8), 2) // val rdd5: RDD[(Int, Int)] = rdd1.zip(rdd5) 分区中数据量不一致,异常 ``` 注:scala语法中,两个集合zip操作,不要求元素个数相同。 - 是否存在shuffle? 一般情况下,intersection和subtract都会有shuffle过程;而union是窄依赖(RangeDependency ),不存在shuffle,如下图所示。 ![05.Spark RDD转换算子08.png](https://lilinchao.com/usr/uploads/2021/06/816701985.png) ### Key - Value 类型 Value 类型 与 Key - Value 类型区别在于,前者更为广泛,单值`RDD[U]`与键值`RDD[(K,V)]`都适用;后者只适用于`RDD[(K,V)]`。 #### partitionBy ```scala def partitionBy(partitioner: Partitioner): RDD[(K, V)] ``` 说明:将数据 **基于 key 按照指定 Partitioner 重分区**。Spark 默认的分区器是 HashPartitioner。 按照指定的分区器,对Key进行计算得到新的分区号,从而对数据重新分区。 ```scala val rdd: RDD[(Int, String)] = sc.makeRDD( Array((1,"aaa"),(2,"bbb"),(3,"ccc"), (4,"ddd")), 2) /* HashPartitioner(2) 传入分区数为2,也可以与原分区数不同 */ val value: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2)) value.saveAsTextFile("output") // Key 按照哈希分区器划定分区,【(1,"aaa"),(3,"ccc")】 【(2,"bbb"),(4,"ddd")】 ``` - 补充:`partitionBy()`是PairRDDFunctions类中的方法,那么RDD为何可以调用呢? ```scala /* 因为存在 隐式转换(二次编译),RDD => PairRDDFunctions */ abstract class RDD {...} object RDD{ implicit def rddToPairRDDFunctions(rdd: RDD) = new PairRDDFunctions(rdd) ... } ``` - 思考:如果重分区的分区器和当前 RDD 的分区器一样怎么办? ```scala /* 当【分区器类别 + 分区数量】相同时,就不会创建新的RDD,返回当前RDD 二者有任一不同,将创建新的RDD返回 */ val value: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2)) val value1: RDD[(Int, String)] = value.partitionBy(new HashPartitioner(2)) println(value1 == value) // true ``` - 思考:Spark 还有其他分区器吗? 常见的有 HashPartitioner、RangePartitioner - 思考:如果想按照自己的方法进行数据分区怎么办? 自定义分区器,继承 Partitioner #### mapValues ```scala def mapValues(f: V => U): RDD[(K, U)] ``` 说明:针对KV类型的映射map,当K不变,只对V进行映射时,可采用mapValues简化 示例:wordCount ```scala val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark")) val flatRdd: RDD[String] = rdd.flatMap(_.split(" ")) val groupRdd: RDD[(String, Iterable[String])] = flatRdd.groupBy(str => str) // 使用map,(k1,V1) -> (k2,v2) val value: RDD[(String, Int)] = groupRdd.map{ case (a, b) => { (a, b.size) } } // 使用mapValues,(K,V) -> (K,U) val value: RDD[(String, Int)] = groupRdd.mapValues( iter => iter.size ) outout: // (Spark,1) // (Hello,2) // (Scala,1) ``` #### reduceByKey ```scala // 泛型为[K, V],V代表value的类型 def reduceByKey(func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] ``` 说明:相同的key的数据进行value的聚合操作(两两聚合),**传入的func表示两个val的聚合逻辑**。如果key的数据只有一个,是不会参与运算的,直接返回。 示例:wordCount ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("a", 3), ("b", 4) )) // 【1,2,3】-> 【3,3】 -> 【6】 val value: RDD[(String, Int)] = rdd.reduceByKey( (x: Int, y: Int) => { x + y } ) value.collect().foreach(println) // (a,6) // (b,4) ``` #### groupByKey ```scala def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] ``` 说明:将数据源的数据按照key ,对 value 进行分组 ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("a", 3), ("b", 4) )) // groupByKey : 针对[K,V]类型,将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组 // 元组中的第一个元素就是key,元组中的第二个元素就是相同key的value的集合 val groupRDD1: RDD[(String, Iterable[Int])] = rdd.groupByKey() groupRDD1.collect().foreach(println) // (a,CompactBuffer(1, 2, 3)) // (b,CompactBuffer(4)) println(groupRDD1.partitioner) // Some(org.apache.spark.HashPartitioner@8) 默认使用哈希分区器,8个分区 ``` - groupByKey的RDD依赖关系: ![05.Spark RDD转换算子09.png](https://lilinchao.com/usr/uploads/2021/06/3915956801.png) - 思考:groupByKey 与 groupBy的区别? | | groupByKey | groupBy | | ------------ | --------------------------- | ---------------------------- | | 适用集合类型 | 必须是`RDD[(K, V)]` | 任意`RDD[T]` | | 分组逻辑 | 按照Key分组 | 自定义`f:T->key`,需传入 | | 返回值 | `k ->Iterable(v1, v2, ...)` | `k -> Iterable(T1, T2, ...)` | - 经典考题: reduceByKey 和 groupByKey 的区别? - 从 shuffle 的角度:**reduceByKey 和 groupByKey 都存在 shuffle 的操作**,但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。 - 从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合,只能使用 groupByKey。 ![05.Spark RDD转换算子10.png](https://lilinchao.com/usr/uploads/2021/06/3070839448.png) reduceByKey针对分区内与分区间,计算规则是相同的。如果分区内与分区间的计算规则不同,可以使用aggregateByKey。 #### aggregateByKey ```scala def aggregateByKey(zeroValue: U) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] ``` 说明:将数据根据 **不同的规则** 进行分区内计算和分区间计算 ```scala /** * aggregateByKey存在函数柯里化,有两个参数列表 * 第一个参数列表,需要传递一个参数,表示为初始值(只用于分区内计算) * 用于当碰见key第一个value时,与它进行分区内计算 * 第二个参数列表需要传递2个参数 * 参数1表示分区内计算规则 * 参数2表示分区间计算规则 */ ``` 示例:取出每个分区内相同 key 的最大值然后分区间相加 ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) rdd.aggregateByKey(0)( (x, y) => math.max(x, y), (x, y) => x + y ).collect().foreach(println) //(b,8) //(a,8) ``` ![05.Spark RDD转换算子11.png](https://lilinchao.com/usr/uploads/2021/06/3114546660.png) 初始值`zeroValue`的选取是重要的,如果给的值不合适,将会是不同的结果: ```scala rdd.aggregateByKey(5)( (x, y) => math.max(x, y), (x, y) => x + y ).collect().foreach(println) //(b,10) //(a,11) ``` ![05.Spark RDD转换算子12.png](https://lilinchao.com/usr/uploads/2021/06/3713508465.png) aggregateByKey中初始值的类型与原本值的类型 **可以不同**,而最终的返回数据结果应该和初始值的类型保持一致,重温一下方法签名: ```scala // 键值对的泛型[K, V],输出为[K, U] def aggregateByKey(zeroValue: U) (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] ``` 示例:获取相同key的数据的平均值 ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) // 获取相同key的数据的平均值 => (a, 3),(b, 4) val newRDD : RDD[(String, (Int, Int))] = rdd.aggregateByKey( (0,0) )( // (Tuple, Int) => Tuple // a: t=(0,0),v=1 => (1,1) => t=(1,1),v=2 => (3,2) ( t, v ) => { (t._1 + v, t._2 + 1) }, // (Tuple, Tuple) => Tuple // a: t1=(3,2),t2=(6,1) => (9,3) (t1, t2) => { (t1._1 + t2._1, t1._2 + t2._2) } ) // 对[K,V]做映射时,若K保持不动,仅对V做映射,可使用mapValues(f: V => U) val resultRDD: RDD[(String, Int)] = newRDD.mapValues { case (num, cnt) => { num / cnt } } resultRDD.collect().foreach(println) // (a, 3) (b, 4) ``` ![05.Spark RDD转换算子13.png](https://lilinchao.com/usr/uploads/2021/06/939009585.png) #### foldByKey ```scala def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] ``` 说明:当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey 示例: ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) // 二者等价: 结果为 (b,12),(a,9) rdd.aggregateByKey(0)(_+_, _+_) rdd.foldByKey(0)(_+_) ``` 注:值得注意的是,foldByKey保持键值对的泛型不变(`(k,v)->(k,v)`),而aggregateByKey可能会改变输出的值类型(`(k,v)->(k,u)`)。 - 问题:当分区内计算规则和分区间计算规则相同时,foldByKey和reduceByKey都能实现,二者有什么区别呢? ```scala def reduceByKey(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] ``` **相同点**:不会改变键值对类型`(K,V)->(K,V)`,针对相同的Key,对Value做**两两聚合**操作 **不同点**:`reduceByKey`没有初始值,如果key的数据只有一个,是不会参与运算的,直接返回;而`foldByKey`要给定初始值,如果key的数据只有一个,就会与初始值进行计算。 #### combineByKey ```scala def combineByKey( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] ``` 说明:它是对aggregateByKey的另一种实现,它不直接给定初始值,而是将相同key的第一个数据进行结构的转换,作为初始值。 ```scala /** * combineByKey : 方法需要三个参数 * 1. createCombiner:将相同key的第一个数据进行结构的转换,实现操作 * 2. mergeValue:分区内的计算规则 * 3. mergeCombiners:分区间的计算规则 */ ``` 示例:获取相同key的数据的平均值 ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) //注:因为第一个参数返回值类型是动态的,所以计算规则需加上泛型限定 val newRDD : RDD[(String, (Int, Int))] = rdd.combineByKey( t => (t, 1), //"a": 1 => (1, 1)形成初始值 ( t: (Int, Int), v) => { (t._1 + v, t._2 + 1) }, (t1: (Int, Int), t2: (Int, Int)) => { (t1._1 + t2._1, t1._2 + t2._2) } ) val resultRDD: RDD[(String, Int)] = newRDD.mapValues { case (num, cnt) => { num / cnt } } resultRDD.collect().foreach(println) // (a, 4) (b, 4) ``` 执行流程(初始值 -> 分区内 -> 分区间)如图所示: ![05.Spark RDD转换算子14.png](https://lilinchao.com/usr/uploads/2021/06/1443787968.png) - groupByKey、reduceByKey、foldByKey、aggregateByKey这四种算子,**最终都归结为对combineByKey 的调用**。 - combineByKey 共有五个参数如下: ![05.Spark RDD转换算子15.png](https://lilinchao.com/usr/uploads/2021/06/533512966.png) - 值得注意的是:groupByKey的参数`mapSideCombine=false`,不会在map端进行combine操作,其余四种算子该参数为`mapSideCombine=true`。 - 归约算子的内部实现: ![05.Spark RDD转换算子16.png](https://lilinchao.com/usr/uploads/2021/06/202345924.png) | 转换操作 | 生成RDD的类型 | | ------------------------------------------------------- | ---------------------------------------------------------- | | combineByKey (reduceByKey、foldByKey、aggregateByKey) | MapParitionsRDD(预聚合)-> ShuffledRDD -> MapParitionsRDD | | groupByKey | ShuffledRDD -> MapParitionsRDD | 其中, ShuffledRDD 进行 reduce(通过 aggregate + mapPartitions() 操作来实现)得到 MapPartitionsRDD。 - 对比:**reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?** | | 初始值 | 相同的Key第一个值 | 分区内&分区间的计算规则 | | -------------- | ------ | ------------------------------------------- | ----------------------- | | reduceByKey | 无 | 相同 key 的第一个数据不进行任何计算 | 计算规则相同 | | foldByKey | 给定 | 相同 key 的第一个数据和初始值进行分区内计算 | 计算规则相同 | | aggregateByKey | 给定 | 相同 key 的第一个数据和初始值进行分区内计算 | 计算规则可以不同 | | combineByKey | 无 | 相同 key 的第一个数据结构转换,作为初始值 | 计算规则可以不同 | **重要相同点:四个算子均具有“预聚合”功能,即在shuffle落盘之前,在内存中先聚合数据,再写入磁盘,减少数据落盘量** 示例:实现wordCount ```scala val rdd = sc.makeRDD(List( ("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6) ),2) //实现wordCount的四种方式:(b,12)、(a,9) rdd.reduceByKey(_+_) rdd.aggregateByKey(0)(_+_, _+_) rdd.foldByKey(0)(_+_) rdd.combineByKey(v=>v, (v1: Int, v2) => v1+v2, (v1: Int, v2: Int)=> v1+v2) ``` #### join ```scala def join(other: RDD[(K, W)]): RDD[(K, (V, W))] ``` 说明:在类型为`(K,V)`和`(K,W)`的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的`(K,(V,W))`的 RDD ```scala val rdd1 = sc.makeRDD(List( ("a", 1), ("b", 2), ("c", 3) )) val rdd2 = sc.makeRDD(List( ("a", 4), ("a", 5), ("c", 6) )) val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2) joinRDD.collect().foreach(println) //(a,(1,5)) //(a,(1,4)) //(c,(3,6)) ``` 如果两个数据源中key没有匹配上,那么数据不会出现在结果中(内连接,取交集); 如果两个数据源中key有多个相同的,会逐个匹配,可能会出现**笛卡尔乘积**,且会发生shuffle,故不推荐使用。 #### leftOuterJoin ```scala def leftOuterJoin(other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] ``` 说明:类似于左外连接,保留主表的所有数据,从表数据会由Option封装。 ```scala val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3))) val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5))) val leftRDD: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2) //(a,(1,Some(4))) //(b,(2,Some(5))) //(c,(3,None)) ``` 相应的,还有右外连接**rightOuterJoin**。 #### cogroup ```scala def cogroup(other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] ``` 说明:在类型为`(K,V)`和`(K,W)`的 RDD 上调用,返回一个`(K,(Iterable
,Iterable
))`类型的 RDD ```scala val rdd1 = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3))) val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5),("c", 6),("c", 7))) val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2) // (a,(CompactBuffer(1, 2),CompactBuffer(4))) // (b,(CompactBuffer(3),CompactBuffer(5))) // (c,(CompactBuffer(),CompactBuffer(6, 7))) ``` 它的Join的区别在于:Join返回的是两侧RDD公共的Key,而cogroup可以返回仅一侧出现的Key,类似于**全外连接**。 join等连接操作的底层,使用的是cogroup实现,**Join内部机制**如图: ![05.Spark RDD转换算子17.png](https://lilinchao.com/usr/uploads/2021/06/1118227016.png) #### sortByKey ```scala def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] ``` 说明:在一个(K,V)的 RDD 上调用,**K 必须实现 Ordered 特质**,返回一个按照 key 进行排序的(K, V) ```scala val dataRDD1 = sc.makeRDD(List(("a",3),("b",2),("c",1))) val sortRdd1: RDD[(String, Int)] = dataRDD1.sortByKey() // 按Key升序 (a,3),(b,2),(c,1) val sortRdd2: RDD[(String, Int)] = dataRDD1.sortByKey(false) // 按Key降序 (c,1),(b,2),(a,3) ``` ### 经典案例 数据准备: > agent.log [时间戳,省份,城市,用户,广告],中间字段使用空格分隔。 功能实现:统计出 **每一个省份 每个广告被点击数量排行的 Top3** 分析: ① **提取有效数据**:通过`map`只保留有效数据,比如省份,广告,减少数据传输量 ② **建立有效键Key**:省份与广告均为分组关键词,应将元组`(省份,广告)`作为Key ③ **归约**:` ( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum )` ④ **结构转换**:为了查询每一个省份的TOP,做转换` ( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) )` ⑤ **分组**:按照省份进行分组,每个省份对应多干个`( 广告, sum )` ⑥ **排序**:对`sum`降序排序,取前三 ![05.Spark RDD转换算子18.png](https://lilinchao.com/usr/uploads/2021/06/996595702.png) ```scala // 1. 获取原始数据:时间戳,省份,城市,用户,广告 val rdd: RDD[String] = sc.textFile("data/agent.log") // 2. 将原始数据进行结构的转换。方便统计 // 时间戳,省份,城市,用户,广告 // => // ( ( 省份,广告 ), 1 ) val mapRDD: RDD[((String, String), Int)] = rdd.map( line => { val words: Array[String] = line.split(" ") ((words(1), words(4)), 1) } ) // 3. 将转换结构后的数据,进行分组聚合 // ( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum ) val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _) // 4. 将聚合的结果进行结构的转换 // ( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) ) val mapRdd1: RDD[(String, (String, Int))] = reduceRDD.map { case ((prv, ad), sum) => { (prv, (ad, sum)) } /* case (tuple, cnt) => { (tuple._1, (tuple._2, cnt)) }*/ } // 5. 将转换结构后的数据根据省份进行分组 // ( 省份, 【( 广告A, sumA ),( 广告B, sumB )】 ) val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRdd1.groupByKey() // 6. 将分组后的数据组内排序(降序),取前3名 val resultRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues( iter => { iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3) } ) // 7. 采集数据打印在控制台 resultRDD.collect().foreach(println) ``` *附:* 原文链接地址:https://juejin.cn/post/6957937654360965157#heading-35
标签:
Spark
,
Spark Core
,
Spark RDD
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1300.html
上一篇
04.Spark RDD创建简介
下一篇
06.【转载】Spark RDD行动算子
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
43
标签云
序列化和反序列化
Stream流
Java阻塞队列
并发编程
递归
MySQL
随笔
NIO
Beego
SpringCloud
Golang基础
Spark Core
Spring
Redis
Git
线程池
Quartz
Kibana
BurpSuite
GET和POST
SpringCloudAlibaba
数学
Shiro
并发线程
Elasticsearch
Hive
RSA加解密
Yarn
链表
Spark Streaming
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞