基于Flink实时项目:用户行为分析(一:实时热门商品统计)
程序员文章站
2022-05-14 21:26:47
...
1.需求:实时热门商品的统计
每隔5分钟输出一次最近一小时内点击量最多的前N个商品
步骤:
• 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
• 过滤出点击行为数据
• 按一小时的窗口大小,每 5 分钟统计一次,做滑动窗口聚合(Sliding Window)
• 按每个窗口聚合,输出每个窗口中点击量前 N 名的商品
【注】此过程还可以进一步优化:目前数据是从文件中读取的,可以考虑改为从 Kafka 中读取,之后我会再来更新。
2.代码实现:
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.sql.Timestamp
import scala.collection.mutable.ListBuffer
/**
 * Input record of the job: one parsed line of the user-behavior CSV file.
 *
 * Field names are part of the job's contract: the pipeline groups by the
 * field name "itemID", so they must not be renamed.
 *
 * @param UserId     first CSV column (user id, judging by the name)
 * @param itemID     second CSV column; used as the grouping key of the window aggregation
 * @param categoryId third CSV column (item category id, judging by the name)
 * @param behavior   fourth CSV column; the job keeps only records where this equals "pv"
 * @param timestamp  fifth CSV column; the job multiplies it by 1000 to obtain the
 *                   event time, so it is presumably in seconds
 */
case class UserBehavior(
  UserId: Long,
  itemID: Long,
  categoryId: Int,
  behavior: String,
  timestamp: Long
)
/**
 * Click count of one item in one window; this is the output type of the
 * window aggregation.
 *
 * The field name "windowEnd" is used by the pipeline as a grouping key, so
 * the field names must not be renamed.
 *
 * @param itemId    id of the item that was counted
 * @param windowEnd end timestamp of the window the count belongs to
 * @param count     number of records counted for the item in that window
 */
case class ItemViewCount(
  itemId: Long,
  windowEnd: Long,
  count: Long
)
/**
 * Entry point of the "hot items" job: every 5 minutes it emits the items with
 * the most "pv" (click) records seen during the last hour.
 *
 * Usage: `TopNItems [inputPath]`. When no argument is given the job falls
 * back to the original hard-coded CSV location.
 */
object TopNItems {

  /** Input file used when no path is passed on the command line. */
  private val DefaultInputPath =
    "E:\\WY\\programme\\MusicProject\\src\\main\\resources\\UserBehavior.csv"

  /**
   * Builds and runs the streaming pipeline.
   *
   * @param args optional; `args(0)` is the path of the user-behavior CSV file
   */
  def main(args: Array[String]): Unit = {
    // Take the input path from the command line so the job is runnable on
    // machines other than the one the default path was written for.
    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Event time is the default time characteristic since Flink 1.12, so it is
    // no longer set explicitly:
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream: DataStream[UserBehavior] = env
      .readTextFile(inputPath)
      .map(line => {
        // Trim every field so stray whitespace around a comma does not break
        // the numeric conversions or the "pv" comparison below.
        val arr = line.split(",").map(_.trim)
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
      })
      // Assign timestamps and watermarks; the record timestamp is multiplied
      // by 1000 to get the milliseconds Flink expects.
      .assignAscendingTimestamps(_.timestamp * 1000)

    val aggStream: DataStream[ItemViewCount] = dataStream
      .filter(_.behavior == "pv") // keep click events only
      .keyBy("itemID") // group by item; the key is a Tuple, which WindowResultFunction relies on
      .timeWindow(Time.minutes(60), Time.minutes(5)) // 1-hour window sliding every 5 minutes
      // Incremental aggregation: CountAgg adds one per record so only a running
      // count is kept in state; WindowResultFunction attaches item id and window end.
      .aggregate(new CountAgg(), new WindowResultFunction())

    val resultStream = aggStream
      .keyBy("windowEnd") // regroup by window end so each window is ranked on its own
      .process(new Top(3)) // custom Top-N function

    resultStream.print()
    env.execute("TopNItems")
  }
}
/**
 * Counting aggregate: the accumulator is a running count that grows by one
 * for every record added, regardless of the record's content.
 */
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {

  /** A new count starts at zero. */
  override def createAccumulator(): Long = {
    0L
  }

  /** Adds one to the count; `value` itself is not inspected. */
  override def add(value: UserBehavior, accumulator: Long): Long = {
    val incremented = accumulator + 1
    incremented
  }

  /** The result is the count as accumulated so far. */
  override def getResult(accumulator: Long): Long = {
    accumulator
  }

  /** Two partial counts combine by addition. */
  override def merge(a: Long, b: Long): Long = {
    val combined = a + b
    combined
  }
}
/**
 * Window output step: wraps the count produced for a window together with the
 * item id taken from the key and the window's end timestamp.
 */
class WindowResultFunction() extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {

  /**
   * @param key    grouping key; cast to a one-field tuple holding the item id
   * @param window window the count belongs to
   * @param input  aggregated values; only the first element is read
   * @param out    collector that receives the resulting ItemViewCount
   */
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemKey = key.asInstanceOf[Tuple1[Long]]
    val clicks = input.iterator.next()
    out.collect(ItemViewCount(itemId = itemKey.f0, windowEnd = window.getEnd, count = clicks))
  }
}
/**
 * Ranks the items of one window by click count and emits the first `topSize`
 * of them as a formatted string. The stream is keyed by window end.
 *
 * @param topSize number of items to keep in the ranking
 */
class Top(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {

  // Buffers every ItemViewCount received for the current key until the timer fires.
  private var itemState: ListState[ItemViewCount] = _

  /** Creates the list state handle under the name "itemState-state". */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    itemState = getRuntimeContext.getListState(
      new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])
    )
  }

  /**
   * Stores the incoming count and registers an event-time timer at
   * `windowEnd + 1`; the timer fires once the watermark reaches that time.
   */
  override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
    itemState.add(input)
    val fireAt = input.windowEnd + 1
    context.timerService().registerEventTimeTimer(fireAt)
  }

  /**
   * Drains the buffered counts, sorts them by count in descending order,
   * keeps the first `topSize` and emits the formatted ranking.
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    val buffered = drainState()

    // Highest count first; the sort is stable, so equal counts keep arrival order.
    val ranking = buffered.sortWith((left, right) => left.count > right.count).take(topSize)

    val text = new StringBuilder
    text.append("==========================================\n")
    // The timer was registered at windowEnd + 1, so subtracting 1 gives the window end.
    text.append("窗口结束时间:").append(new Timestamp(timestamp - 1)).append("\n")
    ranking.zipWithIndex.foreach { case (entry, position) =>
      text.append("No").append(position + 1).append(":")
        .append("商品ID=").append(entry.itemId)
        .append("浏览量=").append(entry.count).append("\n")
    }
    text.append("==========================================\n\n")

    // Deliberate one-second pause that paces the printed output; note that it
    // blocks this function while sleeping.
    Thread.sleep(1000)
    out.collect(text.toString())
  }

  /** Copies everything currently in the list state into a buffer, then clears the state. */
  private def drainState(): ListBuffer[ItemViewCount] = {
    val collected: ListBuffer[ItemViewCount] = ListBuffer()
    val cursor = itemState.get().iterator()
    while (cursor.hasNext) {
      collected += cursor.next()
    }
    itemState.clear()
    collected
  }
}
3.计算结果展示:
每隔5分钟会统计最近一小时之内的数据:
下一篇: alarm一小时一次