
Spark Streaming reading from Kafka: a WordCount program

程序员文章站 2022-06-14 15:06:26

For the Java Kafka producer/consumer code, see this blog post: https://blog.csdn.net/fanghailiang2016/article/details/108249158

Here, Spark Streaming reads data from Kafka and runs a WordCount, counting how often each word occurs.

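The Gradle dependency is as follows (`$scalaVersion` and `$sparkVersion` are variables defined elsewhere in the build script; the Spark core and Spark Streaming libraries must also be on the classpath):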

implementation "org.apache.spark:spark-streaming-kafka-0-10_$scalaVersion:$sparkVersion"

The code that reads the Kafka stream and counts words is as follows:

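import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Run locally on all cores; each micro-batch covers 10 seconds of data
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafkaWordCount")
    val sc = SparkContext.getOrCreate(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Kafka consumer settings: keys and values are plain strings
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "127.0.0.1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test4",
      // Start from the oldest available offset when the group has no committed offset
      "auto.offset.reset" -> "earliest",
      // Offsets are not auto-committed, and this program never commits them itself,
      // so after a restart the group usually has no committed offset and re-reads from the earliest one
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("first_topic")
    // Direct stream: each Kafka partition maps to one Spark partition
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Split each message value into words, pair each word with 1, and sum the 1s per word within the batch
    val resultDstream: DStream[(String, Int)] = dstream
      .flatMap(record => record.value.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    resultDstream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}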
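To make the transformation chain easier to follow without a running Kafka broker, here is a minimal sketch that applies the same steps (split on a single space, map each word to `(word, 1)`, sum per word) to plain Scala collections. `BatchWordCount` and `countWords` are hypothetical names used only for illustration; `groupBy` plus `sum` stands in for Spark's `reduceByKey`. Because `reduceByKey` runs on each 10-second micro-batch separately, the counts printed by the streaming job are per batch, not running totals, and the sketch counts each simulated batch independently to reflect that.

```scala
// Hypothetical helper for illustration only: plain Scala, no Spark or Kafka involved.
object BatchWordCount {
  // Mirrors flatMap(split(" ")).map((_, 1)).reduceByKey(_ + _) from the streaming job;
  // groupBy + sum plays the role of reduceByKey.
  def countWords(records: Seq[String]): Map[String, Int] =
    records
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // Two simulated micro-batches; each is counted on its own,
    // so "hello" from batch 1 is not added to the count for batch 2.
    val batch1 = Seq("hello spark", "hello kafka")
    val batch2 = Seq("hello world")
    println(countWords(batch1))
    println(countWords(batch2))
  }
}
```

One detail the sketch makes visible: `split(" ")` on a value containing two consecutive spaces, such as `"a  b"`, produces an empty-string token, which is then counted as a "word". Adding `.filter(_.nonEmpty)` after the `flatMap` (in either version) would drop those tokens.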