欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink Broadcast

程序员文章站 2022-07-14 13:31:43
...

Streaming Broadcast (元素广播)

  • 把元素广播给所有的分区,数据会被重复处理,类似于storm中的allGrouping
  • 使用技巧:dataStream.broadcast()
  //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        //获取数据源     注意:针对此source,并行度只能设置为1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        //使用技巧:dataStream.broadcast()
        DataStream<Long> num = text.broadcast().map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                long id = Thread.currentThread().getId();
                System.out.println("线程id:" + id + ",接收到数据:" + value);
                return value;
            }
        });

        //每2秒钟处理一次数据
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        //打印结果
        sum.print().setParallelism(1);

        String jobName = Demo5.class.getSimpleName();
        env.execute(jobName);

发现整个Map元素别处理了4次:

线程id:44,接收到数据:1
线程id:46,接收到数据:1
线程id:42,接收到数据:1
线程id:48,接收到数据:1
4

Flink Broadcast(广播变量)

  • 只能在批处理程序中使用
  • 广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。
    另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。

  • 一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。

  • 如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。

   【Flink入门与实战】解释:广播变量允许编程人员在每台机器上保持一个只读的缓存变量,而不是传送变量的副本给Task(Stream Broad 感觉应该是传送变量的副本给Task,不知是否正确)。

用法:

  1:初始化数据
  DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
  2:广播数据
  .withBroadcastSet(toBroadcast, "broadcastSetName");
  3:获取数据
  Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");

步骤:

第一步:封装DataSet,调用withBroadcastSet。
第二步:getRuntimeContext().getBroadcastVariable,获得广播变量
第三步:RichMapFunction中执行获得广播变量的逻辑
package com.steven.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * @Description: Flink-Broadcast
 * @author: :  Steven
 * @Date: 2020/5/18 14:12
 */
public class Demo6 {
    public static void main(String[] args) throws Exception {
        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //1:准备需要广播的数据
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("zs", 18));
        broadData.add(new Tuple2<>("ls", 20));
        broadData.add(new Tuple2<>("ww", 17));
        DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);


        //1.1:处理需要广播的数据,把数据集转换成map类型,map中的key就是用户姓名,value就是用户年龄
        DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                HashMap<String, Integer> res = new HashMap<>();
                res.put(value.f0, value.f1);
                return res;
            }
        });

        //源数据
        DataSource<String> data = env.fromElements("zs", "ls", "ww");

        //注意:在这里需要使用到RichMapFunction获取广播变量
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {

            List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
            HashMap<String, Integer> allMap = new HashMap<String, Integer>();

            /**
             * 这个方法只会执行一次
             * 可以在这里实现一些初始化的功能
             *
             * 所以,就可以在open方法中获取广播变量数据
             *
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //3:获取广播数据
                this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
                for (HashMap map : broadCastMap) {
                    allMap.putAll(map);
                }

            }

            @Override
            public String map(String value) throws Exception {
                Integer age = allMap.get(value);
                return value + "," + age;
            }
        }).withBroadcastSet(toBroadcast, "broadCastMapName");//2:执行广播数据的操作
        result.print();
    }
}

运行结果:

    zs,18
    ls,20
    ww,17

 

相关标签: Flink