李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
Spark Core案例实操(九)
Leefs
2021-11-05 AM
1031℃
0条
### 一、需求 > 分析CDN日志统计出访问PV、UV、IP地址: > > + 计算独立ip数 > + 统计每个视频独立ip数 > + 统计一天中每个小时的流量(统计每天24小时中每个小时的流量) **说明** **PV(page view):** 页面浏览量,页面点击率;通常衡量一个网站或者新闻频道一条新闻的指标; **UV(unique visitor ):** 指访问某个站点或者点击某条新闻的不同的ip的人数 ### 二、数据说明 ``` 100.79.121.48 HIT 33 [15/Feb/2017:00:00:46 +0800] "GET http://cdn.v.abc.com.cn/videojs/video.js HTTP/1.1" 200 174055 "http://www.abc.com.cn/" "Mozilla/4.0+(compatible;+MSIE+6.0;+Windows+NT+5.1;+Trident/4.0;)" ``` *注:通过空格进行分隔* + 第一列:用户IP + 第二列:命中率(hit/miss) + 第三列:响应时间 + 第四列:请求时间 + 第五列:请求方法 + 第六列:请求URL + 第七列:请求协议 + 第八列:状态码 + 第九列:相应数据流量 + 第十列:refer(从哪个url跳转到当前url) + 第十一列:useragent ### 三、实现 #### 3.1 步骤 ``` 1. 计算独立ip数 - 从每行日志中筛选出ip - 对每个ip计数 - 去重ip,获取到独立的ip 2. 统计每个视频独立ip数 - 筛选视频文件,拆分(文件名,ip地址) - 按照文件名称分组:(文件名,[ip,ip,ip]) - 将每个文件名ip地址去重 3. 统计一天中每个小时的流量(统计每天24小时中每个小时的流量) - 将日志中的访问时间以及请求大小两个数据提取出来(访问时间:访问大小),去除非法的访问(404) - 按照访问时间分组:(访问时间,[流量,流量,。。。]) - 将访问时间的对应的流量叠加 ``` #### 3.2 代码实现 ```scala package com.llc.spark.core_demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import scala.util.matching.Regex /** * @author lilinchao * @date 2021/11/4 * @description 分析CDN日志统计出访问PV、UV、IP地址 **/ object CDNLogAnaluze { // 匹配ip规则: val IPPattern = "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))".r // 匹配.mp4的视频规则 val VideoPattern = "([0-9]+).mp4".r // 匹配http响应码和请求大小 val httpSizePattern = ".*\\s(200|206|304)\\s([0-9]+)\\s.*".r // [15/Feb/2017:00:00:46 +0800] 匹配2017:00:00:46 val timePattern = ".*(2017):([0-9]{2}):[0-9]{2}:[0-9]{2}.*".r def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("CDNLogAnaluze").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) //加载数据 val input:RDD[String] = sc.textFile("datas/cdn.txt") //1. 独立IP数统计 // ipStatic(input) //2. 每个视频对应的IP数 // videoIpStatic(input) //3. 统计一天中每个小时的流量 flowOfHour(input) } /** * 统计独立IP数 */ def ipStatic(data: RDD[String]):Unit = { //1.统计独立IP val ipNums = data.map(line => (IPPattern.findFirstIn(line).get,1)) .reduceByKey(_+_) .sortBy(_._2,false) //2.获取到Top10访问次数 ipNums.take(10).foreach(println) //3.获取独立IP数 println("独立ip数:" + ipNums.count()) } /** * 统计视频对应的ip数 */ def videoIpStatic(data: RDD[String]):Unit = { //1. 获取到访问视频的行 def getFileNameAndIp(line:String):(String,String) = (VideoPattern.findFirstIn(line).mkString, IPPattern.findFirstIn(line).mkString) //2. 统计每个视频的独立ip数 val dataRDD = data .filter(_.matches(".*([0-9]+)\\.mp4.*")) .map(getFileNameAndIp) .groupByKey .map(file_ip => (file_ip._1, file_ip._2.toList.distinct)) .sortBy(_._2.size, false) //3. 输出结果 dataRDD.foreach(t => println("视频:" + t._1 + ", 独立ip数:" + t._2.size)) } /** * 统计一天中每个小时的流量 */ def flowOfHour(data: RDD[String]):Unit ={ /** * 使用str匹配pattern */ /** * 使用str匹配pattern */ def isMatch(pattern:Regex, str:String): Boolean = { str match { case pattern(_*) => true case _ => false } } /** * 获取日志中小时和http请求的大小的元组 */ def getTimeAndSize(line: String) = { var res = ("", 0L) try { val httpSizePattern(code, size) = line val timePattern(year, hour) = line res = (hour, size.toLong) }catch { case e : Exception => e.printStackTrace() } res } /** * 统计一天中每个小时的流量 */ val resultRDD = data.filter(isMatch(httpSizePattern, _)) .filter(isMatch(timePattern, _)) .map(getTimeAndSize(_)) .groupByKey() .map(hour_size => (hour_size._1, hour_size._2.sum)) .sortByKey(false, 1) resultRDD.foreach(hour_size => println(hour_size._1 + "时 CDN流量 = " + hour_size._2 / (1024 * 1024 * 1024) + "GB")) } } ``` **运行结果** ``` 23时 CDN流量 = 25GB 22时 CDN流量 = 42GB 21时 CDN流量 = 53GB 20时 CDN流量 = 55GB 19时 CDN流量 = 51GB 18时 CDN流量 = 45GB 17时 CDN流量 = 44GB 16时 CDN流量 = 45GB 15时 CDN流量 = 45GB 14时 CDN流量 = 55GB 13时 CDN流量 = 51GB 12时 CDN流量 = 46GB 11时 CDN流量 = 45GB 10时 CDN流量 = 61GB 09时 CDN流量 = 52GB 08时 CDN流量 = 43GB 07时 CDN流量 = 22GB 06时 CDN流量 = 11GB 05时 CDN流量 = 4GB 04时 CDN流量 = 3GB 03时 CDN流量 = 3GB 02时 CDN流量 = 5GB 01时 CDN流量 = 3GB 00时 CDN流量 = 14GB ``` ### 结尾 因为本篇使用的示例数据`cdn.txt`文件由于数据量较大将不在下方贴出。 直接在微信公众号【Java和大数据进阶】回复:**sparkdata**,即可获取。
标签:
Spark
,
Spark Core
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1611.html
上一篇
Spark Core案例实操(八)
下一篇
Spark Core案例实操(十)
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Shiro
前端
Nacos
Ubuntu
算法
队列
Docker
Eclipse
递归
FastDFS
Java
SQL练习题
Flink
ClickHouse
Quartz
HDFS
Hbase
稀疏数组
GET和POST
机器学习
锁
Http
Java工具类
Spring
正则表达式
随笔
工具
Zookeeper
数学
VUE
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