李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
SparkSQL案例实操(五)
Leefs
2021-11-14 PM
1592℃
0条
[TOC] ### 一、需求 > 统计用户上网流量 > > + 统计用户上网流量,如果两次上网的时间小于10min,就可以rollup(合并)到一起 ### 二、数据准备 + merge.dat文件 ``` id start_time end_time flow 1 14:20:30 14:46:30 20 1 14:47:20 15:20:30 30 1 15:37:23 16:05:26 40 1 16:06:27 17:20:49 50 1 17:21:50 18:03:27 60 2 14:18:24 15:01:40 20 2 15:20:49 15:30:24 30 2 16:01:23 16:40:32 40 2 16:44:56 17:40:52 50 3 14:39:58 15:35:53 20 3 15:36:39 15:24:54 30 ``` + id:用户ID + start_time:上网开始时间 + end_time:上网结束时间 + flow:流量 ### 三、功能实现 #### 3.1 步骤 ``` 1.利用lag函数,把start_time和end_time的数据压到下一行 2.计算差值 3.打标记,如果小于10min为0或者是为空 否则为1 4.以uid为分区,再次开窗,用sum计算和 5.以uid和flags分组,聚合得到结果 ``` #### 3.2 代码 ```scala import org.apache.spark.sql.{DataFrame, SparkSession} /** * @author lilinchao * @date 2021/11/14 * @description 1.0 **/ object SparkSQLMerge { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate() import spark.implicits._ val df: DataFrame = spark.read .option("header", "true") .option("inferSchema", "true") .option("sep"," ")//指定分隔符进行切分 .csv("data_sql/merge.dat") df.createOrReplaceTempView("flow_merge") //1.利用lag函数,把start_time和end_time的数据压到下一行 spark.sql( """ | select | id, | start_time, | end_time, | lag(start_time,1) over(PARTITION BY id ORDER BY start_time) lag_start_time, -- 把这行之前的数据拿到这一行 | lag(end_time) over(PARTITION BY id ORDER BY start_time) lag_end_time, -- -- 把这行之前的数据拿到这一行 | flow | from flow_merge | ORDER BY id,start_time ASC """.stripMargin ).createOrReplaceTempView("t1") //2.计算差值 spark.sql( """ | select | id, | start_time, | end_time, | lag_start_time, | lag_end_time, | (unix_timestamp(start_time,'HH:mm:ss') - unix_timestamp(lag_end_time,'HH:mm:ss') ) as seconds, | flow | from t1 """.stripMargin).createOrReplaceTempView("t2") //3.打标记,如果小于10min为0或者是为空 否则为1 spark.sql( """ | select | id, | start_time, | end_time, | case when seconds is null then 0 | when seconds <= 10 * 60 then 0 | else 1 | end flag, | flow | from t2 """.stripMargin).createOrReplaceTempView("t3") //4.以uid为分区,再次开窗,用sum计算和 spark.sql( """ | select | id, | start_time, | end_time, | sum(flag) over(partition by id order by start_time) as flags, | flow | from t3 """.stripMargin).createOrReplaceTempView("t4") //5.以uid和flags分组,聚合得到结果 spark.sql( """ | select | id, | min(start_time) start_time, | max(end_time) end_time, | sum(flow) flow | from t4 | group by id,flags | order by id,start_time """.stripMargin).show() } } ``` **运行结果** ``` +---+----------+--------+----+ | id|start_time|end_time|flow| +---+----------+--------+----+ | 1| 14:20:30|15:20:30| 50| | 1| 15:37:23|18:03:27| 150| | 2| 14:18:24|15:01:40| 20| | 2| 15:20:49|15:30:24| 30| | 2| 16:01:23|17:40:52| 90| | 3| 14:39:58|15:35:53| 50| +---+----------+--------+----+ ```
标签:
Spark
,
Spark SQL
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1624.html
上一篇
SparkSQL案例实操(四)
下一篇
CentOS7.X安装MySQL8.0教程
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
6
标签云
Spark Streaming
Nacos
锁
nginx
Beego
Scala
SpringCloud
Http
递归
JavaWEB项目搭建
Jenkins
散列
Git
算法
Thymeleaf
Ubuntu
前端
序列化和反序列化
高并发
并发线程
JavaSE
持有对象
Hive
MyBatisX
Kafka
国产数据库改造
Flink
Java
线程池
Spring
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