Spark Streaming 项目实战 (4) | 得到最近1小时广告点击量实时统计并写入到redis
程序员文章站
2022-07-07 12:51:29
大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—不温不火,本意是希望自己性情温和。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/ 本片博文为大家带来的是Spark Streaming 项目实战 ....
大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—
不温不火
,本意是希望自己性情温和
。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有csdn这一个平台,博客主页:https://buwenbuhuo.blog.csdn.net/
本片博文为大家带来的是Spark Streaming 项目实战 (4) | 得到最近1小时广告点击量实时统计并写入到redis。
统计各广告最近 1 小时内的点击量趋势:各广告最近 1 小时内各分钟的点击量
此部分最终想要得到的结果如下:
一. 得到最近1小时广告点击量实时统计
-
1. 新建类
LastHourApp
package com.buwenbuhuo.streaming.project.app
import com.buwenbuhuo.streaming.project.bean.AdsInfo
import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.dstream.DStream
/*
* @author 不温卜火
* @create 2020-08-17 11:19
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*/
object LastHourApp extends App {
override def doSomething(adsInfoStream: DStream[AdsInfo]): Unit = {
adsInfoStream
// 1. 先把窗口分好
.window(Minutes(60),Seconds(3))
// 2. 按照广告分钟 进行聚合
.map(info => (info.adsId,info.hmString) -> 1)
.reduceByKey(_+_)
// 3. 再按照广告分组,把这个广告下所有的分钟记录放在一起
.map{
case ((ads,hm),count) => (ads,(hm,count))
}
.groupByKey()
.print(10000)
}
}
/*
统计各广告最近1小时内的点击量趋势:各广告最近1小时内各分钟的点击量,每6秒统计一次
1. 各广告,每分钟 -> 按照(广告,分钟) 分组
2. 最近1小时,每6秒统计一次 -> 窗口: 窗口长度1小时 窗口的滑动步长 5s
----
1. 先把窗口分好
2. 按照广告分钟 进行聚合
3. 再按照广告分组,把这个广告下所有的分钟记录放在一起
4. 把结果写在redis中
*/
- 2. 运行结果
二. 写入到redis中
- 1. 源码(添加)
// 4. 把结果写在redis中
.foreachRDD(rdd => {
rdd.foreachPartition(it=>{
if (it.nonEmpty){ // 只是判断是否有下一个元素,指针不会跳过这个元素
// 1. 先建立到redis连接
val client: Jedis = RedisUtil.getClient
// 2. 写元素到redis
// 2.1 一个一个的写(昨天)
// 2.2 批次写入
import org.json4s.JsonDSL._
val key: String = "last:ads:hour:count"
val map: Map[String, String] = it.toMap.map {
case (adsId, it) => (adsId, JsonMethods.compact(JsonMethods.render(it)))
}
// scala集合转换成java集合
import scala.collection.JavaConversions._
println(map)
client.hmset(key,map)
// 3. 关闭redis(用的是连接池,实际是把连接归还给连接池)
client.close()
}
})
- 2. 运行结果
- 3. 在redis中查看
三. 完整代码
package com.buwenbuhuo.streaming.project.app
import com.buwenbuhuo.streaming.project.bean.AdsInfo
import com.buwenbuhuo.streaming.project.util.RedisUtil
import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.dstream.DStream
import org.json4s.jackson.JsonMethods
import redis.clients.jedis.Jedis
/*
* @author 不温卜火
* @create 2020-08-17 11:19
* MyCSDN : https://buwenbuhuo.blog.csdn.net/
*/
object LastHourApp extends App {
override def doSomething(adsInfoStream: DStream[AdsInfo]): Unit = {
adsInfoStream
// 1. 先把窗口分好
.window(Minutes(60),Seconds(3))
// 2. 按照广告分钟 进行聚合
.map(info => (info.adsId,info.hmString) -> 1)
.reduceByKey(_+_)
// 3. 再按照广告分组,把这个广告下所有的分钟记录放在一起
.map{
case ((ads,hm),count) => (ads,(hm,count))
}
.groupByKey()
// 4. 把结果写在redis中
.foreachRDD(rdd => {
rdd.foreachPartition(it=>{
if (it.nonEmpty){ // 只是判断是否有下一个元素,指针不会跳过这个元素
// 1. 先建立到redis连接
val client: Jedis = RedisUtil.getClient
// 2. 写元素到redis
// 2.1 一个一个的写(昨天)
// 2.2 批次写入
import org.json4s.JsonDSL._
val key: String = "last:ads:hour:count"
val map: Map[String, String] = it.toMap.map {
case (adsId, it) => (adsId, JsonMethods.compact(JsonMethods.render(it)))
}
// scala集合转换成java集合
import scala.collection.JavaConversions._
println(map)
client.hmset(key,map)
// 3. 关闭redis(用的是连接池,实际是把连接归还给连接池)
client.close()
}
})
})
}
}
/*
统计各广告最近1小时内的点击量趋势:各广告最近1小时内各分钟的点击量,每6秒统计一次
1. 各广告,每分钟 -> 按照(广告,分钟) 分组
2. 最近1小时,每6秒统计一次 -> 窗口: 窗口长度1小时 窗口的滑动步长 5s
----
1. 先把窗口分好
2. 按照广告分钟 进行聚合
3. 再按照广告分组,把这个广告下所有的分钟记录放在一起
4. 把结果写在redis中
----
写到redis的时候的数据的类型
1.
key value
广告id json字符串每分钟的点击量
2.
key value
"last:ads:hour:count" hash
field value
adsId json字符串
"1" {"09:24":100,"09:25":110,...}
*/
本次的分享就到这里了,
好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
如果我的博客对你有帮助、如果你喜欢我的博客内容,请“点赞” “评论”“收藏”
一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注
我哦!
本文地址:https://blog.csdn.net/qq_16146103/article/details/108051997
推荐阅读
-
输入框的字数时时统计—关于 onpropertychange 和 oninput 使用
-
C#实现xml文件反序列化读入数据到object的方法
-
vue项目在安卓低版本机显示空白的原因分析(两种)
-
报表统计怎么做(月报表的统计制作方法)
-
idea如何设置jdk版本(idea项目配置jdk方法)
-
selenium python虚拟点击网页 爬虫翻页功能 href=javascript:void(0)怎么翻页
-
ASP.NET MVC4中使用Html.DropDownListFor的方法示例
-
【最新漏洞】IE中使用Rds.DataSpace下载并运行病毒文件
-
java web教程pdf(javaweb完整项目源码)
-
head first java怎么样(java从入门到精通)