Flink Sink
程序员文章站
2022-07-14 14:04:53
...
文章目录
Flink Sink
官网提供的Sink
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
1 pom
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.0</version>
</dependency>
2 kafka sink
- 代码
public static void main(String[] args) throws Exception {
// 从test-in读出数据 加工 输出到test
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties consumerProperties = new Properties();
consumerProperties.setProperty("bootstrap.servers", "127.0.0.1:9092");
consumerProperties.setProperty("group.id", "test");
Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "127.0.0.1:9092");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"test-in",
new SimpleStringSchema(),
consumerProperties);
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
"test", // 目标 topic
new SimpleStringSchema(), // 序列化 schema
producerProperties); // 容错
// 确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
/**
* 设置Kafka Consumer 开始消费的位置
* 从 Kafka brokers 中的 consumer 组(consumer 属性中的 group.id 设置)提交的偏移量中开始读取分区。
* 如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置
*/
kafkaConsumer.setStartFromGroupOffsets();
DataStream<String> dataStream = env.addSource(kafkaConsumer);
dataStream.print().setParallelism(1);
dataStream.map(s -> s + System.currentTimeMillis()).addSink(kafkaProducer);
env.execute();
}
- 结果
-
kafka生产者
-
flink 输出
-
sink输出
上一篇: 链式队列的实现
下一篇: kafka 获取metadata
推荐阅读
-
Flink SQL 系列 | 5 个 TableEnvironment 我该用哪个? sqlscalajavajvm
-
使用maven命令生成flink项目出现问题及解决办法
-
Flink MVN 打包配置文件找不到路径问题
-
新一代大数据处理引擎 Apache Flink
-
《大数据实时计算引擎 Flink 实战与性能优化》新专栏
-
Flink入门宝典(详细截图版)
-
基于Flink和规则引擎的实时风控解决方案 活动工作互联网redishbase
-
JAVA EE BigData Apache Storm / Spark / Samza / apache storm / Flink
-
新一代大数据计算引擎 Flink从入门到实战
-
新一代大数据计算引擎 Flink从入门到实战 flink