flink实时产生的数据流式写入到kafka中
程序员文章站
2022-07-14 12:27:20
...
flink实时产生的数据流式写入到kafka中
import cn.itcast.day03.source.custom.MyNoParallelSource;
import cn.itcast.day03.source.custom.Order;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.util.Properties;
/**
* 实时产生的数据流式写入到kafka中
*/
public class StreamKafkaDemo {
public static void main(String[] args) throws Exception {
/**
* 实现步骤:
* 1)初始化flink流处理的运行环境
* 2)设置并行度为1
* 3)添加自定义数据源,读取数据
* 4)对读取到的数据进行转换成字符串
* 5)将转换后的字符串实时写入到kafka集群
* 6)递交作业
*/
//TODO 1)初始化flink流处理的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO 2)设置并行度为1
env.setParallelism(1);
//TODO 3)添加自定义数据源,读取数据
DataStreamSource<Order> orderDataStreamSource = env.addSource(new MyNoParallelSource());
//TODO 4)对读取到的数据进行转换成字符串
SingleOutputStreamOperator<String> outputStreamOperator = orderDataStreamSource.map(new MapFunction<Order, String>() {
@Override
public String map(Order order) throws Exception {
return order.toString();
}
});
//构建kafka的连接参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node1:9092");
//实例化kafka生产者对象
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>("test",
new MyKafkaSerializationSchema(),
properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
//TODO 5)将转换后的字符串实时写入到kafka集群
outputStreamOperator.addSink(kafkaProducer);
//TODO 6)递交作业
env.execute();
}
/**
* 自定义构建kafka的序列化对象
*/
public static class MyKafkaSerializationSchema implements KafkaSerializationSchema<String>{
//指定写入的topic名称
String topicName = "test";
/**
* 序列化方法,存储数据的时候需要对数据进行序列化
* @param element
* @param aLong
* @return
*/
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long aLong) {
return new ProducerRecord<byte[], byte[]>(topicName, element.getBytes());
}
}
}
上一篇: 快速开始一个springboot项目
下一篇: 在Tomcat中创建一个项目