欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

实时推荐系统之Flink数据预处理(一)

程序员文章站 2022-06-02 17:06:48
...


前言

刚写了一个demo,直接上代码


一、Flink 数据预处理

从 Kafka 读取用户行为数据,按滑动窗口(窗口长度 10 分钟、滑动步长 1 分钟)统计每个用户在该时间段内点击商品的 TopN。这里为了方便测试,临时写了一个自定义 source 来代替 Kafka。

其实就是在传统求 TopN 的基础上,先按 (用户Id, 商品Id) 统计窗口内的点击量,再以用户Id 做 keyBy(),对每个用户分别求 TopN。

代码如下(示例):

/**
 * Flink streaming job that counts clicks per (user, item) over a sliding event-time
 * window and emits, for every user and every window, the `topSize` most clicked items.
 *
 * Pipeline: source -> ascending timestamps -> keep "CLICK" events -> key by (userId, itemId)
 * -> 10 minute window sliding every minute -> count -> key by userId -> top-N -> sink.
 */
object PersonasV2 {

  /** Top-N size used when the `topSize` program argument is missing or unusable. */
  private val DefaultTopSize = 3

  /** One raw user behaviour event; `timestamp` is used as the event-time timestamp. */
  case class UserDisplayBehavior(userId: Long, behavior: String, channelSet:String, itemId: Long, categoryId:Int, timestamp: Long)

  /**
   * Number of clicks for one (userId, itemId) pair in the window that ends at `windowEnd`.
   *
   * @param key       (userId, itemId)
   * @param windowEnd end timestamp of the window the count belongs to
   * @param count     number of click events counted in that window
   */
  case class ItemUserClickCount(key: (Long,Long), windowEnd: Long, count: Long)

  /**
   * Job entry point.
   *
   * @param args program arguments; `--topSize <n>` selects how many items are kept per user
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val topSize = resolveTopSize(args)

    val kafkaDstream: DataStream[UserDisplayBehavior] = env.addSource(new UserDisplayEventSource)

    kafkaDstream
      .assignAscendingTimestamps(_.timestamp)
      .filter(_.behavior == "CLICK")
      .keyBy(data => (data.userId, data.itemId))
      .timeWindow(Time.minutes(10), Time.minutes(1))
      .aggregate(new CountAgg(), new WindowResultFunction())
      // A second keyBy replaces the previous partitioning, so only the user id key is
      // applied here; TopNHotItems separates the individual windows itself.
      .keyBy(_.key._1)
      .process(new TopNHotItems(topSize))
      .addSink(new PersonasV2RedisSink)

    env.execute("User Click Hot Items Job")
  }

  /**
   * Reads the `topSize` argument, falling back to [[DefaultTopSize]] when it is absent,
   * not a positive integer, or the arguments cannot be parsed at all.
   */
  private def resolveTopSize(args: Array[String]): Int =
    try {
      // ParameterTool.get returns null for a missing key, hence the Option wrapper.
      Option(ParameterTool.fromArgs(args).get("topSize")).map(_.toInt).filter(_ > 0) match {
        case Some(parsed) =>
          println("topSize: " + parsed)
          parsed
        case None =>
          println("未设置参数使用默认值topSize=3")
          DefaultTopSize
      }
    } catch {
      case scala.util.control.NonFatal(e) =>
        e.printStackTrace()
        println("未设置参数使用默认值topSize=3")
        DefaultTopSize
    }

  /** Counting aggregate: the accumulator grows by one for every record. */
  class CountAgg extends AggregateFunction[UserDisplayBehavior, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(userBehavior: UserDisplayBehavior, acc: Long): Long = acc + 1
    override def getResult(acc: Long): Long = acc
    override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  }

  /** Wraps the pre-aggregated count of a window into an [[ItemUserClickCount]]. */
  class WindowResultFunction extends WindowFunction[Long, ItemUserClickCount, (Long,Long), TimeWindow] {
    override def apply(key: (Long,Long), window: TimeWindow, aggregateResult: Iterable[Long],
                       collector: Collector[ItemUserClickCount]) : Unit = {
      // The incremental aggregate hands over exactly one value per key and window.
      val count = aggregateResult.iterator.next
      collector.collect(ItemUserClickCount(key, window.getEnd, count))
    }
  }

