欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink学习笔记(六):flink数据广播

程序员文章站 2022-05-18 20:25:51
...

一、需求场景

在很多情况下,对数据流处理需要动态更新一些参数,但是但因为该参数是在算子中作为一个变量,一旦flink作业启动,想修改关键字不得不停掉作业,然后再重新启动作业,实时性和便利性都比较差。

记得公司一个项目需要动态配置校验参数,检测日志中的该参数是否存在,频繁的重启flink作业,最后把服务器都搞挂了……,就差拉去祭天了T_T,还好是测试环境,逃过一劫

那么有没有可以动态修改算子参数的呢?如何进行动态修改算子的参数呢?

flink为我们提供了一个广播模式来解决这个问题。

广播流的流数据能够被算子的所有分区所处理,而数据流的流数据只能够被算子的某一分区处理。

广播流的特点也决定适合做配置的动态更新

二、实现代码

通过网络获取一个ObjectA对象的数据结构
通过kafka获取一个ObjectB对象的datastream
然后把通过广播连接两个数据流,在BroadcastProcessFunction进行数据处理
最后将处理好的数据通过sink invoke到指定的容器中。

定义广播流

private static BroadcastStream broadcastConfig = null;

获取广播流

//获取广播流
DataStream<ObjectA> stream = env.addSource(new RichSourceFunction<ObjectA>() {
    @Override
    public void run(SourceContext<ObjectA> sourceContext) throws Exception {
        //从服务端获取数据,比如http get获取
    }

    @Override
    public void cancel() {

    }
});

广播

final MapStateDescriptor<String, ObjectA> broadcastStateDescriptor = new MapStateDescriptor<>(
        "objectA",
        String.class,
        ObjectA.class);
broadcastConfig = stream.broadcast(broadcastStateDescriptor);

数据流连接广播流,dataStream参考前文从kafka获取

DataStream<ObjectB> broadcastDataStream = dataStream
        .connect(broadcastConfig)
        .process(new BroadcastProcessFunction<ObjectB, ObjectA, ObjectB>() {
            //拦截的关键字
            private ObjectA oa = null;
            @Override
            public void processElement(ObjectB objectB, ReadOnlyContext readOnlyContext, Collector<ObjectB> collector) throws Exception {
				//TODO 处理 ObjectA 和 ObjectB
				collector.collect(objectB);
            }

            @Override
            public void processBroadcastElement(ObjectA objectA, Context context, Collector<ObjectB> collector) throws Exception {
				oa = objectA;
            }
        });

数据处理

broadcastDataStream.addSink(new RichSinkFunction<ObjectB>() {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void invoke(ObjectB objectB, Context context) throws Exception {

    }
});
相关标签: 大数据处理