欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Flink1.9整合Kafka

程序员文章站 2022-05-29 09:47:52
本文基于Flink1.9版本简述如何连接Kafka。 流式连接器 我们知道可以自己来开发Source 和 Sink ,但是一些比较基本的 Source 和 Sink 已经内置在 Flink 里。 预定义的source支持从文件、目录、socket,以及 collections 和 iterators ......

Flink1.9整合Kafka

本文基于flink1.9版本简述如何连接kafka。

流式连接器

Flink1.9整合Kafka

我们知道可以自己来开发source 和 sink ,但是一些比较基本的 source 和 sink 已经内置在 flink 里。

预定义的source支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。

预定义的sink支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。

连接器可以和多种多样的第三方系统进行交互。目前支持以下系统:

  • apache kafka
  • apache cassandra(sink)
  • amazon kinesis streams(source/sink)
  • elasticsearch(sink)
  • hadoop filesystem (sink)
  • rabbitmq(source/sink)
  • apache nifi(source/sink)
  • twitter streaming api(source)

请记住,在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列。

apache bahir 中定义了其他一些连接器

  • apache activemq(source/sink)
  • apache flume(sink)
  • redis(sink)
  • akka (sink)
  • netty (source)

使用connector并不是唯一可以使数据进入或者流出flink的方式。一种常见的模式是从外部数据库或者 web 服务查询数据得到初始数据流,然后通过 map 或者 flatmap 对初始数据流进行丰富和增强,这里要使用flink的异步io。

而向外部存储推送大量数据时会导致 i/o 瓶颈问题出现。在这种场景下,如果对数据的读操作远少于写操作,可以让外部应用从 flink 拉取所需的数据,需要用到flink的可查询状态接口。

本文重点介绍apache kafka connector

kafka连接器

此连接器提供对apache kafka提供的事件流的访问。

flink提供特殊的kafka连接器,用于从/向kafka主题读取和写入数据。flink kafka consumer集成了flink的检查点机制,可提供一次性处理语义。为实现这一目标,flink并不完全依赖kafka 的消费者组的偏移量,而是在内部跟踪和检查这些偏移。

下表为不同版本的kafka与flink kafka consumer的对应关系。

maven dependency supported since consumer and producer class name kafka version
flink-connector-kafka-0.8_2.11 1.0.0 flinkkafkaconsumer08 flinkkafkaproducer08 0.8.x
flink-connector-kafka-0.9_2.11 1.0.0 flinkkafkaconsumer09 flinkkafkaproducer09 0.9.x
flink-connector-kafka-0.10_2.11 1.2.0 flinkkafkaconsumer010 flinkkafkaproducer010 0.10.x
flink-connector-kafka-0.11_2.11 1.4.0 flinkkafkaconsumer011 flinkkafkaproducer011 0.11.x
flink-connector-kafka_2.11 1.7.0 flinkkafkaconsumer flinkkafkaproducer >= 1.0.0

而从最新的flink1.9.0版本开始,使用kafka 2.2.0客户端。

下面简述使用步骤。

导入maven依赖:

<dependency>
  <groupid>org.apache.flink</groupid>
  <artifactid>flink-connector-kafka_2.11</artifactid>
  <version>1.9.0</version>
</dependency>

安装kafka:

可以参照 kafka入门宝典(详细截图版)

兼容性:

从flink 1.7开始,它不跟踪特定的kafka主要版本。相反,它在flink发布时跟踪最新版本的kafka。如果您的kafka代理版本是1.0.0或更高版本,则应使用此kafka连接器。如果使用旧版本的kafka(0.11,0.10,0.9或0.8),则应使用与代理版本对应的连接器。

升级connect要注意flink升级作业,同时

  • 在整个过程中使用flink 1.9或更新版本。
  • 不要同时升级flink和运营商。

  • 确保您作业中使用的kafka consumer和/或kafka producer分配了唯一标识符(uid)。

  • 使用stop with savepoint功能获取保存点(例如,使用stop --withsavepoint)。

