
Flink Learning (6): Persisting Data to Kafka


The previous post moved data from Kafka into MySQL. In many cases, though, processed results are written back into Kafka so that downstream consumers can pick them up.

FlinkKafkaProducer

Flink provides a matching Kafka sink, FlinkKafkaProducer010. Let's look at its implementation:

public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
    private static final long serialVersionUID = 1L;
    // when true, each record's Flink timestamp is attached to the Kafka message
    private boolean writeTimestampToKafka;

    public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), producerConfig, (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<T> customPartitioner) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), producerConfig, (FlinkKafkaPartitioner)customPartitioner);
    }

    public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) {
        this(topicId, (KeyedSerializationSchema)serializationSchema, getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
        this(topicId, (KeyedSerializationSchema)serializationSchema, producerConfig, (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<T> customPartitioner) {
        super(topicId, serializationSchema, producerConfig, customPartitioner);
        this.writeTimestampToKafka = false;
    }

    public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
        this.writeTimestampToKafka = writeTimestampToKafka;
    }
    
    ......

}
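These overloads mainly differ in whether the broker list arrives as a plain string or inside a Properties object; a plain SerializationSchema is wrapped in a KeyedSerializationSchemaWrapper, and the partitioner defaults to FlinkFixedPartitioner. As a minimal sketch of the Properties-based constructor combined with the setWriteTimestampToKafka switch shown above (broker address and topic name are placeholders):

Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
        "dest_testjin",            // destination topic
        new SimpleStringSchema(),  // value serialization
        producerConfig);
// attach each record's Flink timestamp to the Kafka 0.10 message
producer.setWriteTimestampToKafka(true);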

Beyond these constructor overloads and the timestamp flag, there is little to the class itself; tracing up the hierarchy shows that the sink ultimately extends RichSinkFunction:

public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {}
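To make that contract concrete, here is a hypothetical bare-bones sink (the class name and body are illustrative, not Flink code). The lifecycle is the one the Kafka producer builds on: the Kafka sink creates its internal KafkaProducer in open(), and invoke() is called once per record to send it.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// hypothetical sink that just prints records, showing the lifecycle
// FlinkKafkaProducerBase builds on
public class PrintSink extends RichSinkFunction<String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // set up connections/clients here, once per parallel instance
    }

    @Override
    public void invoke(String value) throws Exception {
        // called once per record
        System.out.println(value);
    }

    @Override
    public void close() throws Exception {
        // flush buffers and release resources
    }
}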

From Kafka To Kafka

The example below reads data from Kafka and writes it to another Kafka topic. Other processing could of course happen in between; it is omitted here for simplicity. The code:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class KafkaToKafkaJob {

    public static void main(String[] args) throws Exception {

        // use Kafka as the data source
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // use event time
        // event time: when the record was created at the source, independent of Flink
        // processing time: when the record starts being processed inside Flink
        // ingestion time: when the record arrives in the Flink cluster
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Kafka and ZooKeeper connection settings
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        // only needed by the legacy 0.8 consumer; harmless but unused here
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        // start from the latest offset when the group has no committed offset
        properties.put("auto.offset.reset", "latest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        SingleOutputStreamOperator<String> dataStreamSource =
                env.addSource(new FlinkKafkaConsumer010<String>(
                        "testjin", // topic
                        new SimpleStringSchema(),
                        properties
                )).setParallelism(1)
//                .map(string -> JSON.parseObject(string,UrlInfo.class))
                ;

        // write to Kafka; kafka-topics.sh should show the new dest_testjin topic afterwards
        dataStreamSource.addSink(new FlinkKafkaProducer010<String>(
                "localhost:9092",
                "dest_testjin",//dest topic,会重新创建一个
                new SimpleStringSchema()
        )).setParallelism(1).name("add to kafka dest topic");

        env.execute("execute from kafka to kafka");
    }
}
As you can see, we simply call addSink(new FlinkKafkaProducer010(...)), passing the Kafka broker list, the destination topic, and the serialization schema (again the simplest option, SimpleStringSchema). After the job runs, the newly created dest_testjin topic can be verified on the corresponding Kafka broker.
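If you would rather verify from Java than from the shell, a small sketch using the Kafka AdminClient can list topics (this assumes a kafka-clients dependency at version 0.11 or later on the classpath, since the AdminClient API did not exist in 0.10; the broker address is a placeholder):

import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ListTopics {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // prints all topic names; dest_testjin should appear after the job runs
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}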