# SparkSQL Case Practice (4)
Leefs · 2021-11-14
[TOC]

### 1. Requirement

> Find the shops that have had sales on 3 or more consecutive days, and compute the total sales for each such streak.

**Expected result**

```
+-----+----------+----------+-----+-----------+
|  sid|begin_date|  end_date|times|total_sales|
+-----+----------+----------+-----+-----------+
|shop1|2019-02-10|2019-02-13|    4|       1900|
+-----+----------+----------+-----+-----------+
```

### 2. Data Preparation

+ order.csv

```
sid,datatime,money
shop1,2019-01-18,500
shop1,2019-02-10,500
shop1,2019-02-10,200
shop1,2019-02-11,600
shop1,2019-02-12,400
shop1,2019-02-13,200
shop1,2019-02-15,100
shop1,2019-03-05,180
shop1,2019-04-05,280
shop1,2019-04-06,220
shop2,2019-02-10,100
shop2,2019-02-11,100
shop2,2019-02-13,100
```

+ sid: shop ID
+ datatime: order creation date
+ money: order amount

### 3. SQL Implementation

#### 3.1 Steps

```
1. Sum the amounts per day (a single day may have multiple orders).
2. Within each shop, sort the daily rows by date and assign a row number.
3. Compute the difference between the date and the row number; within a run of consecutive days this difference is constant.
4. Group by shop and difference to produce the final result.
```

#### 3.2 Code

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * @author lilinchao
  * @date 2021/11/13
  * @description Find the shops that have had sales on 3 or more consecutive days, and compute the sales total
  **/
object RollupMthIncomeSQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data_sql/order.csv")

    df.createOrReplaceTempView("v_orders")

    // 1. Sum the amounts per day (a single day may have multiple orders)
    spark.sql(
      """
        | select
        |   sid,
        |   datatime,
        |   SUM(money) day_money
        | from v_orders
        | group by sid, datatime
      """.stripMargin).createOrReplaceTempView("t1")

    // 2. Within each shop, sort the daily rows by date and assign a row number
    spark.sql(
      """
        | select
        |   sid,
        |   datatime,
        |   day_money,
        |   ROW_NUMBER() OVER(PARTITION BY sid ORDER BY datatime) rn
        | from t1
      """.stripMargin).createOrReplaceTempView("t2")

    // 3. Subtract the row number from the date; consecutive days share the same diff
    spark.sql(
      """
        | select
        |   sid,
        |   datatime,
        |   day_money,
        |   DATE_SUB(datatime, rn) diff
        | from t2
      """.stripMargin).createOrReplaceTempView("t3")

    // 4. Group by shop and diff to produce the final result
    spark.sql(
      """
        | select
        |   sid,
        |   MIN(datatime) begin_date,
        |   MAX(datatime) end_date,
        |   COUNT(*) times,
        |   SUM(day_money) total_sales
        | from t3
        | GROUP BY sid, diff
        | HAVING times >= 3
      """.stripMargin).show()

    spark.stop()
  }
}
```

**Result**

```
+-----+----------+----------+-----+-----------+
|  sid|begin_date|  end_date|times|total_sales|
+-----+----------+----------+-----+-----------+
|shop1|2019-02-10|2019-02-13|    4|       1900|
+-----+----------+----------+-----+-----------+
```
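The same four steps can also be expressed with the DataFrame API instead of SQL strings. Below is a minimal sketch under the same assumptions as the code above (local master, `data_sql/order.csv`); the object name `ConsecutiveSalesDF` is invented for illustration. It should produce the same result table as the SQL version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object ConsecutiveSalesDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ConsecutiveSalesDF").master("local[*]").getOrCreate()

    // Step 1: daily totals per shop
    val daily = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data_sql/order.csv")
      .groupBy("sid", "datatime")
      .agg(sum("money").as("day_money"))

    // Steps 2-3: number each shop's days in date order, then subtract the
    // row number from the date; consecutive days collapse to the same diff
    val byDate = Window.partitionBy("sid").orderBy("datatime")
    val flagged = daily
      .withColumn("rn", row_number().over(byDate))
      .withColumn("diff", expr("DATE_SUB(datatime, rn)"))

    // Step 4: aggregate each run and keep runs of 3 or more days
    flagged.groupBy("sid", "diff")
      .agg(
        min("datatime").as("begin_date"),
        max("datatime").as("end_date"),
        count(lit(1)).as("times"),
        sum("day_money").as("total_sales"))
      .where(col("times") >= 3)
      .drop("diff")
      .show()

    spark.stop()
  }
}
```

Using `expr("DATE_SUB(datatime, rn)")` keeps the date arithmetic identical to the SQL version, so the grouping behaves the same way.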
### 4. RDD Implementation

Note: despite the section title, this example solves a different problem from section 1. As the object name `RollupMthIncomeRDD` suggests, it computes each shop's cumulative (rolling) monthly sales.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author lilinchao
  * @date 2021/11/14
  * @description Compute each shop's cumulative monthly sales
  **/
object RollupMthIncomeRDD {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("data_sql/order.csv")

    // Drop the header line
    val header = linesRDD.first()
    val lines = linesRDD.filter(row => row != header)

    // Sum the order amounts per (shop, month)
    val reduced: RDD[((String, String), Double)] = lines.map(line => {
      val fields: Array[String] = line.split(",")
      val sid: String = fields(0)
      val dateStr: String = fields(1)
      val month: String = dateStr.substring(0, 7)
      val money: Double = fields(2).toDouble
      ((sid, month), money)
    }).reduceByKey(_ + _)

    // Group by shop ID
    val result: RDD[(String, String, String, Double)] = reduced.groupBy(_._1._1).mapValues(it => {
      // Pull the iterator into an in-memory List and sort by month (lexicographic order)
      val sorted: List[((String, String), Double)] = it.toList.sortBy(_._1._2)
      var rollup = 0.0
      sorted.map(t => {
        val sid = t._1._1
        val month = t._1._2
        val month_sales = t._2
        rollup += month_sales
        // Running total up to and including this month
        (sid, month, rollup)
      })
    }).flatMapValues(lst => lst).map(t => (t._1, t._2._1, t._2._2, t._2._3))

    result.foreach(println)
    sc.stop()
  }
}
```

**Result**

```
(shop1,shop1,2019-01,500.0)
(shop1,shop1,2019-02,2500.0)
(shop1,shop1,2019-03,2680.0)
(shop1,shop1,2019-04,3180.0)
(shop2,shop2,2019-02,300.0)
```

The shop ID appears twice in each tuple because it is both the `groupBy` key and the first element of each value.
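Since the RDD example above targets monthly rollups rather than the consecutive-day requirement from section 1, here is a minimal RDD sketch of that requirement, assuming the same `order.csv` layout. The object name `ConsecutiveSalesRDD` and the use of `java.time.LocalDate` are choices made for this sketch, not from the original post.

```scala
import java.time.LocalDate

import org.apache.spark.{SparkConf, SparkContext}

object ConsecutiveSalesRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ConsecutiveSalesRDD").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val linesRDD = sc.textFile("data_sql/order.csv")
    val header = linesRDD.first()

    val result = linesRDD.filter(_ != header)
      .map { line =>
        val fields = line.split(",")
        ((fields(0), fields(1)), fields(2).toDouble)   // ((sid, date), money)
      }
      .reduceByKey(_ + _)                              // daily totals per shop
      .map { case ((sid, date), dayMoney) => (sid, (date, dayMoney)) }
      .groupByKey()
      .flatMap { case (sid, it) =>
        val sorted = it.toList.sortBy(_._1)            // sort days within a shop
        // Same trick as the SQL version: date minus row index is constant
        // within a run of consecutive days.
        sorted.zipWithIndex
          .groupBy { case ((date, _), idx) => LocalDate.parse(date).minusDays(idx) }
          .values
          .filter(_.size >= 3)                         // keep runs of 3+ days
          .map { run =>
            val days = run.map(_._1)
            (sid, days.head._1, days.last._1, days.size, days.map(_._2).sum)
          }
      }

    result.foreach(println)   // e.g. (shop1,2019-02-10,2019-02-13,4,1900.0)
    sc.stop()
  }
}
```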
Tags: Spark, Spark SQL
Unless otherwise stated, all articles on this blog are the author's original work. If you repost, please credit the source: https://www.lilinchao.com/archives/1623.html