用法:

引入依赖后,实例化新的source(flinkkafkaconsumer)和sink(flinkkafkaproducer)。

kafka consumer

先分步骤介绍构建过程,文末附flink1.9连接kafka完整代码。

kafka consumer 根据版本分别叫做flinkkafkaconsumer08 flinkkafkaconsumer09等等
kafka >= 1.0.0 的版本就叫flinkkafkaconsumer。

构建flinkkafkaconsumer

java示例代码如下:

properties properties = new properties();
properties.setproperty("bootstrap.servers", "localhost:9092");
// only required for kafka 0.8
properties.setproperty("zookeeper.connect", "localhost:2181");
properties.setproperty("group.id", "test");
datastream<string> stream = env
    .addsource(new flinkkafkaconsumer<>("topic", new simplestringschema(), properties));

scala:

val properties = new properties()
properties.setproperty("bootstrap.servers", "localhost:9092")
// only required for kafka 0.8
properties.setproperty("zookeeper.connect", "localhost:2181")
properties.setproperty("group.id", "test")
stream = env
    .addsource(new flinkkafkaconsumer[string]("topic", new simplestringschema(), properties))
    .print()

必须有的:

1.topic名称

2.用于反序列化kafka数据的deserializationschema / kafkadeserializationschema

3.配置参数:“bootstrap.servers” “group.id” (kafka0.8还需要 “zookeeper.connect”)

配置消费起始位置

java:

final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

flinkkafkaconsumer<string> myconsumer = new flinkkafkaconsumer<>(...);
myconsumer.setstartfromearliest();     // start from the earliest record possible
myconsumer.setstartfromlatest();       // start from the latest record
myconsumer.setstartfromtimestamp(...); // start from specified epoch timestamp (milliseconds)
myconsumer.setstartfromgroupoffsets(); // the default behaviour

//指定位置
//map<kafkatopicpartition, long> specificstartoffsets = new hashmap<>();
//specificstartoffsets.put(new kafkatopicpartition("mytopic", 0), 23l);
//myconsumer.setstartfromspecificoffsets(specificstartoffsets);

datastream<string> stream = env.addsource(myconsumer);

scala:

val env = streamexecutionenvironment.getexecutionenvironment()

val myconsumer = new flinkkafkaconsumer[string](...)
myconsumer.setstartfromearliest()      // start from the earliest record possible
myconsumer.setstartfromlatest()        // start from the latest record
myconsumer.setstartfromtimestamp(...)  // start from specified epoch timestamp (milliseconds)
myconsumer.setstartfromgroupoffsets()  // the default behaviour

//指定位置
//val specificstartoffsets = new java.util.hashmap[kafkatopicpartition, java.lang.long]()
//specificstartoffsets.put(new kafkatopicpartition("mytopic", 0), 23l)
//myconsumer.setstartfromspecificoffsets(specificstartoffsets)

val stream = env.addsource(myconsumer)
检查点

启用flink的检查点后,flink kafka consumer将使用主题中的记录,并以一致的方式定期检查其所有kafka偏移以及其他操作的状态。如果作业失败,flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用kafka的记录。

如果禁用了检查点,则flink kafka consumer依赖于内部使用的kafka客户端的自动定期偏移提交功能。

如果启用了检查点,则flink kafka consumer将在检查点完成时提交存储在检查点状态中的偏移量。

java

final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
env.enablecheckpointing(5000); // checkpoint every 5000 msecs

scala

val env = streamexecutionenvironment.getexecutionenvironment()
env.enablecheckpointing(5000) // checkpoint every 5000 msecs
分区发现

flink kafka consumer支持发现动态创建的kafka分区,并使用一次性保证消费它们。

还可以使用正则:

java

final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

properties properties = new properties();
properties.setproperty("bootstrap.servers", "localhost:9092");
properties.setproperty("group.id", "test");

