
1.13. / 1.14. Flink Supported DataTypes and Serialization; Flink Broadcast & Accumulators & Counters & Distributed Cache


1.13. Flink Supported DataTypes and Serialization
1.13.1. DataTypes Supported by Flink
1.13.2. Serialization in Flink
1.14. Flink Broadcast & Accumulators & Counters & Distributed Cache
1.14.1. Broadcast in the DataStream API
1.14.2. Flink Broadcast (Broadcast Variables)
1.14.3. Flink Accumulators & Counters
1.14.4. Differences Between Flink Broadcast and Accumulators
1.14.5. Flink Distributed Cache

1.13. Flink Supported DataTypes and Serialization

1.13.1. DataTypes Supported by Flink

Java Tuples and Scala case classes
Java POJOs: plain Java entity classes
Primitive Types
Java and Scala primitive types are supported by default
General Class Types
Most Java and Scala classes are supported by default
Hadoop Writables
Data types that implement org.apache.hadoop.io.Writable are supported
Special Types
For example, Scala's Either, Option, and Try
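
As a quick illustration, here is a minimal sketch of the first two categories; the Person POJO is a hypothetical example class, and the job simply prints both DataSets:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class BatchDemoDataTypes {

    //POJO rules: public class, public no-argument constructor, public fields (or getters/setters)
    public static class Person {
        public String name;
        public int age;
        public Person() {}
        public Person(String name, int age) { this.name = name; this.age = age; }

        @Override
        public String toString() { return name + "," + age; }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //Tuple type
        DataSet<Tuple2<String, Integer>> tuples = env.fromElements(
                new Tuple2<>("zs", 18), new Tuple2<>("ls", 20));

        //POJO type
        DataSet<Person> persons = env.fromElements(
                new Person("zs", 18), new Person("ls", 20));

        tuples.print();
        persons.print();
    }
}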

1.13.2. Serialization in Flink

Flink ships with serializers for standard types such as int, long, and String.
For data types that Flink cannot serialize itself, serialization can be delegated to Avro or Kryo.
Usage: ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Force Avro serialization: env.getConfig().enableForceAvro();
Force Kryo serialization: env.getConfig().enableForceKryo();
Register a custom serializer: env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
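
A minimal sketch of how these calls fit into a job; MyType and MyTypeKryoSerializer below are hypothetical stand-ins for a type that Flink cannot serialize on its own:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchDemoSerialization {

    //Hypothetical type: no no-argument constructor, so Flink cannot treat it as a POJO
    public static class MyType {
        public final String value;
        public MyType(String value) { this.value = value; }
    }

    //Custom Kryo serializer that writes/reads only the value field
    public static class MyTypeKryoSerializer extends Serializer<MyType> {
        @Override
        public void write(Kryo kryo, Output output, MyType obj) {
            output.writeString(obj.value);
        }

        @Override
        public MyType read(Kryo kryo, Input input, Class<MyType> type) {
            return new MyType(input.readString());
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //Route all types through Kryo (enableForceAvro() would force Avro instead)
        env.getConfig().enableForceKryo();

        //Use the custom serializer whenever a MyType instance has to be serialized
        env.getConfig().addDefaultKryoSerializer(MyType.class, MyTypeKryoSerializer.class);

        env.fromElements(new MyType("zs"), new MyType("ls"))
                .map(new MapFunction<MyType, String>() {
                    @Override
                    public String map(MyType value) throws Exception {
                        return value.value;
                    }
                })
                .print();
    }
}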

1.14. Flink Broadcast & Accumulators & Counters & Distributed Cache

1.14.1. Broadcast in the DataStream API

Broadcasts every element to all partitions, so the data is processed repeatedly (once per partition).
1: Similar to allGrouping in Storm.
2: dataStream.broadcast(), as shown in the sketch below.
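
A minimal sketch (hypothetical class name) of broadcast partitioning on a DataStream; with parallelism 2, each of the two map subtasks receives all three elements:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingDemoBroadcast {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
                //replicate every element to all downstream partitions
                .broadcast()
                .map(new MapFunction<Integer, String>() {
                    @Override
                    public String map(Integer value) throws Exception {
                        //every parallel subtask sees every element, i.e. the data is processed repeatedly
                        return "received: " + value;
                    }
                })
                .setParallelism(2)
                .print();

        env.execute("StreamingDemoBroadcast");
    }
}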

1.14.2. Flink Broadcast (Broadcast Variables)

Broadcast variables allow the programmer to keep one read-only cached copy of a variable on each machine, rather than shipping a copy of the variable with every task.

Once created, a broadcast variable can be used by any function running on the cluster without being transferred to the cluster nodes over and over. Also remember that a broadcast variable should not be modified; this guarantees that every node sees the same value.

In one sentence: a broadcast variable is a shared, read-only variable. We can broadcast a DataSet so that every task on every node can access it, and the data exists only once per node. Without broadcast, every task on every node would need its own copy of the dataset, which wastes memory (i.e. a single node could end up holding multiple copies of the same dataset).

Usage:
1: Initialize the data
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
2: Broadcast the data
.withBroadcastSet(toBroadcast, "broadcastSetName")
3: Retrieve the data
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
Notes:
1: The broadcast variable lives in memory on every node, so the dataset must not be too large; broadcast data stays in memory until the job finishes.
2: A broadcast variable cannot be modified after it has been broadcast, which guarantees that every node sees the same data.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * Broadcast variable demo
 *
 * Requirement:
 * Flink reads user names from the data source
 * and must print each user's name together with their age.
 *
 * Analysis:
 * The age must be looked up while each name passes through the intermediate map operator,
 * so it is recommended to distribute the name-to-age dataset as a broadcast variable.
 *
 * Note: if several operators need the same dataset, the broadcast variable must be
 * registered separately on each of those operators.
 * Created by xxx.xxx on 2018/10/8
 */
public class BatchDemoBroadcast {

    public static void main(String[] args) throws Exception{

        //Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //1: Prepare the data to broadcast
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("zs",18));
        broadData.add(new Tuple2<>("ls",20));
        broadData.add(new Tuple2<>("ww",17));
        DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);
        
        //1.1: Transform the broadcast dataset into HashMap entries: key = user name, value = age
        DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                HashMap<String, Integer> res = new HashMap<>();
                res.put(value.f0, value.f1);
                return res;
            }
        });

        //Source data
        DataSource<String> data = env.fromElements("zs", "ls", "ww");

        //Note: a RichMapFunction is needed here to access the broadcast variable
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {

            List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
            HashMap<String, Integer> allMap = new HashMap<String, Integer>();

            /**
             * This method is executed only once,
             * so it is a good place for initialization work;
             * therefore the broadcast variable can be fetched here in open()
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //3: Retrieve the broadcast data
                this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
                for (HashMap map : broadCastMap) {
                    allMap.putAll(map);
                }
            }

            @Override
            public String map(String value) throws Exception {
                Integer age = allMap.get(value);
                return value + "," + age;
            }
        }).withBroadcastSet(toBroadcast, "broadCastMapName");//2: Broadcast the data
        
        result.print();
    }

}
Scala version:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

import scala.collection.mutable.ListBuffer

/**
  * Broadcast variable demo (Scala)
  * Created by xxxx on 2018/10/30.
  */
object BatchDemoBroadcastScala {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    //1: Prepare the data to broadcast
    val broadData = ListBuffer[Tuple2[String,Int]]()
    broadData.append(("zs",18))
    broadData.append(("ls",20))
    broadData.append(("ww",17))

    //1.1: Transform the broadcast data
    val tupleData = env.fromCollection(broadData)
    val toBroadcastData = tupleData.map(tup=>{
      Map(tup._1->tup._2)
    })


    val text = env.fromElements("zs","ls","ww")

    val result = text.map(new RichMapFunction[String,String] {
      var listData: java.util.List[Map[String,Int]] = null
      var allMap  = Map[String,Int]()

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName")
        val it = listData.iterator()
        while (it.hasNext){
          val next = it.next()
          allMap = allMap.++(next)
        }
      }

      override def map(value: String) = {
        val age = allMap.get(value).get
        value+","+age
      }
    }).withBroadcastSet(toBroadcastData,"broadcastMapName")
    
    result.print()
  }

}

1.14.3. Flink Accumulators & Counters

Accumulators in Flink serve much the same purpose as MapReduce counters: both are good for observing how data flows through tasks at runtime.
Accumulators can be manipulated inside the operator functions of a Flink job, but the final result is only available after the job has finished.
Counter is a concrete Accumulator implementation:
IntCounter, LongCounter, and DoubleCounter
Usage:
1: Create an accumulator
private IntCounter numLines = new IntCounter();

2: Register the accumulator
getRuntimeContext().addAccumulator("num-lines", this.numLines);

3: Use the accumulator
this.numLines.add(1);

4: Retrieve the accumulator result
myJobExecutionResult.getAccumulatorResult("num-lines")

Example:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

/**
 * Global accumulator demo
 *
 * counter
 *
 * Requirement:
 * count how many records the map function processes
 *
 * Note: the accumulator value can only be retrieved after the job has finished
 *
 * Created by xxx.xxx on 2018/10/8.
 */
public class BatchDemoCounter {

    public static void main(String[] args) throws Exception{

        //Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> data = env.fromElements("a", "b", "c", "d", "e");

        DataSet<String> result = data.map(new RichMapFunction<String, String>() {

            //1: Create the accumulator
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //2: Register the accumulator
                getRuntimeContext().addAccumulator("num-lines",this.numLines);
            }

            //int sum = 0;
            @Override
            public String map(String value) throws Exception {
                //With parallelism 1 a plain field-based sum would work, but with multiple
                //parallel subtasks each instance keeps its own sum, so the plain result is wrong
                //sum++;
                //System.out.println("sum:"+sum);
                this.numLines.add(1);
                return value;
            }
        }).setParallelism(8);

        //result.print();

        result.writeAsText("d:\\data\\count10");

        JobExecutionResult jobResult = env.execute("counter");
        //3: Retrieve the accumulator result
        int num = jobResult.getAccumulatorResult("num-lines");
        System.out.println("num:"+num);
    }

}
Scala version:

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * counter demo (Scala)
  * Created by xxxx on 2018/10/30.
  */
object BatchDemoCounterScala {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    val data = env.fromElements("a","b","c","d")

    val res = data.map(new RichMapFunction[String,String] {
      //1: Define the accumulator
      val numLines = new IntCounter

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        //2: Register the accumulator
        getRuntimeContext.addAccumulator("num-lines",this.numLines)
      }

      override def map(value: String) = {
        this.numLines.add(1)
        value
      }

    }).setParallelism(4)


    res.writeAsText("d:\\data\\count21")
    val jobResult = env.execute("BatchDemoCounterScala")
    //3: Retrieve the accumulator result
    val num = jobResult.getAccumulatorResult[Int]("num-lines")
    println("num:"+num)

  }

}

1.14.4. Differences Between Flink Broadcast and Accumulators

Broadcast (broadcast variables) allows a program to cache a read-only variable on every machine instead of shipping it between tasks. Broadcast variables can be shared but cannot be modified.
Accumulators allow different tasks to apply accumulating operations to the same variable.

1.14.5. Flink Distributed Cache

Flink provides a distributed cache, similar to Hadoop's, that lets user code conveniently read local files inside parallel functions.
The cache works as follows: the program registers a file or directory (on a local or remote file system such as HDFS or S3) through the ExecutionEnvironment and gives it a name. When the program runs, Flink automatically copies the file or directory to the local file system of every TaskManager node; user code can then look it up by the registered name and access it from the TaskManager's local file system.
Usage:
1: Register a file
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
2: Access the file
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");

Example:

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
 * Distributed Cache
 *
 * Created by xxxx on 2020/10/09 .
 */
public class BatchDemoDisCache {

    public static void main(String[] args) throws Exception{
        //Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //1: Register a file; HDFS or S3 paths also work
        env.registerCachedFile("d:\\data\\file\\a.txt","a.txt");

        DataSource<String> data = env.fromElements("a", "b", "c", "d");

        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //2: Use the cached file
                File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.out.println("line:" + line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                //dataList can be used here
                return value;
            }
        });

        result.print();
    }
}
Scala version:

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * Distributed Cache demo (Scala)
  * Created by xxxx on 2020/10/09
  */
object BatchDemoDisCacheScala {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    
    //1: Register the file
    env.registerCachedFile("d:\\data\\file\\a.txt","b.txt")

    val data = env.fromElements("a","b","c","d")

    val result = data.map(new RichMapFunction[String,String] {

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val myFile = getRuntimeContext.getDistributedCache.getFile("b.txt")
        val lines = FileUtils.readLines(myFile)
        val it = lines.iterator()
        while (it.hasNext){
          val line = it.next();
          println("line:"+line)
        }
      }
      
      override def map(value: String) = {
        value
      }
      
    })

    result.print()
  }
}