Integrating Flink with Kafka to Implement WordCount
1. Introduction
The basics of Flink, its streaming API, and Kafka are not repeated here; readers who need background can refer to these posts:
Flink: Flink Streaming API Programming Guide
Kafka: Kafka Basics
2. Code in Practice
The program below reads lines of text from the Kafka topic spark, splits them into words, and prints per-word counts over 2-second tumbling windows.
package cn.kafka

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
// import org.apache.flink.api.scala._ // implicit conversions for bounded (batch) data sets
import org.apache.flink.streaming.api.scala._ // implicit conversions for unbounded (streaming) data sets
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

object FlinkKafka {
  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the default parallelism of the development environment to 1
    env.setParallelism(1)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092,slave01:9092,slave02:9092")
    // Only needed by the 0.8-era consumer; harmless for the 0.9 consumer used here
    properties.setProperty("zookeeper.connect", "master:2181,slave01:2181,slave02:2181")
    properties.setProperty("group.id", "spark")
    properties.setProperty("enable.auto.commit", "true")
    properties.setProperty("auto.commit.interval.ms", "5000")
    /**
     * Where to start consuming when the group has no committed offset:
     * latest: start from the newest records (the default)
     * earliest: start from the beginning of the topic (records may be consumed again)
     */
    properties.setProperty("auto.offset.reset", "latest")
    // Configure deserialization (decoding bytes to String is actually done by
    // SimpleStringSchema below; the Flink connector manages the Kafka-level deserializers)
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    // Get the data source: the Kafka topic "spark"
    val consumer: FlinkKafkaConsumer09[String] = new FlinkKafkaConsumer09[String](
      "spark", new SimpleStringSchema(), properties
    )
    val kafkaDataStream: DataStream[String] = env.addSource(consumer)

    val result: DataStream[(String, Int)] = kafkaDataStream
      .flatMap(row => row.split("\\s+")) // split on runs of whitespace to avoid empty tokens
      .map(row => (row, 1))
      .keyBy(_._1)
      .timeWindow(Time.seconds(2)) // 2-second tumbling processing-time windows
      .sum(1)

    result.print()
    env.execute("FlinkKafkaWordCount")
  }
}
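To compile this, the Flink streaming Scala API and the Kafka 0.9 connector must be on the classpath. A minimal sbt sketch follows; the version number is an assumption (FlinkKafkaConsumer09 ships with Flink releases up to 1.8, so pick one that matches your cluster):

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % "1.7.2",
  "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.7.2"
)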
3. Execution Results
1. Kafka producer (the original screenshot is not reproduced here)
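As a stand-in for the screenshot, here is a minimal Scala producer sketch that writes a few space-separated lines to the spark topic. The object name WordProducer and the sample lines are made up for illustration, and the kafka-clients library is assumed to be on the classpath:

package cn.kafka

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object WordProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "master:9092,slave01:9092,slave02:9092")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Send a few test lines; each line becomes one Kafka record
    Seq("hello flink", "hello kafka", "flink streaming").foreach { line =>
      producer.send(new ProducerRecord[String, String]("spark", line))
    }
    producer.flush()
    producer.close()
  }
}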
2. Flink consumer (the original screenshot is not reproduced here)
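Assuming the producer above sends its three sample lines within a single window, the Flink job would print something like the following (hypothetical output; the actual grouping depends on when records arrive relative to the 2-second window boundaries):

(hello,2)
(flink,2)
(kafka,1)
(streaming,1)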