欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

基于用户行为日志的实时热门商品统计:统计近1小时内的热门商品,每5分钟更新一次,热门度用浏览次数(“pv”)衡量

程序员文章站 2022-03-03 09:07:59
...

// Producer side: replays the user-behavior CSV file into a Kafka topic, one record per line.
object KafkaProducerTask {

  /** Entry point: publishes the CSV file to the "hotitem" topic. */
  def main(args: Array[String]): Unit = {
    writeToKafka("hotitem")
  }

  /**
   * Sends every line of `input/UserBehavior.csv` to Kafka as a key-less String record.
   *
   * Both the file handle and the producer are released even when reading or sending fails.
   *
   * @param topic name of the Kafka topic the lines are written to
   */
  def writeToKafka(topic: String): Unit = {
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "spark1:9092")
    pro.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    pro.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // The configuration has to be handed to the producer's constructor.
    val producer = new KafkaProducer[String, String](pro)
    try {
      // Read the data set with io.Source, using the platform default encoding.
      val source = io.Source.fromFile("input/UserBehavior.csv")
      try {
        // Publish the file line by line; each line becomes the value of one record.
        source.getLines().foreach { line =>
          producer.send(new ProducerRecord[String, String](topic, line))
        }
      } finally {
        source.close()
      }
    } finally {
      // NOTE(review): close() is expected to wait for in-flight sends — confirm against the
      // Kafka client version in use.
      producer.close()
    }
  }
}

========================================================
// Consumer side: Flink job that reads user-behavior lines from Kafka and prints, for every
// sliding window, the three items with the most "pv" events.
object HotLog {

  /** Builds the streaming pipeline and runs it. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Use event time as the time semantics of the job.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka connection settings handed to the source connector.
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "spark1:9092")
    kafkaProps.setProperty("group.id", "hotitem")
    val kafkaSource = new FlinkKafkaConsumer[String]("hotitem", new SimpleStringSchema(), kafkaProps)
    val ds: DataStream[String] = env.addSource(kafkaSource)

    // Parse each comma-separated line into the UserBehavior case class, attach timestamps and
    // keep only the "pv" events.
    val pageViews = ds
      .map { line =>
        val fields: Array[String] = line.split(",")
        UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toInt, fields(3), fields(4).toLong)
      }
      // The input is treated as already ordered by time, hence assignAscendingTimestamps.
      // The timeStamp field is multiplied by 1000 — presumably seconds to milliseconds (confirm).
      .assignAscendingTimestamps(event => event.timeStamp * 1000)
      .filter(event => event.behavior == "pv")

    // Per item: one-hour windows sliding every five minutes. CountAgg counts the events
    // and WindowResult wraps each count into an ItemViewCount.
    val countsPerWindow = pageViews
      .keyBy(_.itemid)
      .timeWindow(Time.hours(1), Time.minutes(5))
      .aggregate(new CountAgg(), new WindowResult())

    // Regroup by window end so all items of one window meet in the same TopNHot instance,
    // which sorts them and emits the top three.
    countsPerWindow
      .keyBy(_.windowEnd)
      .process(new TopNHot(3))
      .print()

    env.execute("")
  }

}

/**
 * Counting aggregate over UserBehavior records.
 *
 * The accumulator and the result are both the number of records seen so far; the content of
 * the records is never inspected.
 */
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {

  /** A fresh accumulator starts at zero. */
  override def createAccumulator(): Long = {
    0L
  }

  /** Every incoming record increases the count by one. */
  override def add(in: UserBehavior, acc: Long): Long = {
    1L + acc
  }

  /** The accumulator already is the final count. */
  override def getResult(acc: Long): Long = {
    acc
  }

  /** Two partial counts combine by addition. */
  override def merge(acc: Long, acc1: Long): Long = {
    acc1 + acc
  }

}

/**
 * Window function that wraps a count into an ItemViewCount record.
 *
 * Type parameters, in order: input is the Long count, output is ItemViewCount, the key is the
 * item id, and the window type is TimeWindow.
 */
class WindowResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {

  /**
   * Emits one ItemViewCount built from the key, the window end and the count.
   *
   * @param key    item id the window is keyed by
   * @param window window whose end timestamp is attached to the output
   * @param input  counts for this key and window; only the first element is read (presumably
   *               the single value produced by the preceding aggregate — confirm)
   * @param out    collector receiving the ItemViewCount
   */
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val pvCount = input.iterator.next()
    val record = new ItemViewCount(key, window.getEnd, pvCount)
    out.collect(record)
  }

}

/**
 * Buffers the ItemViewCount records of one key and, when an event-time timer fires, emits a
 * formatted report of the `n` records with the highest count.
 *
 * @param n number of top entries included in each report
 */
class TopNHot(n: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {

  // Offset added to windowEnd when registering the timer, and subtracted again when the
  // timer's timestamp is turned back into the window end for the report header.
  private val timerDelayMs = 100L

  // Keyed list state buffering the records until the timer fires; initialised in open().
  var itemState: ListState[ItemViewCount] = _

  /** Obtains the list state handle from the runtime context. */
  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount]))
  }

  /**
   * Called for every element: stores it and registers a timer shortly after its window end.
   *
   * The timer is meant to fire once all records for this windowEnd have arrived, at which
   * point they are sorted and emitted in onTimer.
   */
  override def processElement(i: ItemViewCount, context: KeyedProcessFunction[Long, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
    itemState.add(i)
    context.timerService().registerEventTimeTimer(i.windowEnd + timerDelayMs)
  }

  /**
   * Timer callback: drains the buffered records, ranks them by count in descending order and
   * emits the top `n` as a single multi-line string.
   *
   * The callback no longer sleeps, so it does not stall the operator thread, and the opening
   * separator is part of the emitted string instead of being printed to stdout.
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // Explicit converters instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    // Copy the state into an immutable list BEFORE clearing it. Option guards against a null
    // Iterable, which some state implementations may return for empty state (unverified).
    val buffered: List[ItemViewCount] =
      Option(itemState.get()).map(_.asScala.toList).getOrElse(Nil)
    // Clear the state to free space.
    itemState.clear()

    // Highest count first, keep the first n.
    val topItems = buffered.sortBy(_.count)(Ordering.Long.reverse).take(n)

    val result = new StringBuilder()
    result.append("-------------")
    // Header: end of the window this report belongs to.
    result.append("时间").append(new Timestamp(timestamp - timerDelayMs)).append("\n")
    // One line per ranked item, numbered from 1.
    topItems.zipWithIndex.foreach { case (item, idx) =>
      result.append("No").append(idx + 1).append(":")
        .append("商品ID=").append(item.itemid)
        .append("浏览量").append(item.count)
        .append("\n")
    }
    result.append("``````````````````")
    out.collect(result.toString())
  }

}