Reading Kafka with Spark Streaming: a word count program
程序员文章站
2022-06-14 15:06:26
For Java Kafka producer/consumer code, see this blog post: https://blog.csdn.net/fanghailiang2016/article/details/108249158
This program reads data from Kafka with Spark Streaming and performs a word count on it.
The Gradle dependency is as follows:
implementation "org.apache.spark:spark-streaming-kafka-0-10_$scalaVersion:$sparkVersion"
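For context, a minimal build.gradle sketch is shown below; the `scalaVersion` and `sparkVersion` values are illustrative assumptions (the original post does not state them), so substitute the versions that match your cluster:

```groovy
// Sketch of a build.gradle — version values are assumptions, not from the post
ext {
    scalaVersion = '2.12'   // assumed Scala binary version
    sparkVersion = '3.1.2'  // assumed Spark version
}

dependencies {
    implementation "org.apache.spark:spark-core_$scalaVersion:$sparkVersion"
    implementation "org.apache.spark:spark-streaming_$scalaVersion:$sparkVersion"
    implementation "org.apache.spark:spark-streaming-kafka-0-10_$scalaVersion:$sparkVersion"
}
```

The Scala binary version in the artifact name (`_2.12` etc.) must match the Scala version Spark itself was built against.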
The code that reads the Kafka stream and counts words:
```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafkaWordCount")
    val sc = SparkContext.getOrCreate(conf)
    // Micro-batch interval of 10 seconds
    val ssc = new StreamingContext(sc, Seconds(10))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "127.0.0.1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test4",
      // Start from the earliest offset when this group has no committed offset
      "auto.offset.reset" -> "earliest",
      // Disable auto-commit so offsets are not committed automatically
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("first_topic")
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // Split each record value into words, then count occurrences within each batch
    val resultDstream: DStream[(String, Int)] =
      dstream.flatMap(record => record.value.split(" ")).map((_, 1)).reduceByKey(_ + _)
    resultDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```
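To sanity-check the word-count transformation without a running Kafka broker, the same flatMap/map/reduceByKey pipeline can be exercised on a local RDD. This is a sketch added for illustration (the object name and the sample lines are assumptions, not part of the original post):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical local test of the word-count pipeline, without Kafka
object LocalWordCount {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setMaster("local[*]").setAppName("localWordCount"))
    // Sample data standing in for Kafka record values
    val lines = sc.parallelize(Seq("hello spark", "hello kafka"))
    // Same transformation as in the streaming job above
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.collect().foreach(println)
    sc.stop()
  }
}
```

Each micro-batch in the streaming job applies this same transformation to the records received during its interval; note that `reduceByKey` counts within a single batch only, so counts are not accumulated across batches unless stateful operators such as `updateStateByKey` are used.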