Spark Streaming: reading Kafka data to implement word count

The pom.xml dependencies are as follows:

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
  </dependencies>
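
The snippet references ${spark.version} without defining it; the property must be set in the pom's <properties> block. A minimal entry, assuming 2.1.1 to match the spark-streaming-kafka-0-10 version pinned above:

  <properties>
    <spark.version>2.1.1</spark.version>
  </properties>

The Scala application is as follows: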
package nj.zb

import java.util

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
 * Read data from Kafka topic A, process it, and write the results to Kafka topic B.
 */
object SparkStreamKafkaSourceToKafkaSinkWordCount {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkKafkaStream").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5))


    // checkpoint directory (needed by stateful operations such as updateStateByKey)
    streamingContext.checkpoint("checkpoint")
    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.119.125:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )
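    // Settings that are often added here (not in the original): where a new
    // consumer group starts reading, and whether offsets are committed
    // automatically, e.g.
    //   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
    //   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"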

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("sparkKafkaDemo"), kafkaParams)
    )
    // split each record's value into words and count occurrences within the batch
    val wordCountStream: DStream[(String, Int)] =
      kafkaStream.flatMap(_.value().split("\\s+"))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
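
    // Optional addition (not in the original): a running total across batches
    // via updateStateByKey; this stateful operation is what actually requires
    // the checkpoint directory configured above.
    val runningTotals: DStream[(String, Int)] = wordCountStream.updateStateByKey(
      (batchCounts: Seq[Int], state: Option[Int]) => Some(batchCounts.sum + state.getOrElse(0))
    )
    runningTotals.print()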

    wordCountStream.foreachRDD(
      rdd => {
        rdd.foreachPartition(
          partition => {
            // one producer per partition per batch; see the singleton sketch below
            val props = new util.HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.119.125:9092")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](props)
            partition.foreach {
              case (word, count) =>
                // write "word,count" to the output topic (no key needed)
                val record =
                  new ProducerRecord[String, String]("sparkKafkaDemoOUT", word + "," + count)
                producer.send(record)
            }
            producer.close() // flush buffered records before the task ends
          }
        )
      }
    )
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
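
The code above builds a new KafkaProducer for every partition of every 5-second batch, which is expensive. A common refinement (a sketch, not part of the original) is a lazy per-JVM singleton; because a Scala object is resolved by name on each executor, every executor initializes its own producer instead of serializing one from the driver:

object KafkaSink {
  import java.util
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

  // one producer per executor JVM; the lazy val initializes on first use
  lazy val producer: KafkaProducer[String, String] = {
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.119.125:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val p = new KafkaProducer[String, String](props)
    sys.addShutdownHook(p.close()) // flush pending records when the JVM exits
    p
  }
}

With this in place, foreachPartition would call KafkaSink.producer.send(record) directly instead of constructing and closing a producer per partition. To test end to end, write lines to the sparkKafkaDemo topic with kafka-console-producer.sh and watch the word,count pairs arrive on sparkKafkaDemoOUT with kafka-console-consumer.sh.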