Real-Time Recommendation System: Flink Data Preprocessing (Part 1)
Preface
I just put together a demo, so let's go straight to the code.

1. Flink Data Preprocessing
Taking the data in Kafka, compute each user's TopN clicked items per specified window. A throwaway source is used here for testing (a sketch of such a source follows the listing below).
This is essentially the traditional TopN computation, with an extra keyBy() on the user ID added.
The code is as follows (example):
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object PersonasV2 {

  case class UserDisplayBehavior(userId: Long, behavior: String, channelSet: String, itemId: Long, categoryId: Int, timestamp: Long)

  case class ItemUserClickCount(key: (Long, Long), windowEnd: Long, count: Long) // key = (userId, itemId)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read topSize from the command line, falling back to 3 when absent or malformed
    var topSize = 0
    try {
      val tool = ParameterTool.fromArgs(args)
      topSize = tool.get("topSize").toInt
      println("topSize: " + topSize)
    } catch {
      case e: Exception =>
        e.printStackTrace()
        topSize = 3
        println("Parameter not set, using default topSize=3")
    }

    val kafkaDstream: DataStream[UserDisplayBehavior] = env.addSource(new UserDisplayEventSource)
    kafkaDstream
      .assignAscendingTimestamps(_.timestamp)
      .filter(_.behavior == "CLICK")
      .keyBy(data => (data.userId, data.itemId))
      .timeWindow(Time.minutes(10), Time.minutes(1))
      .aggregate(new CountAgg(), new WindowResultFunction())
      // Key by (userId, windowEnd). Two chained keyBy() calls would keep only the last
      // key (userId), letting counts from different sliding windows mix in the process
      // function's state, so a single composite key is used instead.
      .keyBy(data => (data.key._1, data.windowEnd))
      .process(new TopNHotItems(topSize))
      .addSink(new PersonasV2RedisSink)

    env.execute("User Click Hot Items Job")
  }
  // COUNT aggregate function: adds one for every record seen
  class CountAgg extends AggregateFunction[UserDisplayBehavior, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(userBehavior: UserDisplayBehavior, acc: Long): Long = acc + 1
    override def getResult(acc: Long): Long = acc
    override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  }
  // Emits the result of each window
  class WindowResultFunction extends WindowFunction[Long, ItemUserClickCount, (Long, Long), TimeWindow] {
    override def apply(key: (Long, Long), window: TimeWindow, aggregateResult: Iterable[Long],
                       collector: Collector[ItemUserClickCount]): Unit = {
      val count = aggregateResult.iterator.next
      collector.collect(ItemUserClickCount(key, window.getEnd, count))
    }
  }
  // For each (userId, windowEnd) key, emit that user's topSize hottest clicked items in the window
  class TopNHotItems(topSize: Int) extends KeyedProcessFunction[(Long, Long), ItemUserClickCount, List[ItemUserClickCount]] {

    private var itemState: ListState[ItemUserClickCount] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      // Name and type of the state variable
      val itemsStateDesc = new ListStateDescriptor[ItemUserClickCount]("itemState-state", classOf[ItemUserClickCount])
      // Fetch the state handle from the runtime context
      itemState = getRuntimeContext.getListState(itemsStateDesc)
    }

    override def processElement(input: ItemUserClickCount,
                                context: KeyedProcessFunction[(Long, Long), ItemUserClickCount, List[ItemUserClickCount]]#Context,
                                collector: Collector[List[ItemUserClickCount]]): Unit = {
      // Buffer every record in state
      itemState.add(input)
      // Register an event-time timer for windowEnd + 1; when it fires, all item data
      // belonging to the windowEnd window has arrived. In other words, once the program
      // sees a watermark past windowEnd + 1, the onTimer callback is invoked.
      context.timerService.registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[(Long, Long), ItemUserClickCount, List[ItemUserClickCount]]#OnTimerContext,
                         out: Collector[List[ItemUserClickCount]]): Unit = {
      // Collect the click counts of all items received for this key
      val allItems: ListBuffer[ItemUserClickCount] = ListBuffer()
      import scala.collection.JavaConverters._
      for (item <- itemState.get.asScala) {
        allItems += item
      }
      // Clear the state early to free space
      itemState.clear()
      // Sort by click count in descending order and keep the top topSize
      val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
      out.collect(sortedItems.toList)
    }
  }
}
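The post never shows UserDisplayEventSource, which stands in for the Kafka consumer during testing. Below is a minimal sketch of what such a test source could look like; every field value here (number of users, channel name, item range, event spacing) is made up purely for illustration:

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

import PersonasV2.UserDisplayBehavior

// Emits synthetic click events with strictly increasing timestamps, so the
// assignAscendingTimestamps call upstream stays valid
class UserDisplayEventSource extends SourceFunction[UserDisplayBehavior] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[UserDisplayBehavior]): Unit = {
    val rand = new Random()
    var ts = System.currentTimeMillis()
    while (running) {
      ctx.collect(UserDisplayBehavior(
        userId = rand.nextInt(5) + 1,      // a handful of test users (assumed)
        behavior = "CLICK",                // only CLICK survives the filter anyway
        channelSet = "APP",                // made-up channel value
        itemId = rand.nextInt(20) + 1,     // a small item catalogue (assumed)
        categoryId = rand.nextInt(4) + 1,
        timestamp = ts
      ))
      ts += 100                            // keep event time strictly ascending
      Thread.sleep(10)
    }
  }

  override def cancel(): Unit = running = false
}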
2. Writing to the Redis Cache
Write each user's item-click TopN into Redis.
Each record is stored under the UserId as the key, in ZSet format, so the items carry a rank.
For example:

key        value    score
userId_1   itemId1  0
           itemId2  1
           itemId3  2
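Since score 0 holds the hottest item, a downstream service can read a user's TopN back in rank order with a plain ZRANGE. A quick sketch using Jedis; the host, port, and password mirror the ones in the sink below, and userId_1 is just an example key:

import redis.clients.jedis.Jedis

import scala.collection.JavaConverters._

object TopNReader {
  def main(args: Array[String]): Unit = {
    // Same Redis instance the sink below writes to
    val jedis = new Jedis("192.168.67.21", 6389)
    jedis.auth("123456")
    // Ascending by score: score 0 is the hottest item, so this yields the TopN in rank order
    val topItems = jedis.zrangeWithScores("userId_1", 0, -1).asScala
    for (t <- topItems) {
      println(s"rank=${t.getScore.toLong} itemId=${t.getElement}")
    }
    jedis.close()
  }
}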
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import redis.clients.jedis.{Jedis, JedisPool}

import PersonasV2.ItemUserClickCount

class PersonasV2RedisSink extends RichSinkFunction[List[ItemUserClickCount]] {

  var jedisPool: JedisPool = _
  var jedis: Jedis = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    jedisPool = new JedisPool(new GenericObjectPoolConfig, "192.168.67.21", 6389, 1000, "123456")
    jedis = jedisPool.getResource
  }

  override def invoke(value: List[ItemUserClickCount], context: SinkFunction.Context[_]): Unit = {
    if (value != null && value.nonEmpty) {
      // Drop the previous ranking: remove all members of the user's sorted set whose
      // score lies in the range [0, 10] (this assumes topSize stays within 10)
      val k = "userId_" + value.head.key._1.toString
      println("clear key: " + k + " 0-10")
      jedis.zremrangeByScore(k, 0, 10)
      // Insert the new ranking: (key, rank as score, itemId as member)
      for (i <- value.indices) {
        val v = value(i).key._2.toString
        println("sink to redis: " + (k, i, value(i).count, v))
        jedis.zadd(k, i, v)
      }
    }
  }

  override def close(): Unit = {
    jedis.close()
    jedisPool.close()
    super.close()
  }
}
Summary
Flink performs the ETL step, producing each user's in-window click TopN, which is then written to Redis. The Redis ZSet type keeps the values stored in order, ready for downstream queries.