Flink: Writing to Multiple Kafka Topics
There are two approaches I can think of for writing from Flink to multiple Kafka topics.
Approach 1: splitting the stream
Use the split() method. It has since been deprecated, and the official replacement is side output, which is much the same idea: both produce an extra stream. Side output also comes up when studying timed-out events in window operators (that is where I first learned about it); a sketch of the side-output variant follows the list below.
The drawback of this approach is that every additional topic means producing one more stream and adding one more addSink() call, so the logic has to be changed each time. And what if there are 100 topics?
addSink()
addSink()
addSink()
addSink()
addSink()
...
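For reference, here is a minimal sketch of the side-output variant with two topics. It assumes a DataStream<String> named stream and a Properties object named properties for the Kafka producer (both hypothetical), and it routes by the same odd/even rule used in the second approach below:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// The anonymous subclass keeps the element type information for Flink.
OutputTag<String> oddTag = new OutputTag<String>("odd") {};

SingleOutputStreamOperator<String> evenStream = stream.process(
        new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                // Route events whose digits form an odd number to the side output.
                if (Integer.parseInt(value.replaceAll("[^\\d]+", "")) % 2 > 0) {
                    ctx.output(oddTag, value);
                } else {
                    out.collect(value);
                }
            }
        });

// One sink per topic: exactly the part that does not scale.
evenStream.addSink(new FlinkKafkaProducer<>("even", new SimpleStringSchema(), properties));
evenStream.getSideOutput(oddTag)
        .addSink(new FlinkKafkaProducer<>("odd", new SimpleStringSchema(), properties));

Each extra topic needs its own OutputTag, its own branch in processElement(), and its own addSink(), which is why this approach does not scale.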
Approach 2: overriding the serialization schema
The details are explained in the comments; the key part is the serialize() method.
package kafka.connection;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

public class KafkaSerializationTest<T> implements KafkaSerializationSchema<T>, KafkaContextAware<T> {

    private final FlinkKafkaPartitioner<T> partitioner;
    private final SerializationSchema<T> serializationSchema;
    private final String topic;
    private boolean writeTimestamp;
    private int[] partitions;

    /**
     * @param topic               default topic, used when no routing rule matches
     * @param partitioner         partitioning rule; see FlinkFixedPartitioner for a reference implementation
     * @param writeTimestamp      whether to write the record timestamp
     * @param serializationSchema how the payload is serialized
     */
    public KafkaSerializationTest(String topic, FlinkKafkaPartitioner<T> partitioner,
                                  boolean writeTimestamp, SerializationSchema<T> serializationSchema) {
        this.partitioner = partitioner;
        this.serializationSchema = serializationSchema;
        this.topic = topic;
        this.writeTimestamp = writeTimestamp;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        this.serializationSchema.open(context);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
        byte[] serialized = this.serializationSchema.serialize(element);
        String targetTopic = chooseTopic(element);
        Integer partition;
        if (this.partitioner != null) {
            // Choose the partition to write to; `partitions` is assigned in setPartitions().
            partition = this.partitioner.partition(element, null, serialized, targetTopic, this.partitions);
        } else {
            partition = null;
        }
        Long timestampToWrite = this.writeTimestamp ? timestamp : null;
        return new ProducerRecord<>(targetTopic, partition, timestampToWrite, (byte[]) null, serialized);
    }

    /*
     * Decide which topic to use, based on the event itself: extract a keyword or
     * some key information from it (the keyword could even serve as the topic
     * directly). Here, events whose digits form an odd number go to the "odd"
     * topic; everything else goes to the default topic.
     */
    private String chooseTopic(T element) {
        if (Integer.parseInt(element.toString().replaceAll("[^\\d]+", "")) % 2 > 0) {
            return "odd";
        }
        return this.topic;
    }

    // Called by the producer before serialize(); the topic returned here is used
    // to look up its available partitions, which are then passed to setPartitions().
    @Override
    public String getTargetTopic(T element) {
        return chooseTopic(element);
    }

    // Receives the available partitions of the target topic.
    @Override
    public void setPartitions(int[] partitions) {
        this.partitions = partitions;
    }

    public void setWriteTimestamp(boolean writeTimestamp) {
        this.writeTimestamp = writeTimestamp;
    }
}
/**
 * Usage:
 * new FlinkKafkaProducer<String>(topic_produce,
 *         new KafkaSerializationTest<>(topic_produce, new FlinkFixedPartitioner<>(), true, new SerializationSchemaWrapperTest()),
 *         properties,
 *         FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
 */
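For completeness, a minimal sketch of wiring this into a job. The bootstrap address, the stream variable, and the "even" default topic are assumptions, and SimpleStringSchema stands in for the SerializationSchemaWrapperTest mentioned above:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

import java.util.Properties;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// EXACTLY_ONCE uses Kafka transactions; the transaction timeout must not exceed
// the broker's transaction.max.timeout.ms (15 minutes by default).
properties.setProperty("transaction.timeout.ms", "600000");

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "even", // default topic, used when the schema does not reroute the record
        new KafkaSerializationTest<String>("even", new FlinkFixedPartitioner<String>(), true, new SimpleStringSchema()),
        properties,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

stream.addSink(producer);

Note that new topics now only require a new branch in chooseTopic(), not a new sink.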
If you spot a problem or have a better approach, please leave a comment, and we can be the giants each other stands on.