欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka的receive方式实现WordCount,使用updateStateByKey函数,累加所有批次的wordCount

程序员文章站 2022-06-14 13:45:54
...

Spark Streaming的updateStateByKey可以把DStream中的数据按key做reduce操作,然后对各个批次的数据进行累加。注意
wordDstream.updateStateByKey[Int]每次传递给updateFunc函数两个参数,其中,
1、第一个参数是某个key(即某个单词)的当前批次的一系列值的列表(Seq[Int]形式),updateFunc函数中 val currentCount = values.foldLeft(0)(_ + _)的作用,就是计算这个被传递进来的与某个key对应的当前批次的所有值的总和,也就是当前批次某个单词的出现次数,保存在变量currentCount中。
2、传递给updateFunc函数的第二个参数是某个key的历史状态信息,也就是某个单词历史批次的词频汇总结果。实际上,某个单词的历史词频应该是一个Int类型,这里为什么要采用Option[Int]呢?
Option[Int]是类型 Int的容器,更确切地说,你可以把它看作是某种集合,这个特殊的集合要么只包含一个元素(即单词的历史词频),要么就什么元素都没有(这个单词历史上没有出现过,所以没有历史词频信息)。之所以采用 Option[Int]保存历史词频信息,这是因为,历史词频可能不存在,很多时候,在值不存在时,需要进行回退,或者提供一个默认值,Scala 为Option类型提供了getOrElse方法,以应对这种情况。 state.getOrElse(0)的含义是,如果该单词没有历史词频统计汇总结果,那么,就取值为0,如果有历史词频统计结果,就取历史结果,然后赋值给变量previousCount。最后,当前值和历史值进行求和,并包装在Some中返回。

package day01

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}

object kafkaConsumerDemo01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wc").setMaster("local[2]")
    //new StreamingContext, 设置每个批次的时间间隔是5秒
    val ssc =new StreamingContext(conf,Duration(5000))
    //zookeeper地址,通过zookeeper获取元数据
    val zkQurme = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    //使用checkPoint,保存中间的结果
    ssc.checkpoint("D:\\Yue\\chk_0")
    val groupid = "tt01"//组名
    val topics = Map("test02"->2)//要消费的topic名,和线程数,可以有多个topic
    //创建一个ReceiverInputDStream,相当于kafka的消费者
    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkQurme,groupid,topics)
    val fm: DStream[String] = kafkaDStream.flatMap(_._2.split(" "))
    val maped: DStream[(String, Int)] = fm.map(x=>(x,1))
    //定义一个函数seq代表当前批次的某个key的所对应的value的集合
    //state是以前读取的这个key的value的总和
    val updateFunc =(seq:Seq[Int],state:Option[Int])=>{
      val crunt: Int = seq.foldLeft(0)(_+_)
      //使用foldLeft,进行累加,不能使用reduceLeft/reduce 会报 empty.reduceLeft
      val sum = crunt + state.getOrElse(0)
      Some(sum)
    }
    val update: DStream[(String, Int)] = maped.updateStateByKey(updateFunc)
    update.print()
    ssc.start()
    ssc.awaitTermination()

    
  }

}