flinkkafkaconsumer011<string> myconsumer = new flinkkafkaconsumer011<>(
    java.util.regex.pattern.compile("test-topic-[0-9]"),
    new simplestringschema(),
    properties);

datastream<string> stream = env.addsource(myconsumer);
...

scala

val env = streamexecutionenvironment.getexecutionenvironment()

val properties = new properties()
properties.setproperty("bootstrap.servers", "localhost:9092")
properties.setproperty("group.id", "test")

val myconsumer = new flinkkafkaconsumer08[string](
  java.util.regex.pattern.compile("test-topic-[0-9]"),
  new simplestringschema,
  properties)

val stream = env.addsource(myconsumer)
...
时间戳和水印

在许多情况下,记录的时间戳(显式或隐式)嵌入记录本身。另外,用户可能想要周期性地或以不规则的方式发出水印。

我们可以定义好timestamp extractors / watermark emitters,通过以下方式将其传递给您的消费者:

java

properties properties = new properties();
properties.setproperty("bootstrap.servers", "localhost:9092");
// only required for kafka 0.8
properties.setproperty("zookeeper.connect", "localhost:2181");
properties.setproperty("group.id", "test");

flinkkafkaconsumer08<string> myconsumer =
    new flinkkafkaconsumer08<>("topic", new simplestringschema(), properties);
myconsumer.assigntimestampsandwatermarks(new customwatermarkemitter());

datastream<string> stream = env
    .addsource(myconsumer)
    .print();

scala

val properties = new properties()
properties.setproperty("bootstrap.servers", "localhost:9092")
// only required for kafka 0.8
properties.setproperty("zookeeper.connect", "localhost:2181")
properties.setproperty("group.id", "test")

val myconsumer = new flinkkafkaconsumer08[string]("topic", new simplestringschema(), properties)
myconsumer.assigntimestampsandwatermarks(new customwatermarkemitter())
stream = env
    .addsource(myconsumer)
    .print()

kafka producer

kafka producer 根据版本分别叫做flinkproducer011 flinkkafkaproducer010等等
kafka >= 1.0.0 的版本就叫flinkkafkaproducer 。

构建flinkkafkaconsumer

java

datastream<string> stream = ...;

flinkkafkaproducer011<string> myproducer = new flinkkafkaproducer011<string>(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new simplestringschema());   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to kafka;
// this method is not available for earlier kafka versions
myproducer.setwritetimestamptokafka(true);

stream.addsink(myproducer);

scala

val stream: datastream[string] = ...

val myproducer = new flinkkafkaproducer011[string](
        "localhost:9092",         // broker list
        "my-topic",               // target topic
        new simplestringschema)   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to kafka;
// this method is not available for earlier kafka versions
myproducer.setwritetimestamptokafka(true)

stream.addsink(myproducer)

需要指定broker list , topic,序列化类。

自定义分区:默认情况下,将使用flinkfixedpartitioner将每个flink kafka producer并行子任务映射到单个kafka分区。

可以实现flinkkafkapartitioner类自定义分区。

flink1.9消费kafka完整代码:

import org.apache.flink.api.common.serialization.simplestringschema;
import org.apache.flink.streaming.api.datastream.datastream;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.streaming.connectors.kafka.flinkkafkaconsumer;
import java.util.properties;

public class kafkaconsumer {

    public static void main(string[] args) throws exception {
        final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

        properties properties = new properties();
        properties.setproperty("bootstrap.servers", "localhost:9092");
        properties.setproperty("group.id", "test");
        //构建flinkkafkaconsumer
        flinkkafkaconsumer<string> myconsumer = new flinkkafkaconsumer<>("topic", new simplestringschema(), properties);
        //指定偏移量
        myconsumer.setstartfromearliest();


        datastream<string> stream = env
                .addsource(myconsumer);

        env.enablecheckpointing(5000);
        stream.print();

        env.execute("flink streaming java api skeleton");
    }

项目地址:

更多flink知识,欢迎关注实时流式计算

Flink1.9整合Kafka