欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark核心编程-共享变量

程序员文章站 2022-07-15 18:29:02
...

Spark一个非常重要的特性就是共享变量。

默认情况下,如果在一个算子函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个task只能操作自己的那份变量副本。如果多个task想要共享某个变量,那么这种方式是做不到的。

Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),另一种是Accumulator(累加变量)。Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。Accumulator则可以让多个task共同操作一份变量,主要可以进行累加操作。

一、广播变量

广播变量允许开发人员在每个节点(Worker or Executor)缓存只读变量,而不是在Task之间传递这些变量。使用广播变量能够高效地在集群每个节点创建大数据集的副本。同时Spark还使用高效的广播算法分发这些变量,从而减少通信的开销。

Spark应用程序作业的执行由一系列调度阶段构成,而这些调度阶段通过Shuffle进行分隔。Spark能够在每个调度阶段自动广播任务所需通用的数据,这些数据在广播时需进行序列化缓存,并在任务运行前需进行反序列化。这就意味着当多个调度阶段的任务需要相同的数据,显示地创建广播变量才有用。

可以通过调用sc.broadcast(v)创建一个广播变量,该广播变量的值封装在v变量中,可使用获取该变量value的方法进行访问。

public class BroadcastVariable {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("BroadcastVariable").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    int factor = 3;
    final Broadcast<Integer> factorBroadcast = sc.broadcast(factor);

    List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> numbers = sc.parallelize(numberList);

    // 让集合中的每个数字,都乘以外部定义的那个factor
    JavaRDD<Integer> multipleNumbers = numbers.map(new Function<Integer, Integer>() {
      private static final long serialVersionUID = 1L;

      public Integer call(Integer v1) throws Exception {
        // 使用共享变量时,调用其value()方法,即可获取其内部封装的值
        int factor = factorBroadcast.value();
        return v1 * factor;
      }
    });

    multipleNumbers.foreach(new VoidFunction<Integer>() {
      private static final long serialVersionUID = 1L;

      public void call(Integer t) throws Exception {
        System.out.println(t);
      }
    });

    sc.close();
  }
}

当广播变量创建后,在集群中所有函数将以变量v代表广播变量,并且该变量v一次性分发到各节点中。另外,为了确保所有的节点获得相同的变量,对象v广播后只读不能够被修改。

Spark核心编程-共享变量

二、累加器

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,但是却给我们提供了多个task对一个变量并行操作的功能。task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

public class AccumulatorVariable {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Accumulator").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    final Accumulator<Integer> sum = sc.accumulator(0);

    List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> numbers = sc.parallelize(numberList);

    numbers.foreach(new VoidFunction<Integer>() {
      private static final long serialVersionUID = 1L;

      public void call(Integer t) throws Exception {
        // 然后在函数内部,就可以对Accumulator变量,调用add()方法,累加值
        sum.add(t);
      }

    });

    // 在driver程序中,可以调用Accumulator的value()方法,获取其值
    System.out.println(sum.value());
    sc.close();
  }
}

累加器只能由Spark内部进行更新,并保证每个任务在累加器的更新操作仅执行一次,也就是说重启任务也不应该更新。在转换操作中,用户必须意识到任务和作业的调度过程重新执行会造成累加器的多次更新。

累加器同样具有Spark懒加载的求值模型。如果它们在RDD的操作中进行更新,它们的值只在RDD进行行动操作时才进行更新。