基于用户行为日志的实时热门商品统计：统计最近1小时内的热门商品，每5分钟更新一次，热门度以浏览次数（“pv”）衡量。
//Producer: replays a user-behavior CSV file into Kafka, one record per line.
object KafkaProducerTask {
  /** Entry point: replays `input/UserBehavior.csv` into the "hotitem" topic. */
  def main(args: Array[String]): Unit = {
    writeToKafka("hotitem")
  }

  /** Sends every line of a text file to a Kafka topic as an un-keyed String record.
    *
    * @param topic            Kafka topic to write to
    * @param filePath         file whose lines are sent (defaults to the previously hard-coded path)
    * @param bootstrapServers Kafka bootstrap servers (defaults to the previously hard-coded broker)
    */
  def writeToKafka(topic: String,
                   filePath: String = "input/UserBehavior.csv",
                   bootstrapServers: String = "spark1:9092"): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // "auto.offset.reset" is intentionally not set: it is a consumer setting, not a producer one.

    val producer = new KafkaProducer[String, String](props)
    try {
      val source = io.Source.fromFile(filePath, "UTF-8")
      try {
        // Send line by line; each line becomes the value of one ProducerRecord.
        // NOTE(review): the Futures returned by send() are not checked, so send failures go unnoticed.
        source.getLines().foreach { line =>
          producer.send(new ProducerRecord[String, String](topic, line))
        }
      } finally {
        source.close()
      }
    } finally {
      // Always release the producer, even if reading the file fails part-way.
      producer.close()
    }
  }
}
========================================================
//Consumer side: Flink job computing the hot items.
/** Reads user-behavior CSV lines from the Kafka topic "hotitem" and, every 5 minutes, prints the
  * 3 items with the most "pv" events in the preceding hour.
  */
object HotLog {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Event time: windows are driven by the timestamps carried in the records.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "spark1:9092")
    properties.setProperty("group.id", "hotitem")

    val ds: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer[String]("hotitem", new SimpleStringSchema(), properties))

    ds
      // Parse each line. Lines that cannot be parsed are dropped, because a single malformed
      // record would otherwise throw here and fail the whole job.
      .flatMap(line => parseUserBehavior(line).toList)
      // Timestamps are assumed to be ascending (no out-of-order data). They are scaled by 1000,
      // i.e. the CSV timestamp is treated as seconds and converted to milliseconds.
      .assignAscendingTimestamps(_.timeStamp * 1000)
      // Only page views count towards popularity.
      .filter(_.behavior == "pv")
      .keyBy(_.itemid)
      // Sliding window: 1 hour long, sliding every 5 minutes.
      .timeWindow(Time.hours(1), Time.minutes(5))
      // Incremental count per item and window, then wrapped as ItemViewCount by WindowResult.
      .aggregate(new CountAgg(), new WindowResult())
      // Group all item counts belonging to the same window.
      .keyBy(_.windowEnd)
      // Rank the items of each window and keep the top 3.
      .process(new TopNHot(3))
      .print()

    env.execute("HotItems")
  }

  /** Parses one CSV line into a UserBehavior.
    *
    * The first five comma-separated fields are converted to (Long, Long, Int, String, Long), as the
    * UserBehavior constructor expects. Any further fields are ignored.
    *
    * @return None when the line has fewer than five fields or a numeric field does not parse
    */
  private def parseUserBehavior(line: String): Option[UserBehavior] = {
    val fields = line.split(",")
    if (fields.length < 5) None
    else scala.util.Try(
      UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toInt, fields(3), fields(4).toLong)
    ).toOption
  }
}
//Accumulator over UserBehavior records: counts how many records fall into a window.
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
  /** A fresh window starts from a count of zero. */
  override def createAccumulator(): Long = 0L

  /** The record's contents are ignored; each arrival increments the count by one. */
  override def add(in: UserBehavior, acc: Long): Long = 1L + acc

  /** The accumulated count is returned unchanged. */
  override def getResult(acc: Long): Long = {
    val total = acc
    total
  }

  /** Two partial counts combine by addition. */
  override def merge(acc: Long, acc1: Long): Long = acc1 + acc
}
//Window function: wraps the pre-aggregated count into an ItemViewCount(itemId, windowEnd, count).
//Type params: input = the aggregated Long count, output = ItemViewCount, key = itemid, window = TimeWindow.
class WindowResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  /** Emits one ItemViewCount for the given item key and window.
    *
    * Only the first element of `input` is used. When this function is paired with an incremental
    * aggregate (as in `aggregate(new CountAgg(), new WindowResult())`), that element is the
    * aggregated count.
    */
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit =
    out.collect(new ItemViewCount(key, window.getEnd, input.head))
}
/** Ranks the per-item view counts of one window and emits a text report of the top `n` items.
  *
  * Keyed by window end. Every incoming ItemViewCount is buffered in list state. An event-time timer
  * shortly after the window end then sorts the buffered counts and emits one report string.
  *
  * @param n how many items each report lists (fewer if the window saw fewer items)
  */
class TopNHot(n: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {
  /** Delay (ms) after windowEnd at which the ranking timer fires; subtracted again for display. */
  private val timerOffsetMs: Long = 100L

  /** Buffered ItemViewCounts of the current key (window end); initialised in `open`. */
  var itemState: ListState[ItemViewCount] = _

  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount]))
  }

  /** Buffers one item count and registers the ranking timer for its window. */
  override def processElement(i: ItemViewCount, context: KeyedProcessFunction[Long, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
    itemState.add(i)
    // When the watermark passes windowEnd + offset, all counts of this window have arrived.
    // Re-registering the same timestamp for the same key does not create a duplicate timer.
    context.timerService().registerEventTimeTimer(i.windowEnd + timerOffsetMs)
  }

  /** Sorts the buffered counts by descending view count and emits the top `n` as one string. */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    import scala.collection.JavaConverters._

    // Copy out of state, then clear it: this window's counts are no longer needed.
    val allItems: List[ItemViewCount] =
      Option(itemState.get()).map(_.asScala.toList).getOrElse(Nil)
    itemState.clear()

    val topItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(n)

    val result = new StringBuilder()
    // The separator is part of the emitted report rather than a separate stdout print.
    result.append("-------------")
    // Undo the timer offset so the report shows the window end itself.
    result.append("时间: ").append(new Timestamp(timestamp - timerOffsetMs)).append("\n")
    topItems.zipWithIndex.foreach { case (item, idx) =>
      result.append("No").append(idx + 1).append(":")
        .append("商品ID=").append(item.itemid)
        .append(" 浏览量=").append(item.count)
        .append("\n")
    }
    result.append("``````````````````")
    // No Thread.sleep here: sleeping in onTimer would block this operator's processing thread.
    out.collect(result.toString())
  }
}
上一篇: mysql获取一个小时内的数据