欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ReduceByKey算子理解

程序员文章站 2022-06-01 18:14:54
...

前言

最近经常使用到reduceByKey这个算子,想着结合*ByKey算子和stage划分一起总结一下,所以沉下心来仔细过了一遍相关的博客和帖子,在此整体过一遍这个算子,那么我们开始:

国外的大牛一上来给出这么一句话,个人感觉高度概括了reduceByKey的功能:

Spark RDD reduceByKey function merges the values for each key using an associative reduce function.
中文意思是:
【Spark的RDD的reduceByKey是使用一个相关的函数来合并每个key的value的值的一个算子(那么主干就是reduceByKey是个算子/函数)】。

那么这就基本奠定了reduceByKey的作用域是key-value类型的键值对,并且是只对每个key的value进行处理,如果含有多个key的话,那么就对多个values进行处理。这里的函数是我们自己传入的,也就是说是可人为控制的【其实这是废话,人为控制不了这算子一点用没有】。那么举个例子:

scala> val x = sc.parallelize(Array(("a", 1), ("b", 1), ("a", 1),
     | ("a", 1), ("b", 1), ("b", 1),
     | ("b", 1), ("b", 1)), 3)

我们创建了一个Array的字符串,并把其存入Spark的集群上,设置了三个分区【这里我们不关注分区,只关注操作】。那么我们调用reduceByKey并且传入函数进行相应操作【本处我们对相同key的value进行相加操作,类似于统计单词出现次数】:

scala> val y = x.reduceByKey((pre, after) => (pre + after))

这里两个参数我们逻辑上让他分别代表同一个key的两个不同values,那么结果想必大家应该猜到了:

scala> y.collect
res2: Array[(String, Int)] = Array((a,3), (b,5))

嗯,想必到这里大家对reduceByKey有了初步的认识和体会。论坛中有一段写的也很有帮助,由于怕翻译过来误导大家,所以每次附上原话:

“Basically reduceByKey function works only for RDDs which contains key and value pairs kind of elements(i.e RDDs having tuple or Map as a data element). It is a transformation operation which means it is lazily evaluated. We need to pass one associative function as a parameter, which will be applied to the source RDD and will create anew RDD as with resulting values(i.e. key value pair). This operation is a wide operation as data shuffling may happen across the partitions.”

【本质上来讲,reduceByKey函数(说算子也可以)只作用于包含key-value的RDDS上,它是 transformation类型的算子,这也就意味着它是懒加载的(就是说不调用Action的方法,是不触发计算的),在使用时,我们需要传递一个相关的函数作为参数,这个函数将会被应用到源RDD上并且创建一个新的
RDD作为返回结果,这个算子作为data Shuffling(出现reduceByKey即表示到这个位置处划分为一个新的stage) 在分区的使用被广泛使用】

想必看到这大家对这个算子有了更加深入的认识,那么再附上我的Scala的一个小例子,同样是统计字母出现次数:

import org.apache.spark.{SparkContext, SparkConf}  

/**  
 * samuel ko  
 * Created by samuel on 2017/5/27.  
 */  

object MyTest {  
  def main(args: Array[String]) {  
    val conf = new SparkConf().setAppName("WordCountTest").setMaster("local[*]")  
    val sc = new SparkContext(conf)  
    val x = sc.parallelize(List("a", "b", "a", "a", "b", "b", "b", "b"))  
    val s = x.map((_, 1))  
    val result = s.reduceByKey((pre, after) => pre + after)  
    println(result.collect())  

  }  
}  

结果是:ArrayBuffer((a,3), (b,5)),很简单对吧。python的版本如下:
ReduceByKey算子理解

ps:关于stage划分的内容,详见博客深入研究 spark 运行原理之 job, stage, task

相关标签: spark 算子