Flink Broadcast
程序员文章站
2022-07-14 13:31:43
...
Streaming Broadcast (元素广播)
- 把元素广播给所有的分区,数据会被重复处理,类似于storm中的allGrouping
- 使用技巧:dataStream.broadcast()
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
//获取数据源 注意:针对此source,并行度只能设置为1
DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
//使用技巧:dataStream.broadcast()
DataStream<Long> num = text.broadcast().map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
long id = Thread.currentThread().getId();
System.out.println("线程id:" + id + ",接收到数据:" + value);
return value;
}
});
//每2秒钟处理一次数据
DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
//打印结果
sum.print().setParallelism(1);
String jobName = Demo5.class.getSimpleName();
env.execute(jobName);
发现整个Map元素别处理了4次:
线程id:44,接收到数据:1
线程id:46,接收到数据:1
线程id:42,接收到数据:1
线程id:48,接收到数据:1
4
Flink Broadcast(广播变量)
- 只能在批处理程序中使用
-
广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。
另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。 -
一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。
-
如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。
【Flink入门与实战】解释:广播变量允许编程人员在每台机器上保持一个只读的缓存变量,而不是传送变量的副本给Task(Stream Broad 感觉应该是传送变量的副本给Task,不知是否正确)。
用法:
1:初始化数据
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
2:广播数据
.withBroadcastSet(toBroadcast, "broadcastSetName");
3:获取数据
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
步骤:
第一步:封装DataSet,调用withBroadcastSet。
第二步:getRuntimeContext().getBroadcastVariable,获得广播变量
第三步:RichMapFunction中执行获得广播变量的逻辑
package com.steven.flink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* @Description: Flink-Broadcast
* @author: : Steven
* @Date: 2020/5/18 14:12
*/
public class Demo6 {
public static void main(String[] args) throws Exception {
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//1:准备需要广播的数据
ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
broadData.add(new Tuple2<>("zs", 18));
broadData.add(new Tuple2<>("ls", 20));
broadData.add(new Tuple2<>("ww", 17));
DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);
//1.1:处理需要广播的数据,把数据集转换成map类型,map中的key就是用户姓名,value就是用户年龄
DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
HashMap<String, Integer> res = new HashMap<>();
res.put(value.f0, value.f1);
return res;
}
});
//源数据
DataSource<String> data = env.fromElements("zs", "ls", "ww");
//注意:在这里需要使用到RichMapFunction获取广播变量
DataSet<String> result = data.map(new RichMapFunction<String, String>() {
List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
HashMap<String, Integer> allMap = new HashMap<String, Integer>();
/**
* 这个方法只会执行一次
* 可以在这里实现一些初始化的功能
*
* 所以,就可以在open方法中获取广播变量数据
*
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//3:获取广播数据
this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
for (HashMap map : broadCastMap) {
allMap.putAll(map);
}
}
@Override
public String map(String value) throws Exception {
Integer age = allMap.get(value);
return value + "," + age;
}
}).withBroadcastSet(toBroadcast, "broadCastMapName");//2:执行广播数据的操作
result.print();
}
}
运行结果:
zs,18
ls,20
ww,17