欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink整合Kafka实现WordCount

程序员文章站 2022-06-14 13:45:48
...

一.简介

Flink的基本信息和API信息以及Kafka的基本信息在此不再赘述,需要了解的参考博客:
Flink:Flink流处理API编程指南
Kafka:Kafka基本信息

二.代码实战

package cn.kafka

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time

// import org.apache.flink.api.scala._ // 有限数据集类型隐式转换
import org.apache.flink.streaming.api.scala._ // 无限数据集类型隐式转换
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

object FlinkKafka {

  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 将Flink默认的开发环境并行度设置为1
    env.setParallelism(1)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers" , "master:9092,slave01:9092,slave02:9092")
    properties.setProperty("zookeeper.connect" , "master:2181,slave01:2181,slave02:2181")
    properties.setProperty("group.id" , "spark")
    properties.setProperty("enable.auto.commit" , "true")
    properties.setProperty("auto.commit.interval.ms" ,"5000")

    /**
      * 配置下次重新消费的话,从哪里开始消费:
      * latest:从上一次提交的offset位置开始的
      * earlist:从头开始进行(重复消费数据)
      */
    properties.setProperty("auto.offset.reset" , "latest")
    // 配置序列化和反序列化
    properties.setProperty("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer")

    //获取数据源 kafka
    val consumer : FlinkKafkaConsumer09[String] = new FlinkKafkaConsumer09[String](
     "spark", new SimpleStringSchema(), properties
    )

    val kafkaDataStream : DataStream[String] = env.addSource(consumer)

    val result : DataStream[(String, Int)] = kafkaDataStream
        .flatMap(row => row.split("\\s"))
        .map(row => (row, 1))
        .keyBy(_._1)
        .timeWindow(Time.seconds(2))
        .sum(1)

    result.print()
    env.execute("FlinkKafkaWordCount")
  }
}

三.执行结果

1.kafka生产者
Flink整合Kafka实现WordCount
2.Flink消费者
Flink整合Kafka实现WordCount