
Flink Kafka Broadcast

Posted on 程序员文章站, 2022-07-14 13:46:04

Requirement: while computing statistics, records must be discarded according to a set of rules, and those rules change at runtime.

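Approach: the rules are published to a Kafka topic and consumed as a Flink broadcast stream, so every parallel instance of the filtering operator receives every rule update (Flink's Broadcast State pattern).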
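To see why the rules have to be broadcast rather than read as an ordinary stream, here is a plain-Java sketch that does not use Flink at all. It assumes each parallel filter instance keeps its own copy of the rule map, that rule updates are copied to every instance, and that data records are split across instances (round-robin here, purely for illustration). The names `BroadcastFanOutSketch`, `FilterInstance`, `broadcastRule` and `sendRecord` are hypothetical; only the rule key `orderEnvChannelType` is taken from the job in this post.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java illustration (not Flink): every parallel filter instance holds
 * its own rule map, so a rule update must reach all of them, while each data
 * record is handled by exactly one instance. All names are hypothetical.
 */
public class BroadcastFanOutSketch {

    /** One parallel filter instance with a local copy of the rules. */
    static class FilterInstance {
        final Map<String, String> rules = new HashMap<>();
        final List<Integer> emitted = new ArrayList<>();

        void onRule(String key, String value) {
            rules.put(key, value);
        }

        void onRecord(int channelType) {
            String rule = rules.get("orderEnvChannelType");
            // No rule yet: drop the record instead of failing.
            if (rule != null && Integer.parseInt(rule) == channelType) {
                emitted.add(channelType);
            }
        }
    }

    final List<FilterInstance> instances = new ArrayList<>();
    private int next = 0;

    BroadcastFanOutSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new FilterInstance());
        }
    }

    /** Broadcast: the rule is delivered to every instance. */
    void broadcastRule(String key, String value) {
        for (FilterInstance instance : instances) {
            instance.onRule(key, value);
        }
    }

    /** Regular record: delivered to exactly one instance (round-robin). */
    void sendRecord(int channelType) {
        instances.get(next).onRecord(channelType);
        next = (next + 1) % instances.size();
    }

    public static void main(String[] args) {
        BroadcastFanOutSketch job = new BroadcastFanOutSketch(3);
        job.broadcastRule("orderEnvChannelType", "1");
        for (int type : new int[] {1, 2, 1, 1, 2, 1}) {
            job.sendRecord(type);
        }
        int total = 0;
        for (FilterInstance instance : job.instances) {
            total += instance.emitted.size();
        }
        System.out.println("records kept: " + total); // prints "records kept: 4"
    }
}
```

If the rule reached only one instance, as a single message on an ordinary partitioned stream would, the other instances would keep dropping every record.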

Unresolved issue: once a rule changes, the statistics accumulated under the previous rule can no longer be used.

Code:

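    // Imports assume a Flink 1.6/1.7-era build (Kafka 0.11 connector, legacy Table API).
    // Constants, OrderStreamKafkaReader, TestBroadCast and OrderDetailInfo4Stat are the author's own classes.
    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSource;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.util.Collector;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Order data: Kafka JSON table source, converted to a DataStream of POJOs
        Kafka011JsonTableSource orderSource = new OrderStreamKafkaReader(Constants.ORDER_TOPIC).getTableSource("crm_stream");
        tableEnv.registerTableSource("orderDetail", orderSource);
        Table orderDetailTable = tableEnv.sqlQuery("SELECT * FROM orderDetail");
        // getItemTable: the author's field-conversion helper
        Table itemIdTable = new TestBroadCast().getItemTable(orderDetailTable);
        DataStream<OrderDetailInfo4Stat> dataStream = tableEnv.toAppendStream(itemIdTable, OrderDetailInfo4Stat.class);

        // Rule stream: each message on the Kafka topic "test" is the new channel-type rule
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", Constants.KAFKA_BROKER);
        properties.setProperty("group.id", "crm_stream");
        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), properties));

        // Descriptor (name + key/value types) of the broadcast state that holds the rules
        final MapStateDescriptor<String, String> ruleStateDescriptor =
                new MapStateDescriptor<>("RulesBroadcastState", String.class, String.class);
        BroadcastStream<String> bdPatterns = stream.broadcast(ruleStateDescriptor);

        dataStream.connect(bdPatterns).process(new BroadcastProcessFunction<OrderDetailInfo4Stat, String, OrderDetailInfo4Stat>() {
            private static final long serialVersionUID = 2671485873668322356L;

            @Override
            public void processElement(OrderDetailInfo4Stat value, ReadOnlyContext ctx, Collector<OrderDetailInfo4Stat> out) throws Exception {
                // Regular (non-broadcast) element: read-only access to the rules
                ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
                String channelType = broadcastState.get("orderEnvChannelType");
                if (channelType == null) {
                    // No rule received yet; Integer.valueOf(null) would throw NumberFormatException
                    return;
                }
                // Emit only records whose channel type matches the current rule
                if (Integer.valueOf(channelType).equals(value.getOrderEnvironmentChannelType())) {
                    out.collect(value);
                }
            }

            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<OrderDetailInfo4Stat> out) throws Exception {
                // Broadcast element: store it as the current rule, overwriting the previous one
                BroadcastState<String, String> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
                broadcastState.put("orderEnvChannelType", value);
            }
        }).print();

        env.execute("demo broadCast");
    }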

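At startup, either send a rule to the broadcast topic before any order data arrives, or check in processElement() whether the broadcast rule is null. Otherwise broadcastState.get() returns null and Integer.valueOf(null) throws a NumberFormatException. The null check is the more robust choice, because Flink gives no ordering guarantee between the two inputs of a connected stream, so a rule sent first may still arrive after the first orders.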
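The filtering logic, including the null check, can be exercised without a cluster. The sketch below is a plain-Java mock, not Flink code: a `HashMap` stands in for the broadcast state, `onBroadcast` plays the role of `processBroadcastElement()` and `onElement` the role of `processElement()`. The class and method names are hypothetical; the rule key and the convention of comparing the rule string as an `Integer` come from the job above. It also shows the rule being replaced at runtime.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java mock (not Flink) of the two callbacks in the job above.
 * RuleFilterSketch, onBroadcast and onElement are hypothetical names.
 */
public class RuleFilterSketch {
    private static final String RULE_KEY = "orderEnvChannelType";

    // Stands in for the broadcast state (a key/value map of rules).
    private final Map<String, String> broadcastState = new HashMap<>();

    /** Like processBroadcastElement(): overwrite the current rule. */
    public void onBroadcast(String rule) {
        broadcastState.put(RULE_KEY, rule);
    }

    /** Like processElement(): returns true if the record would be emitted. */
    public boolean onElement(Integer channelType) {
        String rule = broadcastState.get(RULE_KEY);
        if (rule == null) {
            // No rule received yet: drop the record instead of throwing.
            return false;
        }
        return Integer.valueOf(rule).equals(channelType);
    }

    public static void main(String[] args) {
        RuleFilterSketch filter = new RuleFilterSketch();
        System.out.println(filter.onElement(1)); // false: no rule yet
        filter.onBroadcast("1");
        System.out.println(filter.onElement(1)); // true
        System.out.println(filter.onElement(2)); // false
        filter.onBroadcast("2");                  // the rule changes at runtime
        System.out.println(filter.onElement(1)); // false
        System.out.println(filter.onElement(2)); // true
    }
}
```

Records that arrive before any rule are dropped, which matches the "check for null" option above.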

Keep at it, Pikachu.