  /**
   * Emits, per user (the stream key) and per window, the `topSize` items with the highest
   * click count, ordered from most to least clicked.
   *
   * @param topSize maximum number of items in each emitted ranking
   */
  class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Long, ItemUserClickCount, List[ItemUserClickCount]] {
    private var itemState : ListState[ItemUserClickCount] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      // Name and element type of the per-user buffer of window results.
      val itemsStateDesc = new ListStateDescriptor[ItemUserClickCount]("itemState-state", classOf[ItemUserClickCount])
      itemState = getRuntimeContext.getListState(itemsStateDesc)
    }

    override def processElement(input: ItemUserClickCount, context: KeyedProcessFunction[Long, ItemUserClickCount, List[ItemUserClickCount]]#Context, collector: Collector[List[ItemUserClickCount]]): Unit = {
      // Buffer every window result for the current user.
      itemState.add(input)
      // Event-time timer at windowEnd + 1: once the watermark reaches it, every count that
      // belongs to the window ending at windowEnd has been received.
      context.timerService.registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemUserClickCount, List[ItemUserClickCount]]#OnTimerContext, out: Collector[List[ItemUserClickCount]]): Unit = {
      import scala.collection.JavaConverters._

      // Timers are registered at windowEnd + 1, so this one belongs to this window end.
      val firedWindowEnd = timestamp - 1
      val buffered = Option(itemState.get).map(_.asScala.toList).getOrElse(Nil)

      // The state is shared by all windows of the user: rank only the window whose timer
      // fired and keep the results of the other windows for their own timers.
      val (ready, pending) = buffered.partition(_.windowEnd == firedWindowEnd)
      if (pending.isEmpty) itemState.clear() else itemState.update(pending.asJava)

      // Sort by click count, highest first, and keep the first topSize entries.
      val topItems = ready.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
      if (topItems.nonEmpty) out.collect(topItems)
    }
  }
}

二、写入Redis缓存

将每个用户的商品点击TopN数据放入Redis中。
数据以 "userId_" + 用户Id 作为 key,以 ZSet 类型存储:member(value)为商品Id,score 为该商品在 TopN 中的名次(0 表示点击量最高),这样商品就会按排名有序存放。
例如:
	key          value       score
	userId_1     itemId1     0
	             itemId2     1
	             itemId3     2

/**
 * Sink that stores each user's ranking in a Redis sorted set.
 *
 * The key is `"userId_" + userId`, each member is an item id and its score is the
 * position of the item in the incoming list (0 for the first element).
 */
class PersonasV2RedisSink extends RichSinkFunction[List[ItemUserClickCount]]{

  var jedisPool :JedisPool = _
  var jedis: Jedis = _

  /** Creates the connection pool and borrows the connection used by `invoke`. */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)

    // NOTE(review): host, port and password are hard-coded; consider moving them to job
    // configuration instead of keeping credentials in source.
    jedisPool = new JedisPool(new GenericObjectPoolConfig, "192.168.67.21", 6389, 1000, "123456")
    jedis = jedisPool.getResource
  }

  /**
   * Replaces the sorted set of the user the ranking belongs to.
   *
   * @param value   ranking of one user, best item first; null or empty lists are ignored
   * @param context sink context (unused)
   */
  override def invoke(value: List[ItemUserClickCount], context: SinkFunction.Context[_]): Unit = {
    if (value != null && value.nonEmpty) {
      // All entries of one ranking share the user id, so the first one names the key.
      val k = "userId_" + value.head.key._1.toString

      // Drop the whole previous ranking. Removing only a fixed score range would leave
      // stale members behind whenever an earlier ranking was longer than that range.
      println("clear key: " + k)
      jedis.del(k)

      // Insert (key, rank as score, item id as member).
      value.zipWithIndex.foreach { case (item, rank) =>
        val v = item.key._2.toString
        println(s"(sink to redis: $k,$rank,${item.count},$v)")
        jedis.zadd(k, rank.toDouble, v)
      }
    }
  }

  /** Returns the connection and shuts the pool down; safe when `open` did not complete. */
  override def close(): Unit = {
    try {
      if (jedis != null) jedis.close()
    } finally {
      try {
        if (jedisPool != null) jedisPool.close()
      } finally {
        super.close()
      }
    }
  }
}

总结

本文使用 Flink 做 ETL,统计出每个用户在窗口内点击量的 TopN,再将结果写入 Redis。借助 Redis ZSet 按 score 排序的特性,商品可以按名次有序存储,供后续查询调用。