欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink写入Kafka

程序员文章站 2022-07-14 12:27:38
...

Flink写入Kafka

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class DataToKafla {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "kafka-test");
        //这里直接把本地文本写入kafka.
        DataStreamSource<String> data = env.readTextFile("datas/1.txt");
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("kafka-test", new SimpleStringSchema(), props);
        data.addSink(producer);
        env.execute();
    }
}

上一篇: java注解原理

下一篇: 队列实现