Flink DataSet Kafka Sink
1. Overview
Flink is generally used for realtime computation, but it also provides a batch API in the form of DataSet. Out of curiosity I tried it in a project, where one requirement was to sink the data of a DataSet to Kafka.
Note that the official Flink DataSet API does not provide a Kafka sink, so you have to implement one yourself. Which implementation to use also depends on the size of the DataSet.
2. Small Data Volumes
This approach is trivial: call collect() on the DataSet to turn it into a List, then send the elements of the List one by one. The advantage is simplicity; the drawback is that the records are not sent in parallel, and collect() materializes the entire DataSet in the client JVM, so it is fine for small datasets but infeasible at scale. Code:
2.1 Main class
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import java.util.List;

public class Demo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> value = ...
        // collect() pulls the whole DataSet into the client JVM and already
        // triggers job execution, so calling env.execute() afterwards would
        // fail with "No new data sinks have been defined since the last execution".
        List<String> resLs = value.collect();
        SinkToKafka.kafkaProducer(resLs);
    }
}
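The construction of the DataSet is elided above. For a quick local test, a minimal sketch might look like this (the sample strings are hypothetical, not part of the original):

DataSet<String> value = env.fromElements(
        "{\"id\":1,\"msg\":\"hello\"}",
        "{\"id\":2,\"msg\":\"world\"}");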
2.2 The SinkToKafka class
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.List;
import java.util.Properties;

public class SinkToKafka {
    public static void kafkaProducer(List<String> resLs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("acks", "all");
        props.setProperty("retries", "0");
        props.setProperty("batch.size", "10");
        props.setProperty("linger.ms", "1");
        props.setProperty("buffer.memory", "10240");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // The current timestamp serves as the record key.
        for (String context : resLs) {
            producer.send(new ProducerRecord<>("Test_Topic", String.valueOf(System.currentTimeMillis()), context));
        }
        producer.close();
    }
}
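Note that KafkaProducer.send() is asynchronous; producer.close() flushes whatever is still buffered before returning, so the loop above does not lose records on exit. If you want per-record error handling, one option (my own addition, not in the original code) is to pass a Callback to send():

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // The send failed; log or retry here.
        exception.printStackTrace();
    }
});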
3. Large Data Volumes
For large data volumes, fortunately the Flink DataSet sink provides an output() method that accepts a user-defined OutputFormat. Below is a KafkaOutputFormat I wrote, modeled on the JDBCOutputFormat class in flink-jdbc. It works in my tests, and its performance completely outclasses the approach above, since each parallel task opens its own producer and writes records directly instead of funneling everything through the client.
3.1 KafkaOutputFormat
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Properties;

public class KafkaOutputFormat extends RichOutputFormat<String> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaOutputFormat.class);

    private String servers;
    private String topic;
    private String acks;
    private String retries;
    private String batchSize;
    private String bufferMemory;
    private String lingerMs;

    private Producer<String, String> producer;

    @Override
    public void configure(Configuration parameters) {
        // Nothing to configure; all settings come in through the builder.
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // Called once per parallel task: each task gets its own producer.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", this.servers);
        if (this.acks != null) {
            props.setProperty("acks", this.acks);
        }
        if (this.retries != null) {
            props.setProperty("retries", this.retries);
        }
        if (this.batchSize != null) {
            props.setProperty("batch.size", this.batchSize);
        }
        if (this.lingerMs != null) {
            props.setProperty("linger.ms", this.lingerMs);
        }
        if (this.bufferMemory != null) {
            props.setProperty("buffer.memory", this.bufferMemory);
        }
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    public void writeRecord(String record) throws IOException {
        // Asynchronous send; the current timestamp serves as the record key.
        producer.send(new ProducerRecord<>(this.topic, String.valueOf(System.currentTimeMillis()), record));
    }

    @Override
    public void close() throws IOException {
        if (producer != null) {
            // close() flushes buffered records before returning.
            producer.close();
        }
    }

    public static KafkaOutputFormatBuilder buildKafkaOutputFormat() {
        return new KafkaOutputFormatBuilder();
    }

    public static class KafkaOutputFormatBuilder {
        private final KafkaOutputFormat format;

        public KafkaOutputFormatBuilder() {
            this.format = new KafkaOutputFormat();
        }

        public KafkaOutputFormatBuilder setBootstrapServers(String servers) {
            format.servers = servers;
            return this;
        }

        public KafkaOutputFormatBuilder setTopic(String topic) {
            format.topic = topic;
            return this;
        }

        public KafkaOutputFormatBuilder setAcks(String acks) {
            format.acks = acks;
            return this;
        }

        public KafkaOutputFormatBuilder setRetries(String retries) {
            format.retries = retries;
            return this;
        }

        public KafkaOutputFormatBuilder setBatchSize(String batchSize) {
            format.batchSize = batchSize;
            return this;
        }

        public KafkaOutputFormatBuilder setBufferMemory(String bufferMemory) {
            format.bufferMemory = bufferMemory;
            return this;
        }

        public KafkaOutputFormatBuilder setLingerMs(String lingerMs) {
            format.lingerMs = lingerMs;
            return this;
        }

        public KafkaOutputFormat finish() {
            // bootstrap.servers and topic are mandatory; fail fast if missing.
            if (format.servers == null) {
                throw new IllegalArgumentException("No bootstrap servers supplied.");
            }
            if (format.topic == null) {
                throw new IllegalArgumentException("No topic supplied.");
            }
            return format;
        }
    }
}
3.2 Main class
import dataset.sinkdata.kafka.method2.KafkaOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class Demo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> value = ...
        value.output(
                KafkaOutputFormat.buildKafkaOutputFormat()
                        .setBootstrapServers("localhost:9092")
                        .setTopic("Test_Topic_1")
                        .setAcks("all")
                        .setBatchSize("10")
                        .setBufferMemory("10240")
                        .setLingerMs("1")
                        .setRetries("0")
                        .finish()
        );
        // output() is lazy, so the job must be triggered explicitly here.
        env.execute();
    }
}
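Since output() returns a DataSink, you can also tune the sink's parallelism explicitly; each parallel task then opens its own KafkaProducer in open(). A small sketch (the parallelism value is just an example, not from the original):

value.output(
        KafkaOutputFormat.buildKafkaOutputFormat()
                .setBootstrapServers("localhost:9092")
                .setTopic("Test_Topic_1")
                .finish()
).setParallelism(4);  // example value; match it to your cluster and topic partitions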