Flink DataStream读写Kafka
Flink提供了Kafka连接器,用于从或向Kafka读写数据。
本文总结Flink与Kafka集成中的问题,并对一些疑点进行总结和梳理。
问题一: 读Kafka的方式
## 读取一个Topic
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props)
举例:
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>("userActionLog1", new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
## 读取多个Topic
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
举例:
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Arrays.asList("userActionLog1","userActionLog2","userActionLog3"), new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
# 读取多个Topic
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)
举例:
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Pattern.compile("userActionLog[1-9]{1}"), new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
# 从指定的时间戳开始消费
kafkaConsumer.setStartFromTimestamp(long startupOffsetsTimestamp)
# 从指定的偏移量开始消费,可为每个分区单独设置偏移量
kafkaConsumer.setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)
问题二: 读Kafka与反序列化器
可通过org.apache.flink.api.common.serialization.DeserializationSchema
或org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
,将从Kafka读取的二进制字节流反序列化成Flink内部支持的Java/Scala对象。
Flink内置支持以下2种常用反序列化器:
-
org.apache.flink.api.common.serialization.SimpleStringSchema
:反序列化成String。 -
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
:反序列化成jackson ObjectNode。
如果想实现Kafka复杂JSON直接转换成想要的Object,可仿照org.apache.flink.api.common.serialization.SimpleStringSchema
自定义即可。主要实现deserialize
反序列化方法。
问题三: 读Kafka动态发现Topic、Partition
之前使用Spark Streaming,Spark 2.2.2
不支持动态发现Kafka 0.10.1
中新增的Topic(基于正则指定)和Partition。当新增了Topic或Partition,需要重启Spark Streaming任务。
在Flink中, 默认支持动态发现Kafka中新增的Topic或Partition,但需要手动开启。
kafkaProperties.put("flink.partition-discovery.interval-millis","10000");
flink.partition-discovery.interval-millis: 检查间隔,单位毫秒。
问题四: 读Kafka与Exactly Once语义
没有开启Checkpoint,默认自动提交Offset至外部存储(如Kafka Broker或Zookeeper),自动提交的间隔是5秒。Flink Kafka Consumer的容错依赖于自动提交的Offset。
开启Checkpoint,默认在Checkpoint完成后将存储在Checkpoint中的Offset再提交至外部存储(如Kafka Broker或0.8版本中的Zookeeper),Flink Kafka Consumer在Flink作业运行过程中的容错依赖于Checkpoint中的Offset,Flink作业恢复,则可能是从Checkpoint中的Offset恢复,也可能是从外部存储如Kafka Broker中的Offset恢复,具体取决于恢复方式。注意: 在这种方式下,Kafka Broker(或0.8中Zookeeper)存储的Offset仅用于监控消费进度。
总结,基于Kafka可重复消费的能力并结合Flink Checkpoint机制,Flink Kafka Consumer
能提供Exactly-Once语义。
问题五: 写Kafka与Exactly Once语义
-
Kafka 0.8 Flink不提供Exactly-Once或At-Least-Once语义。
-
Kafka 0.9、0.10 Flink启用Checkpoint,
FlinkKafkaProducer09
和FlinkKafkaProducer010
提供At-Least-Once语义。除此之外,还需设置以下参数:setLogFailuresOnly(false)
: 若为true
,Producer
遇到异常时,仅记录失败时的日志,流处理程序继续。需要设置为false
,当遇到异常,流处理程序失败,抛出异常恢复任务并重试发送。setFlushOnCheckpoint(true)
: Checkpoint中包含Kafka Producer Buffer
中的数据,设置为true, 确保Checkpoint成功前,Buffer中的所有记录都已写入Kafka。retries
: 重试次数,默认0,建议设置更大。 -
Kafka 0.11、1.0.0+ Flink启用Checkpoint,基于
Two Phase Commit
,FlinkKafkaProducer011
和FlinkKafkaProducer(Kafka >=1.0.0)
默认提供Exactly-Once语义。
如需要其他语义Semantic.NONE(可能会丢或重)
、Semantic.AT_LEAST_ONCE(可能会重)
、Semantic.EXACTLY_ONCE(默认)
,可手动选择。
从Kafka 0.10.1
读数据并写入到Kafka 0.11.0.3
并实现PV统计
项目依赖
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.13</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.13</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>1.8.0</version>
</dependency>
代码实现
package com.bigdata.flink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import java.text.Format;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
/**
* Author: Wang Pei
* Summary: 读写Kafka
*/
public class ReadWriteKafka {
public static void main(String[] args) throws Exception{
/**解析命令行参数*/
ParameterTool fromArgs = ParameterTool.fromArgs(args);
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));
//checkpoint参数
String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");
//fromKafka参数
String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
String fromKafkaAutoOffsetReset= parameterTool.getRequired("fromKafka.auto.offset.reset");
String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");
//toKafka参数
String toKafkaBootstrapServers = parameterTool.getRequired("toKafka.bootstrap.servers");
String toKafkaTopic = parameterTool.getRequired("toKafka.topic");
//窗口参数
long tumblingWindowLength = parameterTool.getLong("tumblingWindowLength");
long outOfOrdernessSeconds = parameterTool.getLong("outOfOrdernessSeconds");
/**配置运行环境*/
//设置Local Web Server
Configuration config = new Configuration();
config.setInteger(RestOptions.PORT,8081);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置StateBackend
env.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));
//设置Checkpoint
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
/**配置数据源*/
Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers",fromKafkaBootstrapServers);
kafkaProperties.put("group.id",fromKafkaGroupID);
kafkaProperties.put("auto.offset.reset",fromKafkaAutoOffsetReset);
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
/**抽取转换*/
SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceMap = source
.map((MapFunction<String, Tuple4<String, String, String, Integer>>) value -> {
Tuple4<String, String, String, Integer> output = new Tuple4<>();
try {
JSONObject obj = JSON.parseObject(value);
output.f0 = obj.getString("userID");
output.f1 = obj.getString("eventTime");
output.f2 = obj.getString("eventType");
output.f3 = obj.getInteger("productID");
} catch (Exception e) {
e.printStackTrace();
}
return output;
}).returns(new TypeHint<Tuple4<String, String, String, Integer>>(){})
.name("Map: ExtractTransform").uid("map-id");
/**过滤掉异常数据*/
SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceFilter = sourceMap
.filter((FilterFunction<Tuple4<String, String, String, Integer>>) value -> value != null)
.name("Filter: FilterExceptionData").uid("filter-id");
/**抽取时间戳并发射水印*/
SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> assignTimestampsAndWatermarks = sourceFilter.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, String, Integer>>(Time.seconds(outOfOrdernessSeconds)) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public long extractTimestamp(Tuple4<String, String, String, Integer> element) {
long timestamp = 0L;
try {
Date date = format.parse(element.f1);
timestamp = date.getTime();
} catch (ParseException e) {
e.printStackTrace();
}
return timestamp;
}
}).uid("watermark-id");
/**窗口统计*/
SingleOutputStreamOperator<String> aggregate = assignTimestampsAndWatermarks
//默认用Hash方式
.keyBy((KeySelector<Tuple4<String, String, String, Integer>, String>) value -> value.f2)
.window(TumblingEventTimeWindows.of(Time.seconds(tumblingWindowLength)))
//在每个窗口(Window)上应用WindowFunction(CustomWindowFunction)
//CustomAggFunction用于增量聚合
//在每个窗口上,先进行增量聚合(CustomAggFunction),然后将增量聚合的结果作为WindowFunction(CustomWindowFunction)的输入,计算后并输出
//具体: 可参考底层AggregateApplyWindowFunction的实现
.aggregate(new CustomAggFunction(), new CustomWindowFunction());
//aggregate.print();
/**结果输出*/
Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.setProperty("bootstrap.servers",toKafkaBootstrapServers);
kafkaProducerProperties.setProperty("transaction.timeout.ms",60000+"");
FlinkKafkaProducer011<String> kafkaProducer011 = new FlinkKafkaProducer011<>(
toKafkaTopic,
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
kafkaProducerProperties,
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
);
aggregate.addSink(kafkaProducer011).name("outputToKafka");
env.execute();
}
/**
* 自定义AggregateFunction
* 增量聚合,这里实现累加效果
*/
static class CustomAggFunction implements AggregateFunction<Tuple4<String, String, String, Integer>,Long,Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Tuple4<String, String, String, Integer> value, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long accumulator1, Long accumulator2) {
return accumulator1 + accumulator2;
}
}
/**
* 自定义WindowFunction
* 对增量聚合的结果再做处理,并输出
*/
static class CustomWindowFunction implements WindowFunction<Long, String,String, TimeWindow> {
@Override
public void apply(String key, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long windowStart = window.getStart();
long windowEnd = window.getEnd();
Long windowPV = input.iterator().next();
String output=format.format(new Date(windowStart))+","+format.format(new Date(windowEnd))+","+key+","+windowPV;
out.collect(output);
}
}
}
上一篇: Centos7 安装GMT5.4.4
下一篇: WPF实现地震波警示效果
推荐阅读
-
Flink1.9整合Kafka
-
Flink DataStream API之Operators
-
flink学习之六-数据持久化to-kafka
-
Flink入门(二)(使用kafka作为sink和source)
-
Flink kafka source & sink 源码解析
-
Flink实战(八)Flink 使用 Kafka Source & Kafka Sink
-
Flink DataSet Kafka Sink
-
使用IntelliJ IDEA导入 Flink 消费kafka报错 Error: A JNI error has occurred, please check your installation an
-
Flink DataStream API编程指南
-
flink sql-clent MATCH_RECOGNIZE kafka 例子