
Streaming real-time data into Kafka with Flink

The following Flink streaming job reads Order events from a custom source, converts each event to a string, and writes it to a Kafka topic with FlinkKafkaProducer using EXACTLY_ONCE delivery semantics.

import cn.itcast.day03.source.custom.MyNoParallelSource;
import cn.itcast.day03.source.custom.Order;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Streams data produced in real time into Kafka.
 */
public class StreamKafkaDemo {
    public static void main(String[] args) throws Exception {
        /**
         * Implementation steps:
         * 1) Initialize the Flink streaming execution environment
         * 2) Set the parallelism to 1
         * 3) Add the custom source and read its data
         * 4) Convert each record to a string
         * 5) Write the converted strings to the Kafka cluster in real time
         * 6) Submit the job
         */

        //TODO 1) Initialize the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 2) Set the parallelism to 1
        env.setParallelism(1);
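        //EXACTLY_ONCE delivery relies on Kafka transactions that are committed when a
        //Flink checkpoint completes, so checkpointing must be enabled for the sink to
        //commit data; the 5-second interval below is an example value, not from the
        //original article.
        env.enableCheckpointing(5000);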

        //TODO 3) Add the custom source and read its data
        DataStreamSource<Order> orderDataStreamSource = env.addSource(new MyNoParallelSource());

        //TODO 4) Convert each incoming Order to a string
        SingleOutputStreamOperator<String> outputStreamOperator = orderDataStreamSource.map(new MapFunction<Order, String>() {
            @Override
            public String map(Order order) throws Exception {
                return order.toString();
            }
        });
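        //Note: on Java 8+ the anonymous MapFunction above could equivalently be written
        //as a method reference, e.g. orderDataStreamSource.map(Order::toString).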

        //Build the Kafka connection properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node1:9092");
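        //FlinkKafkaProducer defaults transaction.timeout.ms to 1 hour, while Kafka
        //brokers default transaction.max.timeout.ms to 15 minutes, so an EXACTLY_ONCE
        //producer fails against an out-of-the-box broker unless the timeout is lowered
        //(or the broker limit raised); the 5-minute value below is an example choice.
        properties.setProperty("transaction.timeout.ms", "300000");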

        //Instantiate the Kafka producer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>("test",
                new MyKafkaSerializationSchema(),
                properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        //TODO 5) Write the converted strings to the Kafka cluster in real time
        outputStreamOperator.addSink(kafkaProducer);

        //TODO 6) Submit the job
        env.execute();
    }

    /**
     * Custom Kafka serialization schema
     */
    public static class MyKafkaSerializationSchema implements KafkaSerializationSchema<String> {
        //Name of the topic to write to
        String topicName = "test";
        /**
         * Serialization method; each record is serialized before it is written to Kafka.
         * @param element   the record to write
         * @param timestamp the record timestamp, may be null
         * @return the ProducerRecord to send to Kafka
         */
        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
            return new ProducerRecord<byte[], byte[]>(topicName, element.getBytes(StandardCharsets.UTF_8));
        }
    }
}
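
MyNoParallelSource and Order come from the cn.itcast.day03.source.custom package and are not shown in the original article. A minimal sketch of what they might look like follows (each class in its own file in that package); the field names, the SourceFunction-based implementation, and the random generation logic are assumptions for illustration, not the article's actual code.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * A non-parallel source that emits one Order per second (illustrative sketch).
 */
public class MyNoParallelSource implements SourceFunction<Order> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Order> ctx) throws Exception {
        Random random = new Random();
        while (running) {
            //Hypothetical fields: order id, user id, order amount
            ctx.collect(new Order(UUID.randomUUID().toString(), random.nextInt(100), random.nextInt(1000)));
            TimeUnit.SECONDS.sleep(1);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

/**
 * A simple Order POJO (illustrative sketch); toString() produces the string written to Kafka.
 */
public class Order {
    private final String id;
    private final int userId;
    private final int money;

    public Order(String id, int userId, int money) {
        this.id = id;
        this.userId = userId;
        this.money = money;
    }

    @Override
    public String toString() {
        return id + "," + userId + "," + money;
    }
}

To verify the output you can read the "test" topic with Kafka's console consumer. Note that under EXACTLY_ONCE semantics a consumer configured with isolation.level=read_committed will only see records after the checkpoint that encloses them completes and the transaction commits.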