Flink Sink

程序员文章站 2022-07-14 14:04:53

Sinks provided by the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

1 pom dependencies

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>

2 Kafka sink

  • Code
    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class KafkaSinkJob {

        public static void main(String[] args) throws Exception {

            // Read from topic test-in, transform, and write to topic test
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties consumerProperties = new Properties();
            consumerProperties.setProperty("bootstrap.servers", "127.0.0.1:9092");
            consumerProperties.setProperty("group.id", "test");

            Properties producerProperties = new Properties();
            producerProperties.setProperty("bootstrap.servers", "127.0.0.1:9092");

            FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                    "test-in",
                    new SimpleStringSchema(),
                    consumerProperties);

            FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                    "test",                   // target topic
                    new SimpleStringSchema(), // serialization schema
                    producerProperties);      // producer config

            // Keep the offsets committed to the Kafka brokers in sync with
            // the offsets stored in checkpoint state
            kafkaConsumer.setCommitOffsetsOnCheckpoints(true);

            /**
             * Set the position from which the Kafka consumer starts reading:
             * begin from the offsets committed by the consumer group
             * (the group.id set in the consumer properties) in the Kafka brokers.
             * If no offset can be found for a partition, the auto.offset.reset
             * setting from the configuration is used instead.
             */
            kafkaConsumer.setStartFromGroupOffsets();

            DataStream<String> dataStream = env.addSource(kafkaConsumer);

            dataStream.print().setParallelism(1);

            dataStream.map(s -> s + System.currentTimeMillis()).addSink(kafkaProducer);

            env.execute();

        }
    }
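The producer above uses Flink's default at-least-once semantic. For end-to-end exactly-once delivery, the `FlinkKafkaProducer` can be constructed with `Semantic.EXACTLY_ONCE`, which makes it write in Kafka transactions committed on each checkpoint. Below is a minimal sketch (assuming Flink 1.13 and the `KafkaSerializationSchema`-based constructor); note that checkpointing must be enabled, and `transaction.timeout.ms` must be lowered below the broker's `transaction.max.timeout.ms` (15 minutes by default), since Flink's default of one hour would be rejected by the broker:

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaSinkJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once sinks need checkpointing: Kafka transactions are
        // committed when a checkpoint completes.
        env.enableCheckpointing(5000);

        Properties producerProperties = new Properties();
        producerProperties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        // Must not exceed the broker's transaction.max.timeout.ms
        // (15 minutes by default).
        producerProperties.setProperty("transaction.timeout.ms", "900000");

        FlinkKafkaProducer<String> exactlyOnceProducer = new FlinkKafkaProducer<>(
                "test",                                     // default target topic
                (KafkaSerializationSchema<String>) (element, timestamp) ->
                        new ProducerRecord<>("test", element.getBytes(StandardCharsets.UTF_8)),
                producerProperties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);  // transactional writes

        env.fromElements("a", "b", "c").addSink(exactlyOnceProducer);
        env.execute("exactly-once-kafka-sink");
    }
}
```

With `EXACTLY_ONCE`, downstream consumers should also set `isolation.level=read_committed`, otherwise they may see records from uncommitted transactions.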
  • Results
  1. Kafka producer (screenshot)
  2. Flink output (screenshot)
  3. Sink output (screenshot)

Related tags: flink