A method for dynamically updating Flink job configuration

程序员文章站 2022-06-01 16:31:35

1 Principle

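For background, see the article "Flink/Spark 如何实现动态更新作业配置" (how Flink and Spark dynamically update job configuration), which explains the topic in detail; the references it cites are also worth reading. To update a Flink job's configuration dynamically, add a control stream, connect it to the data stream, and read messages from the control stream to update the parameters used by the data stream's processing logic. The parameters then change while the job is running, with no restart and no code change.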

2 Example: integer filtering

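In this example, the integer sent on the control stream is used as a divisor. An integer from the data stream is forwarded downstream only if dividing it by that divisor leaves a remainder of 0.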
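The filtering rule can be tried out in plain Java before any Flink code is involved. The following is a minimal sketch with no Flink dependency; the class `DivisorFilterSketch` and its methods `onData`, `onControl`, and `replay` are illustrative names, not Flink APIs. The arrival order it replays (one data element, then the control value, then the remaining data) is an assumption chosen for illustration; in a real job the interleaving of the two streams is not deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the two-input filtering rule (hypothetical names, no Flink).
class DivisorFilterSketch {
    // Starts at 1 so that every element passes until a control value arrives.
    private int divisor = 1;
    private final List<Integer> emitted = new ArrayList<>();

    // Handles one element of the data stream: keep it if it is divisible.
    void onData(int value) {
        if (value % divisor == 0) {
            emitted.add(value);
        }
    }

    // Handles one element of the control stream: replace the divisor.
    void onControl(int newDivisor) {
        divisor = newDivisor;
    }

    // Replays an assumed arrival order: 0, then the control value 2, then 1..19.
    static List<Integer> replay() {
        DivisorFilterSketch filter = new DivisorFilterSketch();
        filter.onData(0);
        filter.onControl(2);
        for (int i = 1; i < 20; i++) {
            filter.onData(i);
        }
        return filter.emitted;
    }

    public static void main(String[] args) {
        // prints [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
        System.out.println(replay());
    }
}
```

The element 0 passes because the divisor is still 1 when it arrives; everything after the control value is checked against 2.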

2.1 Parallelism of 1

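First set the parallelism to 1 and check the behavior. Define two sources: dataStream produces the data stream, containing the integers 0 to 19, and configStream produces the control stream, which emits the single integer 2.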

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Integer> configStream = env.addSource(new SourceFunction<Integer>() {
   private static final long serialVersionUID = -4529394264795596001L;

   private boolean running = true;

   @Override
   public void run(SourceContext<Integer> sourceContext) throws Exception {
       sourceContext.collect(2);
   }

   @Override
   public void cancel() {
       running = false;
   }
});
DataStream<Integer> dataStream = env.addSource(new SourceFunction<Integer>() {

   private static final long serialVersionUID = -1885959149409672550L;
   // volatile: cancel() is called from a different thread than run()
   private volatile boolean running = true;

   @Override
   public void run(SourceContext<Integer> sourceContext) throws Exception {
       for (int i = 0; running && i < 20; i += 1) {
           sourceContext.collect(i);
       }
   }

   @Override
   public void cancel() {
       running = false;
   }
});

Connect the data stream with the control stream:

ConnectedStreams<Integer, Integer> connectedStreams = dataStream.connect(configStream);

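Process the connected streams as shown in the code below. The divisor is initialized to 1. processElement2 handles elements from the control stream and updates the divisor when one arrives; processElement1 handles elements from the data stream and emits an element through the collector if its remainder when divided by the divisor is 0.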

DataStream<Integer> resultStream = connectedStreams.process(new CoProcessFunction<Integer, Integer, Integer>() {
    private static final long serialVersionUID = 8870659158532269705L;

    private Integer divisor = 1;

    @Override
    public void processElement1(Integer o, Context context, Collector<Integer> collector) throws Exception {
        log.info("[Data] Received {}, divisor {}", o, divisor);
        if (o % divisor == 0) {
            collector.collect(o);
        }
    }

    @Override
    public void processElement2(Integer o, Context context, Collector<Integer> collector) throws Exception {
        log.info("[Divisor] Received {}", o);
        divisor = o;
    }
});
resultStream.print();
env.execute();

The main output is as follows:

[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 0, divisor 1
0
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Received 2
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 1, divisor 2
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 2, divisor 2
2
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 3, divisor 2
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 4, divisor 2
4
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 5, divisor 2
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 6, divisor 2
6
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 7, divisor 2
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 8, divisor 2
8
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 9, divisor 2
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 10, divisor 2
10
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 11, divisor 2
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 12, divisor 2
12
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 13, divisor 2
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 14, divisor 2
14
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 15, divisor 2
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 16, divisor 2
16
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 17, divisor 2
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 18, divisor 2
18
[Co-Process -> Sink: Print to Std. Out (1/1)] INFO test.flink.streaming.connectstream.Test2  - [Data] Received 19, divisor 2

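As the output shows, when 0 arrives from the data stream the divisor is still 1, so 0 is emitted and printed. The control value 2 then arrives and the divisor is updated to 2, after which only even numbers from the data stream are printed. The divisor has been updated from 1 to 2 dynamically.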

2.2 Parallelism greater than 1

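Comment out env.setParallelism(1);. When the job runs locally, Flink's default parallelism equals the number of processor cores (12 in the run below). In the two element-processing methods of the anonymous CoProcessFunction class above, also log the thread id. The output is as follows: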

[Co-Process -> Sink: Print to Std. Out (2/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 81 received 4, divisor 1
[Co-Process -> Sink: Print to Std. Out (4/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 83 received 6, divisor 1
[Co-Process -> Sink: Print to Std. Out (8/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 87 received 10, divisor 1
4> 6
[Co-Process -> Sink: Print to Std. Out (10/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 89 received 0, divisor 1
8> 10
2> 4
[Co-Process -> Sink: Print to Std. Out (12/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 91 received 2, divisor 1
10> 0
[Co-Process -> Sink: Print to Std. Out (7/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 86 received 9, divisor 1
7> 9
12> 2
[Co-Process -> Sink: Print to Std. Out (3/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 82 received 5, divisor 1
3> 5
[Co-Process -> Sink: Print to Std. Out (1/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 76 received 3, divisor 1
1> 3
[Co-Process -> Sink: Print to Std. Out (6/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 85 received 8, divisor 1
6> 8
[Co-Process -> Sink: Print to Std. Out (9/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 88 received 11, divisor 1
9> 11
[Co-Process -> Sink: Print to Std. Out (5/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 84 received 7, divisor 1
5> 7
[Co-Process -> Sink: Print to Std. Out (11/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 90 received 1, divisor 1
11> 1
[Co-Process -> Sink: Print to Std. Out (5/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 84 received 19, divisor 1
5> 19
[Co-Process -> Sink: Print to Std. Out (10/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 89 received 12, divisor 1
10> 12
[Co-Process -> Sink: Print to Std. Out (3/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 82 received 17, divisor 1
3> 17
[Co-Process -> Sink: Print to Std. Out (4/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 83 received 18, divisor 1
4> 18
[Co-Process -> Sink: Print to Std. Out (12/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 91 received 14, divisor 1
12> 14
[Co-Process -> Sink: Print to Std. Out (11/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 90 received 13, divisor 1
11> 13
[Co-Process -> Sink: Print to Std. Out (1/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 76, received 2
[Co-Process -> Sink: Print to Std. Out (2/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 81 received 16, divisor 1
2> 16
[Co-Process -> Sink: Print to Std. Out (1/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 76 received 15, divisor 2

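Only the thread with id 76 received the 2 from the control stream, and it filtered out the data element 15. In all other threads the divisor stayed at 1 and nothing was filtered.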
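The effect described above can be reproduced without Flink. The sketch below is a plain-Java simulation under stated assumptions: each parallel subtask is modeled as an object with its own private divisor, the single control value is handed to exactly one subtask, and data is spread round-robin. `UnicastControlSketch`, `Subtask`, `run`, and `updatedCount` are hypothetical names, and delivering the control value before any data is a simplification of the real, non-deterministic timing.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates a control value that reaches only one of several parallel subtasks.
class UnicastControlSketch {
    // Each simulated subtask keeps its own private copy of the divisor.
    static final class Subtask {
        int divisor = 1;
        final List<Integer> emitted = new ArrayList<>();

        void onData(int value) {
            if (value % divisor == 0) {
                emitted.add(value);
            }
        }

        void onControl(int newDivisor) {
            divisor = newDivisor;
        }
    }

    // Sends the control value 2 to one subtask only, then spreads 0..19 round-robin.
    static List<Subtask> run(int parallelism, int controlTarget) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
        subtasks.get(controlTarget).onControl(2);
        for (int value = 0; value < 20; value++) {
            subtasks.get(value % parallelism).onData(value);
        }
        return subtasks;
    }

    // Counts the subtasks whose divisor was actually changed.
    static long updatedCount(List<Subtask> subtasks) {
        return subtasks.stream().filter(s -> s.divisor != 1).count();
    }

    public static void main(String[] args) {
        List<Subtask> subtasks = run(4, 1);
        System.out.println("subtasks that saw the control value: " + updatedCount(subtasks));
        for (int i = 0; i < subtasks.size(); i++) {
            System.out.println(i + "> " + subtasks.get(i).emitted);
        }
    }
}
```

With four subtasks and the control value sent to subtask 1, that subtask filters out every odd number it receives, while the other three keep emitting everything, odd numbers included.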

The control stream's data should therefore be broadcast. Change the definition of the control stream source to the following:

DataStream<Integer> configStream = env.addSource(new SourceFunction<Integer>() {
   private static final long serialVersionUID = -4529394264795596001L;

   private boolean running = true;

   @Override
   public void run(SourceContext<Integer> sourceContext) throws Exception {
       sourceContext.collect(2);
   }

   @Override
   public void cancel() {
       running = false;
   }
}).broadcast();

Running the job again prints the following:

[Co-Process -> Sink: Print to Std. Out (9/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 88 received 0, divisor 1
[Co-Process -> Sink: Print to Std. Out (2/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 77, received 2
[Co-Process -> Sink: Print to Std. Out (4/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 79, received 2
9> 0
[Co-Process -> Sink: Print to Std. Out (7/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 86 received 10, divisor 1
7> 10
[Co-Process -> Sink: Print to Std. Out (1/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 76 received 4, divisor 1
1> 4
[Co-Process -> Sink: Print to Std. Out (7/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 86, received 2
[Co-Process -> Sink: Print to Std. Out (12/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 91 received 3, divisor 1
12> 3
[Co-Process -> Sink: Print to Std. Out (1/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 76, received 2
[Co-Process -> Sink: Print to Std. Out (5/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 84 received 8, divisor 1
5> 8
[Co-Process -> Sink: Print to Std. Out (1/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 76 received 16, divisor 2
1> 16
[Co-Process -> Sink: Print to Std. Out (12/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 91, received 2
[Co-Process -> Sink: Print to Std. Out (9/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 88, received 2
[Co-Process -> Sink: Print to Std. Out (12/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 91 received 15, divisor 2
[Co-Process -> Sink: Print to Std. Out (2/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 77 received 5, divisor 2
[Co-Process -> Sink: Print to Std. Out (4/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 79 received 7, divisor 2
[Co-Process -> Sink: Print to Std. Out (6/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 85 received 9, divisor 1
6> 9
[Co-Process -> Sink: Print to Std. Out (11/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 90 received 2, divisor 1
11> 2
[Co-Process -> Sink: Print to Std. Out (10/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 89 received 1, divisor 1
10> 1
[Co-Process -> Sink: Print to Std. Out (8/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 87 received 11, divisor 1
8> 11
[Co-Process -> Sink: Print to Std. Out (11/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 90, received 2
[Co-Process -> Sink: Print to Std. Out (10/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 89, received 2
[Co-Process -> Sink: Print to Std. Out (3/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 78, received 2
[Co-Process -> Sink: Print to Std. Out (10/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 89 received 13, divisor 2
[Co-Process -> Sink: Print to Std. Out (8/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 87, received 2
[Co-Process -> Sink: Print to Std. Out (11/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 90 received 14, divisor 2
11> 14
[Co-Process -> Sink: Print to Std. Out (2/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 77 received 17, divisor 2
[Co-Process -> Sink: Print to Std. Out (6/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 85, received 2
[Co-Process -> Sink: Print to Std. Out (9/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 88 received 12, divisor 2
9> 12
[Co-Process -> Sink: Print to Std. Out (5/12)] INFO test.flink.streaming.connectstream.Test2  - [Divisor] Thread 84, received 2
[Co-Process -> Sink: Print to Std. Out (4/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 79 received 19, divisor 2
[Co-Process -> Sink: Print to Std. Out (3/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 78 received 6, divisor 2
3> 6
[Co-Process -> Sink: Print to Std. Out (3/12)] INFO test.flink.streaming.connectstream.Test2  - [Data] Thead 78 received 18, divisor 2
3> 18

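As the output shows, every thread received the control value 2. For example, the thread with id 91 still had a divisor of 1 when it received 3, so 3 was emitted and printed; after it received the divisor 2 from the control stream, it filtered out the odd number 15 when that arrived.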
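The same behavior can be sketched in plain Java, again as a simulation rather than Flink code. Here the control value is delivered to every simulated subtask, which is what broadcasting achieves. `BroadcastControlSketch`, `Subtask`, and `run` are hypothetical names. As a simplifying assumption, all subtasks receive the control value at the same point (after the first ten data elements); in the real run above each thread received it at a different moment.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simulates a control value that is delivered to every parallel subtask.
class BroadcastControlSketch {
    // Each simulated subtask keeps its own private copy of the divisor.
    static final class Subtask {
        int divisor = 1;
        final List<Integer> emitted = new ArrayList<>();

        void onData(int value) {
            if (value % divisor == 0) {
                emitted.add(value);
            }
        }

        void onControl(int newDivisor) {
            divisor = newDivisor;
        }
    }

    // Feeds 0..9, broadcasts the divisor 2, feeds 10..19, returns all output sorted.
    static List<Integer> run(int parallelism) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
        for (int value = 0; value < 10; value++) {
            subtasks.get(value % parallelism).onData(value);
        }
        // Broadcast: every subtask gets its own copy of the control value.
        for (Subtask subtask : subtasks) {
            subtask.onControl(2);
        }
        for (int value = 10; value < 20; value++) {
            subtasks.get(value % parallelism).onData(value);
        }
        List<Integer> all = new ArrayList<>();
        for (Subtask subtask : subtasks) {
            all.addAll(subtask.emitted);
        }
        Collections.sort(all);
        return all;
    }

    public static void main(String[] args) {
        // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18]
        System.out.println(run(4));
    }
}
```

Everything that arrived before the broadcast passes unfiltered; after it, no subtask lets an odd number through.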

2.3 Complete code

package test.flink.streaming.connectstream;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

/**
* ClassName: Test2
* Description:
*
* @author 0x3E6
* @version 1.0.0
* @date 2020/4/25 9:57
*/
@Slf4j
public class Test2 {

   public static void main(String[] args) throws Exception {
       Logger root = Logger.getRootLogger();
       root.setLevel(Level.INFO);
       root.addAppender(new ConsoleAppender(new PatternLayout("[%t] %p %c %x - %m%n")));
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
       log.info("Parallelism {}", env.getParallelism());
       DataStream<Integer> configStream = env.addSource(new SourceFunction<Integer>() {
           private static final long serialVersionUID = -4529394264795596001L;

           private boolean running = true;

           @Override
           public void run(SourceContext<Integer> sourceContext) throws Exception {
               sourceContext.collect(2);
           }

           @Override
           public void cancel() {
               running = false;
           }
       }).broadcast();
       DataStream<Integer> dataStream = env.addSource(new SourceFunction<Integer>() {

           private static final long serialVersionUID = -1885959149409672550L;
           // volatile: cancel() is called from a different thread than run()
           private volatile boolean running = true;

           @Override
           public void run(SourceContext<Integer> sourceContext) throws Exception {
               for (int i = 0; running && i < 20; i += 1) {
                   sourceContext.collect(i);
               }
           }

           @Override
           public void cancel() {
               running = false;
           }
       });
       ConnectedStreams<Integer, Integer> connectedStreams = dataStream.connect(configStream);
       DataStream<Integer> resultStream = connectedStreams.process(new CoProcessFunction<Integer, Integer, Integer>() {
           private static final long serialVersionUID = 8870659158532269705L;

           private Integer divisor = 1;

           @Override
           public void processElement1(Integer o, Context context, Collector<Integer> collector) throws Exception {
               log.info("[Data] Thead {} received {}, divisor {}", Thread.currentThread().getId(), o, divisor);
               if (o % divisor == 0) {
                   collector.collect(o);
               }
           }

           @Override
           public void processElement2(Integer o, Context context, Collector<Integer> collector) throws Exception {
               log.info("[Divisor] Thread {}, received {}", Thread.currentThread().getId(), o);
               divisor = o;
           }
       });
       resultStream.print();
       env.execute();
   }
}
