欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spark—键值对操作

程序员文章站 2022-07-13 22:10:30
...

1.JavaPairRDD背景
键值对 RDD 通常用来进行聚合计算。先通过一些初始 ETL(抽取、转 化、装载)操作来将数据转化为键值对形式。键值对 RDD 提供了一些新的操作接口

让用户控制键值对 RDD 在各节点上分布情况的高级特性:分区。
使用可控的分区方式把常被一起访问的数据放到同一个节点上,可以大大减少应用的通信 开销。这会带来明显的性能提升。

Spark 为包含键值对类型的 RDD 提供了一些专有的操作,称为 pair RDD。它们提供了并行操作各个键或跨节点重新进行数据分组 的操作接口。

通常从一个 RDD 中提取某些字段(例如代表事件时间、用户 ID 或者其他标识符的字段), 使用这些字段作为 pair RDD 操作中的键。

2.基本操作

  • filter:对pair RDD的元素按照某种规则进行过滤,去掉不需要的pair RDD
  • reduceByKey:对pairRDD中的元素中所有相同key对应的元素进行统计,下面是单词个数统计的例子
rdd.flatMap(line -> Arrays.asList(line.split("")).iterator())
    .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
    .reduceByKey((s1, s2) -> s1 + s2)// 将相同的key进行reduce,并将value相加
  • combineByKey:groupByKey、reduceByKey都是由combineByKey实现的。该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。此函数有三个参数必传:
    createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C
    mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C
    mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C
    numPartitions:结果RDD分区数,不传则默认保持原有的分区数
    partitioner:分区函数,不传则默认为HashPartitioner
    mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,不传则默认为true

combineByKey()的实现是一边进行aggregate,一边进行compute() 的基础操作。假设一组具有相同 K 的 <K, V> records 正在一个个流向 combineByKey(),createCombiner将第一个 record 的 value 初始化为 c (比如,c = value),然后从第二个 record 开始,来一个 record 就使用 mergeValue(c, record.value) 来更新 c,如果想对这些 records 的所有 values 做 sum,那么使用 c = c + record.value。等到 records 全部被 mergeValue(),得到结果 c。假设还有一组 records(key 与前面那组的 key 均相同)一个个到来,combineByKey() 使用前面的方法不断计算得到 c'。现在如果要求这两组 records 总的 combineByKey() 后的结果,那么可以使用 final c = mergeCombiners(c, c') 来计算;然后依据partitioner进行不同分区合并。

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
//转化为pairRDD
JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(
                            new PairFunction<Integer, Integer, Integer>() {    
    @Override    
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {  
      return new Tuple2<Integer, Integer>(integer,1);   
  }
});

JavaPairRDD<Integer,String> combineByKeyRDD = javaPairRDD.combineByKey(
  new Function<Integer, String>() {    
    @Override    
    public String call(Integer v1) throws Exception {  
      return v1 + " :createCombiner: ";    
  }}, 
  new Function2<String, Integer, String>() {    
    @Override    
    public String call(String v1, Integer v2) throws Exception {        
      return v1 + " :mergeValue: " + v2;    
  }}, 
  new Function2<String, String, String>() {    
    @Override    
    public String call(String v1, String v2) throws Exception {        
      return v1 + " :mergeCombiners: " + v2;    
  }});
  • 设置分区个数:每个RDD都有固定数目的分区,在建立RDD时,如果不指定分区个数的话,系统会根据集群大小推断出一个有意义的默认值。

3.数据分组

  • groupByKey():对pair RDD中相同key对应的元素进行合并,形成一个元素列表作为value,key值保持不变。

4.数据连接

  • join:对于2个pair RDD中都含有的key,设其value为v1,v2,返回一个pair RDD,其中的元素为(key, (v1,v2))。
  • leftOuterJoin与rightOuterJoin:允许key值有缺失的连接,分别允许右边与左边的值有缺失,对应为None

5.数据排序

  • sortByKey:按照key进行升序或者降序排列,可以指定比较key时采用的函数,即不是比较key1与key2,而是比较f(key1)与f(key2),但是f(key)的返回值类型需要与key相同。

6.pair RDD的行动操作

  • countByKey:对每个key对应的元素个数进行计数
  • collectAsMap:将结果以映射表的形式返回,便于查询
  • lookup:返回给定键对应的所有值,为list的形式