Flink Kafka Broadcast
2022-07-14 13:46:04
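Requirement: while computing statistics, records must be discarded according to certain rules, and those rules change over time.
Approach: use Kafka as the source of a Flink broadcast stream, so that every parallel instance receives the rule.
Unresolved issue: once a rule changes, the historical data aggregated under the previous rule is no longer usable.
Code: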
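import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

// Constants, OrderStreamKafkaReader, TestBroadCast and OrderDetailInfo4Stat are project classes not shown here.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);

    // Main stream: order details read from Kafka through the Table API
    Kafka011JsonTableSource orderSource = new OrderStreamKafkaReader(Constants.ORDER_TOPIC).getTableSource("crm_stream");
    tableEnv.registerTableSource("orderDetail", orderSource);
    Table orderDetailTable = tableEnv.sqlQuery("SELECT * FROM orderDetail");
    // getItemTable is the field-conversion method
    Table itemIdTable = new TestBroadCast().getItemTable(orderDetailTable);
    DataStream<OrderDetailInfo4Stat> dataStream = tableEnv.toAppendStream(itemIdTable, OrderDetailInfo4Stat.class);

    // Rule stream: rule values read as plain strings from the "test" Kafka topic
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", Constants.KAFKA_BROKER);
    properties.setProperty("group.id", "crm_stream");
    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), properties));

    // Descriptor for the broadcast state (a map) that holds the current rule
    final MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState", String.class, String.class);
    BroadcastStream<String> bdPatterns = stream.broadcast(ruleStateDescriptor);

    dataStream.connect(bdPatterns).process(new BroadcastProcessFunction<OrderDetailInfo4Stat, String, OrderDetailInfo4Stat>() {
        private static final long serialVersionUID = 2671485873668322356L;

        @Override
        public void processElement(OrderDetailInfo4Stat value, ReadOnlyContext ctx, Collector<OrderDetailInfo4Stat> out) throws Exception {
            // Non-broadcast side: one order record
            ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
            String channelType = broadcastState.get("orderEnvChannelType");
            if (channelType == null) {
                // No rule has been broadcast yet; dropping the record is one possible policy
                return;
            }
            if (value.getOrderEnvironmentChannelType().equals(Integer.valueOf(channelType))) {
                // Emit only the records that match the broadcast rule
                out.collect(value);
            }
        }

        @Override
        public void processBroadcastElement(String value, Context ctx, Collector<OrderDetailInfo4Stat> out) throws Exception {
            // Broadcast side: one rule update
            // Get the writable map backing the broadcast state
            BroadcastState<String, String> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
            // Store the latest rule value from the broadcast stream
            broadcastState.put("orderEnvChannelType", value);
        }
    }).print();

    env.execute("demo broadCast");
}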
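The unresolved issue noted earlier can be reproduced without Flink. The following is only a minimal plain-Java sketch: the class `RuleChangeDemo` and its methods are hypothetical names that do not appear in the original job, and the running count stands in for whatever statistic is computed downstream. It shows that a count accumulated across a rule change matches neither rule applied to the full history.

```java
public class RuleChangeDemo {
    // Running count of records that passed whichever rule was active when they arrived.
    private long count = 0;
    // Currently broadcast rule value; null until the first rule arrives.
    private String rule = null;

    // Mirrors processBroadcastElement: remember the latest rule.
    public void onRule(String newRule) {
        rule = newRule;
    }

    // Mirrors processElement: count the order only if it matches the active rule.
    public void onOrder(int channelType) {
        if (rule != null && channelType == Integer.parseInt(rule)) {
            count++;
        }
    }

    public long count() {
        return count;
    }

    public static void main(String[] args) {
        RuleChangeDemo demo = new RuleChangeDemo();
        demo.onRule("1");
        demo.onOrder(1);
        demo.onOrder(2);
        demo.onOrder(1);   // two records counted under rule "1"
        demo.onRule("2");  // the rule changes mid-stream
        demo.onOrder(2);
        demo.onOrder(2);   // two records counted under rule "2"
        // Rule "1" over the whole history would give 2, rule "2" would give 3.
        System.out.println(demo.count()); // prints 4
    }
}
```

The mixed total is what makes the earlier statistics unusable after a rule change: the aggregate no longer corresponds to any single rule.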
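On startup, a rule must be sent on the broadcast stream before any order data arrives, because broadcastState.get("orderEnvChannelType") returns null until the first rule has been received. Alternatively, check in processElement() whether the broadcast rule is null.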
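The null check can be isolated in a small helper and tested on its own. This is a sketch under assumptions: `RuleGuard` and `shouldEmit` are hypothetical names, and dropping records when no rule (or a malformed rule) is present is just one possible policy; buffering them until a rule arrives would be another.

```java
public class RuleGuard {
    /**
     * Decides whether an order passes the broadcast rule.
     * rule is the raw string read from broadcast state; null means no rule has arrived yet.
     */
    public static boolean shouldEmit(Integer orderChannelType, String rule) {
        if (rule == null || orderChannelType == null) {
            return false; // assumption: drop records until a rule is available
        }
        try {
            return orderChannelType.equals(Integer.valueOf(rule.trim()));
        } catch (NumberFormatException e) {
            return false; // assumption: a malformed rule matches nothing
        }
    }

    public static void main(String[] args) {
        System.out.println(shouldEmit(1, null));  // false: no rule yet
        System.out.println(shouldEmit(1, "1"));   // true: matches the rule
        System.out.println(shouldEmit(2, "1"));   // false: does not match
        System.out.println(shouldEmit(1, "abc")); // false: malformed rule
    }
}
```

Inside processElement(), the value read from the broadcast state would be passed as `rule`, and out.collect(value) would be called only when the helper returns true.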
Keep at it, Pikachu.