Flink1.9整合Kafka
本文基于flink1.9版本简述如何连接kafka。
流式连接器
我们知道可以自己来开发source 和 sink ,但是一些比较基本的 source 和 sink 已经内置在 flink 里。
预定义的source支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。
预定义的sink支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。
连接器可以和多种多样的第三方系统进行交互。目前支持以下系统:
- apache kafka
- apache cassandra(sink)
- amazon kinesis streams(source/sink)
- elasticsearch(sink)
- hadoop filesystem (sink)
- rabbitmq(source/sink)
- apache nifi(source/sink)
- twitter streaming api(source)
请记住,在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列。
apache bahir 中定义了其他一些连接器
- apache activemq(source/sink)
- apache flume(sink)
- redis(sink)
- akka (sink)
- netty (source)
使用connector并不是唯一可以使数据进入或者流出flink的方式。一种常见的模式是从外部数据库或者 web 服务查询数据得到初始数据流,然后通过 map
或者 flatmap
对初始数据流进行丰富和增强,这里要使用flink的异步io。
而向外部存储推送大量数据时会导致 i/o 瓶颈问题出现。在这种场景下,如果对数据的读操作远少于写操作,可以让外部应用从 flink 拉取所需的数据,需要用到flink的可查询状态接口。
本文重点介绍apache kafka connector
kafka连接器
此连接器提供对apache kafka提供的事件流的访问。
flink提供特殊的kafka连接器,用于从/向kafka主题读取和写入数据。flink kafka consumer集成了flink的检查点机制,可提供一次性处理语义。为实现这一目标,flink并不完全依赖kafka 的消费者组的偏移量,而是在内部跟踪和检查这些偏移。
下表为不同版本的kafka与flink kafka consumer的对应关系。
maven dependency | supported since | consumer and producer class name | kafka version |
---|---|---|---|
flink-connector-kafka-0.8_2.11 | 1.0.0 | flinkkafkaconsumer08 flinkkafkaproducer08 | 0.8.x |
flink-connector-kafka-0.9_2.11 | 1.0.0 | flinkkafkaconsumer09 flinkkafkaproducer09 | 0.9.x |
flink-connector-kafka-0.10_2.11 | 1.2.0 | flinkkafkaconsumer010 flinkkafkaproducer010 | 0.10.x |
flink-connector-kafka-0.11_2.11 | 1.4.0 | flinkkafkaconsumer011 flinkkafkaproducer011 | 0.11.x |
flink-connector-kafka_2.11 | 1.7.0 | flinkkafkaconsumer flinkkafkaproducer | >= 1.0.0 |
而从最新的flink1.9.0版本开始,使用kafka 2.2.0客户端。
下面简述使用步骤。
导入maven依赖:
<dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-connector-kafka_2.11</artifactid> <version>1.9.0</version> </dependency>
安装kafka:
可以参照 kafka入门宝典(详细截图版)
兼容性:
从flink 1.7开始,它不跟踪特定的kafka主要版本。相反,它在flink发布时跟踪最新版本的kafka。如果您的kafka代理版本是1.0.0或更高版本,则应使用此kafka连接器。如果使用旧版本的kafka(0.11,0.10,0.9或0.8),则应使用与代理版本对应的连接器。
升级connect要注意flink升级作业,同时
- 在整个过程中使用flink 1.9或更新版本。
不要同时升级flink和运营商。
确保您作业中使用的kafka consumer和/或kafka producer分配了唯一标识符(
uid
)。使用stop with savepoint功能获取保存点(例如,使用
stop --withsavepoint
)。
用法:
引入依赖后,实例化新的source(flinkkafkaconsumer
)和sink(flinkkafkaproducer
)。
kafka consumer
先分步骤介绍构建过程,文末附flink1.9连接kafka完整代码。
kafka consumer 根据版本分别叫做flinkkafkaconsumer08 flinkkafkaconsumer09等等
kafka >= 1.0.0 的版本就叫flinkkafkaconsumer。
构建flinkkafkaconsumer
java示例代码如下:
properties properties = new properties(); properties.setproperty("bootstrap.servers", "localhost:9092"); // only required for kafka 0.8 properties.setproperty("zookeeper.connect", "localhost:2181"); properties.setproperty("group.id", "test"); datastream<string> stream = env .addsource(new flinkkafkaconsumer<>("topic", new simplestringschema(), properties));
scala:
val properties = new properties() properties.setproperty("bootstrap.servers", "localhost:9092") // only required for kafka 0.8 properties.setproperty("zookeeper.connect", "localhost:2181") properties.setproperty("group.id", "test") stream = env .addsource(new flinkkafkaconsumer[string]("topic", new simplestringschema(), properties)) .print()
必须有的:
1.topic名称
2.用于反序列化kafka数据的deserializationschema / kafkadeserializationschema
3.配置参数:“bootstrap.servers” “group.id” (kafka0.8还需要 “zookeeper.connect”)
配置消费起始位置
java:
final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment(); flinkkafkaconsumer<string> myconsumer = new flinkkafkaconsumer<>(...); myconsumer.setstartfromearliest(); // start from the earliest record possible myconsumer.setstartfromlatest(); // start from the latest record myconsumer.setstartfromtimestamp(...); // start from specified epoch timestamp (milliseconds) myconsumer.setstartfromgroupoffsets(); // the default behaviour //指定位置 //map<kafkatopicpartition, long> specificstartoffsets = new hashmap<>(); //specificstartoffsets.put(new kafkatopicpartition("mytopic", 0), 23l); //myconsumer.setstartfromspecificoffsets(specificstartoffsets); datastream<string> stream = env.addsource(myconsumer);
scala:
val env = streamexecutionenvironment.getexecutionenvironment() val myconsumer = new flinkkafkaconsumer[string](...) myconsumer.setstartfromearliest() // start from the earliest record possible myconsumer.setstartfromlatest() // start from the latest record myconsumer.setstartfromtimestamp(...) // start from specified epoch timestamp (milliseconds) myconsumer.setstartfromgroupoffsets() // the default behaviour //指定位置 //val specificstartoffsets = new java.util.hashmap[kafkatopicpartition, java.lang.long]() //specificstartoffsets.put(new kafkatopicpartition("mytopic", 0), 23l) //myconsumer.setstartfromspecificoffsets(specificstartoffsets) val stream = env.addsource(myconsumer)
检查点
启用flink的检查点后,flink kafka consumer将使用主题中的记录,并以一致的方式定期检查其所有kafka偏移以及其他操作的状态。如果作业失败,flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用kafka的记录。
如果禁用了检查点,则flink kafka consumer依赖于内部使用的kafka客户端的自动定期偏移提交功能。
如果启用了检查点,则flink kafka consumer将在检查点完成时提交存储在检查点状态中的偏移量。
java
final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment(); env.enablecheckpointing(5000); // checkpoint every 5000 msecs
scala
val env = streamexecutionenvironment.getexecutionenvironment() env.enablecheckpointing(5000) // checkpoint every 5000 msecs
分区发现
flink kafka consumer支持发现动态创建的kafka分区,并使用一次性保证消费它们。
还可以使用正则:
java
final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment(); properties properties = new properties(); properties.setproperty("bootstrap.servers", "localhost:9092"); properties.setproperty("group.id", "test"); flinkkafkaconsumer011<string> myconsumer = new flinkkafkaconsumer011<>( java.util.regex.pattern.compile("test-topic-[0-9]"), new simplestringschema(), properties); datastream<string> stream = env.addsource(myconsumer); ...
scala
val env = streamexecutionenvironment.getexecutionenvironment() val properties = new properties() properties.setproperty("bootstrap.servers", "localhost:9092") properties.setproperty("group.id", "test") val myconsumer = new flinkkafkaconsumer08[string]( java.util.regex.pattern.compile("test-topic-[0-9]"), new simplestringschema, properties) val stream = env.addsource(myconsumer) ...
时间戳和水印
在许多情况下,记录的时间戳(显式或隐式)嵌入记录本身。另外,用户可能想要周期性地或以不规则的方式发出水印。
我们可以定义好timestamp extractors / watermark emitters,通过以下方式将其传递给您的消费者:
java
properties properties = new properties(); properties.setproperty("bootstrap.servers", "localhost:9092"); // only required for kafka 0.8 properties.setproperty("zookeeper.connect", "localhost:2181"); properties.setproperty("group.id", "test"); flinkkafkaconsumer08<string> myconsumer = new flinkkafkaconsumer08<>("topic", new simplestringschema(), properties); myconsumer.assigntimestampsandwatermarks(new customwatermarkemitter()); datastream<string> stream = env .addsource(myconsumer) .print();
scala
val properties = new properties() properties.setproperty("bootstrap.servers", "localhost:9092") // only required for kafka 0.8 properties.setproperty("zookeeper.connect", "localhost:2181") properties.setproperty("group.id", "test") val myconsumer = new flinkkafkaconsumer08[string]("topic", new simplestringschema(), properties) myconsumer.assigntimestampsandwatermarks(new customwatermarkemitter()) stream = env .addsource(myconsumer) .print()
kafka producer
kafka producer 根据版本分别叫做flinkproducer011 flinkkafkaproducer010等等
kafka >= 1.0.0 的版本就叫flinkkafkaproducer 。
构建flinkkafkaconsumer
java
datastream<string> stream = ...; flinkkafkaproducer011<string> myproducer = new flinkkafkaproducer011<string>( "localhost:9092", // broker list "my-topic", // target topic new simplestringschema()); // serialization schema // versions 0.10+ allow attaching the records' event timestamp when writing them to kafka; // this method is not available for earlier kafka versions myproducer.setwritetimestamptokafka(true); stream.addsink(myproducer);
scala
val stream: datastream[string] = ... val myproducer = new flinkkafkaproducer011[string]( "localhost:9092", // broker list "my-topic", // target topic new simplestringschema) // serialization schema // versions 0.10+ allow attaching the records' event timestamp when writing them to kafka; // this method is not available for earlier kafka versions myproducer.setwritetimestamptokafka(true) stream.addsink(myproducer)
需要指定broker list , topic,序列化类。
自定义分区:默认情况下,将使用flinkfixedpartitioner
将每个flink kafka producer并行子任务映射到单个kafka分区。
可以实现flinkkafkapartitioner类自定义分区。
flink1.9消费kafka完整代码:
import org.apache.flink.api.common.serialization.simplestringschema; import org.apache.flink.streaming.api.datastream.datastream; import org.apache.flink.streaming.api.environment.streamexecutionenvironment; import org.apache.flink.streaming.connectors.kafka.flinkkafkaconsumer; import java.util.properties; public class kafkaconsumer { public static void main(string[] args) throws exception { final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment(); properties properties = new properties(); properties.setproperty("bootstrap.servers", "localhost:9092"); properties.setproperty("group.id", "test"); //构建flinkkafkaconsumer flinkkafkaconsumer<string> myconsumer = new flinkkafkaconsumer<>("topic", new simplestringschema(), properties); //指定偏移量 myconsumer.setstartfromearliest(); datastream<string> stream = env .addsource(myconsumer); env.enablecheckpointing(5000); stream.print(); env.execute("flink streaming java api skeleton"); }
项目地址:
更多flink知识,欢迎关注实时流式计算
上一篇: 悄悄告诉你晚上吃什么水果不会胖,不用拒绝食物的诱惑
下一篇: Jupyter 快捷键总结