李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
SparkSQL案例实操(二)
Leefs
2021-11-13 PM
1064℃
0条
[TOC] ### 一、需求 #### 1.1 需求简介 > 各区域热门商品 Top3 > > + 这里的热门商品是从点击量的维度来看的,计算各个区域前三大热门商品 > + 备注上每个商品在主要城市中的分布比例 > + 超过两个城市用其他显示 **示例** | 地区 | 商品名称 | 点击次数 | 城市备注 | | ---- | -------- | -------- | ---------------------------------- | | 华北 | 商品 A | 100000 | 北京 21.2%,天津 13.2%,其他 65.6% | | 华北 | 商品 P | 80200 | 北京 63.0%,太原 10%,其他 27.0% | | 华北 | 商品 M | 40000 | 北京 63.0%,太原 10%,其他 27.0% | | 东北 | 商品 J | 92000 | 大连 28%,辽宁 17.0%,其他 55.0% | #### 1.2 需求分析 + 查询出来所有的点击记录,并与 city_info 表连接,得到每个城市所在的地区,与 Product_info 表连接得到产品名称; + 按照地区和商品 id 分组,统计出每个商品在每个地区的总点击次数 + 每个地区内按照点击次数降序排列 + 只取前三名 + 城市备注需要自定义 UDAF 函数 ### 二、样例类 + 用户访问动作表(`user_visit_action.txt`) ```scala case class UserVisitAction( date: String,//用户点击行为的日期 user_id: Option[Long],//用户的ID session_id: String,//Session的ID page_id: Option[Long],//某个页面的ID action_time: String,//动作的时间点 search_keyword: String,//用户搜索的关键词 click_category_id: Option[Long],//某一个商品品类的ID click_product_id: Option[Long],//某一个商品的ID order_category_ids: String,//一次订单中所有品类的ID集合 order_product_ids: String,//一次订单中所有商品的ID集合 pay_category_ids: String,//一次支付中所有品类的ID集合 pay_product_ids: String,//一次支付中所有商品的ID集合 city_id: Option[Long] //城市ID ) ``` + 商品信息表(`product_info.txt`) ```scala case class ProductInfo( product_id: Long, //商品ID product_name: String, //商品名称 extend_info: String //商品平台类型 ) ``` + 城市信息表(`city_info.txt`) ```scala case class CityInfo( city_id: Long, //城市ID city_name: String, //城市名称 area: String //城市所属区域 ) ``` ### 三、实现 #### 3.1 实现步骤 ``` 1. 连接三张表的数据,获取完整的数据(只有点击) 2. 将数据根据地区,商品名称分组 3. 统计商品点击次数总和,取 Top3 4. 实现自定义聚合函数显示备注 ``` #### 3.2 代码实现 ```scala package com.llc.spark.sql import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.expressions.Aggregator import scala.collection.mutable import scala.collection.mutable.ListBuffer /** * @author lilinchao * @date 2021/11/12 * @description 1.0 **/ object PopularProductsTopN { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("PopularProductsTopN").master("local[*]").getOrCreate() import spark.implicits._ val userDataSet: Dataset[UserVisitAction] = userToDataSet("data_sql/user_visit_action.txt",spark) val cityDataSet: Dataset[CityInfo] = cityToDataSet("data_sql/city_info.txt",spark) val productDataSet: Dataset[ProductInfo] = productToDataSet("data_sql/product_info.txt",spark) userDataSet.createOrReplaceTempView("user_visit_action") cityDataSet.createOrReplaceTempView("city_info") productDataSet.createOrReplaceTempView("product_info") //1.先关联三张表求出区域、城市名、商品名称 spark.sql( """ | select | a.*, | p.product_name, | c.area, | c.city_name | from user_visit_action a | join product_info p on a.click_product_id = p.product_id | join city_info c on a.city_id = c.city_id | where a.click_product_id > -1 """.stripMargin ).createOrReplaceTempView("t1") //根据区域,商品进行数据聚合 spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF())) //2.根据区域和商品分组,统计每个区域的商品点击次数 spark.sql( """ | select | area, | product_name, | count(*) as clickCnt, | cityRemark(city_name) as city_remark | from t1 group by area,product_name """.stripMargin).createOrReplaceTempView("t2") //3.统计出每个区域的热门商品,取前三 spark.sql( """ | select | *, | rank() over( partition by area order by clickCnt desc ) as rank | from t2 """.stripMargin).createOrReplaceTempView("t3") // 取前3名 spark.sql( """ | select | * | from t3 where rank <= 3 """.stripMargin).show(false) spark.close() } /** * 用户访问记录 => 读取txt文件并转换成为DataSet * @param path * @param spark */ def userToDataSet(path: String, spark: SparkSession): Dataset[UserVisitAction] = { import spark.implicits._ val frame: DataFrame = spark.read .schema(Encoders.product[UserVisitAction].schema) // .option("inferSchema", "true") //是否自动推断数据类型 .option("header", "false") .option("delimiter", "\t") //分割符【\t】,默认【,】逗号 .csv(path) // .toDF("date","user_id","session_id","page_id","action_time","search_keyword","click_category_id","click_product_id","order_category_ids","order_product_ids","pay_category_ids","pay_product_ids","city_id") frame.as[UserVisitAction] } /** * 城市信息 => 读取txt文件并转换成为DataSet * @param path * @param spark * @return */ def cityToDataSet(path: String, spark: SparkSession): Dataset[CityInfo] = { //读取外部txt文件 val dataRDD: RDD[String] = spark.sparkContext.textFile(path) //导入隐士转换,否则RDD不能够调用toDF方法 import spark.implicits._ dataRDD.map { line => { val split = line.split("\\s+") CityInfo(split(0).toLong, split(1), split(2)) } }.toDS() } /** * 商品信息 => 读取txt文件并转换成为DataSet * @param path * @param spark */ def productToDataSet(path: String, spark: SparkSession): Dataset[ProductInfo] = { val dataRDD: RDD[String] = spark.sparkContext.textFile(path) //导入隐士转换,否则RDD不能够调用toDF方法 import spark.implicits._ dataRDD.map { line => { val split = line.split("\\s+") ProductInfo(split(0).toLong, split(1), split(2)) } }.toDS() } abstract class Base extends Serializable with Product //用户访问动作表 case class UserVisitAction( date: String,//用户点击行为的日期 user_id: Option[Long],//用户的ID session_id: String,//Session的ID page_id: Option[Long],//某个页面的ID action_time: String,//动作的时间点 search_keyword: String,//用户搜索的关键词 click_category_id: Option[Long],//某一个商品品类的ID click_product_id: Option[Long],//某一个商品的ID order_category_ids: String,//一次订单中所有品类的ID集合 order_product_ids: String,//一次订单中所有商品的ID集合 pay_category_ids: String,//一次支付中所有品类的ID集合 pay_product_ids: String,//一次支付中所有商品的ID集合 city_id: Option[Long] //城市ID ) extends Base //商品信息表 case class ProductInfo( product_id: Long, //商品ID product_name: String, //商品名称 extend_info: String //商品平台类型 ) extends Base //城市信息表 case class CityInfo( city_id: Long, //城市ID city_name: String, //城市名称 area: String //城市所属区域 ) extends Base case class Buffer( var total : Long, var cityMap:mutable.Map[String, Long] ) // 自定义聚合函数:实现城市备注功能 // 1. 继承Aggregator, 定义泛型 // IN : 城市名称 // BUF : Buffer =>【总点击数量,Map[(city, cnt), (city, cnt)]】 // OUT : 备注信息 // 2. 重写方法(6) class CityRemarkUDAF extends Aggregator[String, Buffer, String]{ // 缓冲区初始化 override def zero: Buffer = { Buffer(0, mutable.Map[String, Long]()) } // 更新缓冲区数据 override def reduce(buff: Buffer, city: String): Buffer = { buff.total += 1 val newCount = buff.cityMap.getOrElse(city, 0L) + 1 buff.cityMap.update(city, newCount) buff } // 合并缓冲区数据 override def merge(buff1: Buffer, buff2: Buffer): Buffer = { buff1.total += buff2.total val map1 = buff1.cityMap val map2 = buff2.cityMap // 两个Map的合并操作 // buff1.cityMap = map1.foldLeft(map2) { // case ( map, (city, cnt) ) => { // val newCount = map.getOrElse(city, 0L) + cnt // map.update(city, newCount) // map // } // } map2.foreach{ case (city , cnt) => { val newCount = map1.getOrElse(city, 0L) + cnt map1.update(city, newCount) } } buff1.cityMap = map1 buff1 } // 将统计的结果生成字符串信息 override def finish(buff: Buffer): String = { val remarkList = ListBuffer[String]() val totalcnt = buff.total val cityMap = buff.cityMap // 降序排列 val cityCntList = cityMap.toList.sortWith( (left, right) => { left._2 > right._2 } ).take(2) val hasMore = cityMap.size > 2 var rsum = 0L cityCntList.foreach{ case ( city, cnt ) => { val r = cnt * 100 / totalcnt remarkList.append(s"${city} ${r}%") rsum += r } } if ( hasMore ) { remarkList.append(s"其他 ${100 - rsum}%") } remarkList.mkString(", ") } override def bufferEncoder: Encoder[Buffer] = Encoders.product override def outputEncoder: Encoder[String] = Encoders.STRING } } ``` **运行结果** ``` +----+------------+--------+------------------------------+----+ |area|product_name|clickCnt|city_remark |rank| +----+------------+--------+------------------------------+----+ |华东|商品_86 |371 |上海 16%, 杭州 15%, 其他 69% |1 | |华东|商品_47 |366 |杭州 15%, 青岛 15%, 其他 70% |2 | |华东|商品_75 |366 |上海 17%, 无锡 15%, 其他 68% |2 | |西北|商品_15 |116 |西安 54%, 银川 45% |1 | |西北|商品_2 |114 |银川 53%, 西安 46% |2 | |西北|商品_22 |113 |西安 54%, 银川 45% |3 | |华南|商品_23 |224 |厦门 29%, 福州 24%, 其他 47% |1 | |华南|商品_65 |222 |深圳 27%, 厦门 26%, 其他 47% |2 | |华南|商品_50 |212 |福州 27%, 深圳 25%, 其他 48% |3 | |华北|商品_42 |264 |郑州 25%, 保定 25%, 其他 50% |1 | |华北|商品_99 |264 |北京 24%, 郑州 23%, 其他 53% |1 | |华北|商品_19 |260 |郑州 23%, 保定 20%, 其他 57% |3 | |东北|商品_41 |169 |哈尔滨 35%, 大连 34%, 其他 31%|1 | |东北|商品_91 |165 |哈尔滨 35%, 大连 32%, 其他 33%|2 | |东北|商品_58 |159 |沈阳 37%, 大连 32%, 其他 31% |3 | |东北|商品_93 |159 |哈尔滨 38%, 大连 37%, 其他 25%|3 | |华中|商品_62 |117 |武汉 51%, 长沙 48% |1 | |华中|商品_4 |113 |长沙 53%, 武汉 46% |2 | |华中|商品_57 |111 |武汉 54%, 长沙 45% |3 | |华中|商品_29 |111 |武汉 50%, 长沙 49% |3 | +----+------------+--------+------------------------------+----+ ``` *注意:本次使用的Scala 2.12版本、Spark 3.0.0版本,如果代码报红,请查看版本是否对应* ### 结尾 因为本篇使用的示例数据`user_visit_action.txt`、`city_info.txt`、`product_info.txt`文件由于数据量较大将不在下方贴出。 直接在微信公众号【Java和大数据进阶】回复:**sparkdata**,即可获取。
标签:
Spark
,
Spark SQL
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1621.html
上一篇
SparkSQL案例实操(一)
下一篇
SparkSQL案例实操(三)
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
查找
MyBatisX
Thymeleaf
递归
JavaWeb
Scala
RSA加解密
Spark SQL
SpringCloudAlibaba
Jenkins
Spark Streaming
数学
MySQL
Jquery
Hive
gorm
FastDFS
机器学习
线程池
二叉树
栈
Filter
Java工具类
Spark
Beego
正则表达式
哈希表
前端
Netty
随笔
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