SparkSQL Case Practice (4)
Leefs
2021-11-14 AM
[TOC]

### 1. Requirement

> Find the shops that have had sales on 3 or more consecutive days, and compute the total sales for each such streak.

**Expected result**

```
+-----+----------+----------+-----+-----------+
|  sid|begin_date|  end_date|times|total_sales|
+-----+----------+----------+-----+-----------+
|shop1|2019-02-10|2019-02-13|    4|       1900|
+-----+----------+----------+-----+-----------+
```

### 2. Data Preparation

+ order.csv

```
sid,datatime,money
shop1,2019-01-18,500
shop1,2019-02-10,500
shop1,2019-02-10,200
shop1,2019-02-11,600
shop1,2019-02-12,400
shop1,2019-02-13,200
shop1,2019-02-15,100
shop1,2019-03-05,180
shop1,2019-04-05,280
shop1,2019-04-06,220
shop2,2019-02-10,100
shop2,2019-02-11,100
shop2,2019-02-13,100
```

+ sid: shop ID
+ datatime: order creation date
+ money: order amount

### 3. SQL Implementation

#### 3.1 Steps

```
1. Sum the amounts per day (one day may contain several orders).
2. Within each shop, sort the daily rows by date and number them with ROW_NUMBER.
3. Derive a column holding date minus rn; this difference is constant within a run of consecutive days.
4. Group by shop and that difference to get the final result.
```

#### 3.2 Code

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * @author lilinchao
  * @date 2021/11/13
  * @description Find the shops with 3+ consecutive days of sales and compute the total sales
  **/
object RollupMthIncomeSQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()

    val df: DataFrame = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data_sql/order.csv")
    df.createOrReplaceTempView("v_orders")

    // 1. Sum the amounts per day (one day may contain several orders)
    spark.sql(
      """
        | select
        |   sid,
        |   datatime,
        |   SUM(money) day_money
        | from v_orders
        | group by sid, datatime
      """.stripMargin).createOrReplaceTempView("t1")

    // 2. Number each shop's daily rows in date order
    spark.sql(
      """
        | select
        |   sid,
        |   datatime,
        |   day_money,
        |   ROW_NUMBER() OVER(PARTITION BY sid ORDER BY datatime) rn
        | from t1
      """.stripMargin).createOrReplaceTempView("t2")

    // 3. Subtract rn days from the date; consecutive days share the same diff
    spark.sql(
      """
        | select
        |   sid,
        |   datatime,
        |   day_money,
        |   DATE_SUB(datatime, rn) diff
        | from t2
      """.stripMargin).createOrReplaceTempView("t3")

    // 4. Final result: group by shop and diff, keep streaks of 3+ days
    spark.sql(
      """
        | select
        |   sid,
        |   MIN(datatime) begin_date,
        |   MAX(datatime) end_date,
        |   COUNT(*) times,
        |   SUM(day_money) total_sales
        | from t3
        | GROUP BY sid, diff
        | HAVING times >= 3
      """.stripMargin).show()

    spark.stop()
  }
}
```

**Output**

```
+-----+----------+----------+-----+-----------+
|  sid|begin_date|  end_date|times|total_sales|
+-----+----------+----------+-----+-----------+
|shop1|2019-02-10|2019-02-13|    4|       1900|
+-----+----------+----------+-----+-----------+
```

### 4. RDD Implementation

Note that this RDD version computes what its name (`RollupMthIncomeRDD`) says: the cumulative (rolling) monthly sales per shop, a different statistic from the consecutive-day streaks above.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author lilinchao
  * @date 2021/11/14
  * @description Cumulative monthly sales per shop
  **/
object RollupMthIncomeRDD {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("data_sql/order.csv")
    // Drop the header line
    val header = linesRDD.first()
    val lines = linesRDD.filter(row => row != header)

    // Sum the sales per (shop, month)
    val reduced: RDD[((String, String), Double)] = lines.map(line => {
      val fields: Array[String] = line.split(",")
      val sid: String = fields(0)
      val dateStr: String = fields(1)
      val month: String = dateStr.substring(0, 7)
      val money: Double = fields(2).toDouble
      ((sid, month), money)
    }).reduceByKey(_ + _)

    // Group by shop id
    val result: RDD[(String, String, Double)] = reduced.groupBy(_._1._1).mapValues(it => {
      // Pull the iterator into memory as a List and sort by month (lexicographic order)
      val sorted: List[((String, String), Double)] = it.toList.sortBy(_._1._2)
      var rollup = 0.0
      sorted.map(t => {
        val sid = t._1._1
        val month = t._1._2
        val monthSales = t._2
        rollup += monthSales
        (sid, month, rollup)
      })
    }).flatMap(_._2) // drop the grouping key; each tuple already carries sid

    // Collect to the driver so the rows print in one place
    result.collect().foreach(println)

    sc.stop()
  }
}
```

**Output**

```
(shop1,2019-01,500.0)
(shop1,2019-02,2500.0)
(shop1,2019-03,2680.0)
(shop1,2019-04,3180.0)
(shop2,2019-02,300.0)
```
Tags: Spark, Spark SQL
Unless otherwise noted, all articles on this blog are the author's original work.
When reposting, please cite the source:
https://www.lilinchao.com/archives/1623.html