Spark常用算子总结
1 简介
在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集( resilientdistributed dataset,RDD),它是逻辑集中的实体,在集群中的多台机器上进行了数据分区。通过对多台机器上不同RDD分区的控制,就能够减少机器之间的数据重排(data shuffling)。 Spark提供了“ partition By”运算符,能够通过集群中多台机器之间对原始RDD进行数据再分配来创建一个新的RDD。RDD是 Spark的核心数据结构,通过RDD的依赖关系形成 Spark的调度顺序。通过对RDD的操作形成整个 Spark程序。
2 创建方式
1)从 Hadoop文件系统(或与 Hadoop兼容的其他持久化存储系统,如Hive、 Cassandra、Hbase)输人(如HDFS)创建。
2)从父RDD转换得到新的RDD。
3 RDD的两种操作算子
对于RDD可以有两种计算操作算子: Transformation(变换)与 Action(行动)。
1)Transformation(变换)。
Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操作不是马上执行,需要等到有 Actions操作时,才真正触发运算。
2)Action(行动)
Action算子会触发 Spark提交作业(Job),并将数据输出到 Spark系统。
4 RDD的重要内部属性
1)分区列表。
2)计算每个分片的函数。
3)对父RDD的依赖列表。
4)对Key- Value对数据类型RDD的分区器,控制分区策略和分区数。
5)每个数据分区的地址列表(如HDFS上的数据块的地址)。
5 Spark的数据存储
Spark数据存储的核心是弹性分布式数据集(RDD)。RDD可以被抽象地理解为个大的数组(Aray),但是这个数组是分布在集群上的。逻辑上RDD的每个分区叫一个Partition。在 Spark的执行过程中,RDD经历一个个的 Transfomation算子之后,最后通过 Action算子进行触发操作。逻辑上每经历一次变换,就会将RDD转换为一个新的RD,RDD之间通过 Lineage产生依赖关系,这个关系在容错中有很重要的作用。变换的输人和输出都是RDD。RDD会被划分成很多的分区分布到集群的多个节点中。分区是个逻辑概念,变换前后的新旧分区在物理上可能是同一块内存存储。这是很重要的优化,以防止函数式数据不
变性( immutable)导致的内存需求无限扩张。有些RDD是计算的中间结果,其分区并不定有相应的内存或磁盘数据与之对应,如果要迭代使用数据,可以调 cacheo函数缓存数据。
6 算子
6.1 算子的分类
1) Value数据类型的 Transformation算子,这种变换并不触发提交作业,针对处理的数据项是 Value型的数据。
2)Key- Value数据类型的 Transfromation算子,这种变换并不触发提交作业,针对处理的数据项是Key- Value型的数据对。
3)Action算子,这类算子会触发 Spark Context提交Job作业。
6.2 Value类型
1)map类型,作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
需求:创建一个RDD,使每个元素*2组成新的RDD
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5))
//4、每个数变成原来的2倍
val rsRdd: RDD[Int] = listRdd.map(_ * 2)
//5、打印
rsRdd.foreach(println(_))
}
}
2)mapPartitions,类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
需求:创建一个RDD,使每个元素*2组成新的RDD
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5))
//4、每个数变成原来的2倍
val rsRdd: RDD[Int] = listRdd.mapPartitions(i=>{i.map(_*2)})
//5、打印
rsRdd.foreach(println)
}
}
3)mapPartitionsWithIndex(func),类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U];
需求:创建一个RDD,使每个元素跟所在分区形成一个元组组成一个新的RDD
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建2个分区的RDD
val listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 2)
//4、每个数变成原来的2倍
val rsRdd: RDD[(Int, Int)] = listRdd.mapPartitionsWithIndex((index, item) => {
item.map((index, _))
})
//5、打印
rsRdd.foreach(println)
}
}
4)flatMap(func) ,对集合中每个元素进行操作然后再扁平化。
需求:把集合中元素按照’ '提取出来
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建2个分区的RDD
val listRdd: RDD[String] = sc.makeRDD(List("Hello World","Hello Spark"))
//4、扁平化
val rsRdd: RDD[String] = listRdd.flatMap(_.split(" "))
//5、打印
rsRdd.foreach(println)
}
}
5)glom, 作用:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
需求:创建一个3个分区的RDD,并将每个分区的数据放到一个数组
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建3个分区的RDD
val listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 3)
//4、glom
val rsRdd: RDD[Array[Int]] = listRdd.glom()
//5、打印
rsRdd.foreach(item => {
println(item.mkString(","))
})
}
}
6)groupBy,分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
需求:创建一个RDD,按照元素模以2的值进行分组。
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val listRdd: RDD[Int] = sc.makeRDD(1 to 20)
//4、分组
val rsRdd: RDD[(Int, Iterable[Int])] = listRdd.groupBy(_%2)
//5、打印
rsRdd.foreach(t=>println(t._1,t._2))
}
7)filter作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。
需求:创建一个RDD ,过滤出和2取余为0的数
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val listRdd: RDD[Int] = sc.makeRDD(1 to 10)
//4、过滤
val rsRdd: RDD[Int] = listRdd.filter(_%2==0)
//5、打印
rsRdd.foreach(println)
}
}
8)distinct去重分区
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val listRdd: RDD[Int] = sc.makeRDD(List(1,2,1,2,1,2))
//4、用2个分区保存结果
val rsRdd: RDD[Int] = listRdd.distinct(2)
//5、打印
rsRdd.foreach(println)
}
}
9)repartition(numPartitions)重分区,根据分区数,重新通过网络随机洗牌所有数据。
需求:创建一个4个分区的RDD,对其重新分区
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建4个分区的Rdd
val listRdd: RDD[Int] = sc.makeRDD(1 to 10, 4)
println("repartition before:" + listRdd.partitions.size)
//4、重新分区
val rsRdd: RDD[Int] = listRdd.repartition(2)
//5、打印
println("repartition after:" + rsRdd.partitions.size)
}
}
10)sortBy(func,[ascending], [numTasks]),作用;使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。
需求:创建一个RDD,按照不同的规则进行排序
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建4个分区的Rdd
val listRdd: RDD[Int] = sc.makeRDD(1 to 10)
//4、升序
val sortAsc: Array[Int] = listRdd.sortBy(x => x).collect()
println(sortAsc.mkString(","))
//5、降序
val sortDesc: Array[Int] = listRdd.sortBy(x => x, false).collect()
println(sortDesc.mkString(","))
}
}
6.2 双Value类型交互。
1) union(otherDataset),作用:对源RDD和参数RDD求并集后返回一个新的RDD
需求:创建两个RDD,求并集
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建2个list
val list1: RDD[Int] = sc.makeRDD(1 to 2)
val list2: RDD[Int] = sc.makeRDD(2 to 4)
//4、求并级
val rsRdd: Array[Int] = list1.union(list2).collect()
//5、打印
rsRdd.foreach(println)
}
}
2)subtract (otherDataset),差集,计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来,创建两个RDD,求第一个RDD与第二个RDD的差集
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建2个list
val list1: RDD[Int] = sc.makeRDD(1 to 10)
val list2: RDD[Int] = sc.makeRDD(5 to 10)
//4、求并级
val rsRdd: Array[Int] = list1.subtract(list2).collect()
//5、打印
rsRdd.foreach(println)
}
}
3)intersection(otherDataset),交集,对源RDD和参数RDD求交集后返回一个新的RDD
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建2个list
val list1: RDD[Int] = sc.makeRDD(1 to 10)
val list2: RDD[Int] = sc.makeRDD(5 to 10)
//4、交集
val rsRdd: Array[Int] = list1.intersection(list2).collect()
//5、打印
rsRdd.foreach(println)
}
}
6.3 Key-Value类型
1)groupByKey,groupByKey也是对每个key进行操作,但只生成一个sequence。
需求:wordcount
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建list
val list: RDD[String] = sc.makeRDD(List("Hello World","Hello Scala","Spark Spark Spark"))
//4、wordcount
val mapRdd: RDD[(String, Int)] = list.flatMap(_.split(" ")).map((_,1))
//5、groupByKey
val groupByKeyRdd: RDD[(String, Iterable[Int])] = mapRdd.groupByKey()
groupByKeyRdd.foreach(println)
//6、聚合
val wcRdd: RDD[(String, Int)] = groupByKeyRdd.map(t=>(t._1,t._2.size))
wcRdd.foreach(println)
}
}
2)reduceByKey,在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
需求:wordcount
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建list
val list: RDD[String] = sc.makeRDD(List("Hello World","Hello Scala","Spark Spark Spark"))
//4、扁平化
val mapRdd: RDD[(String, Int)] = list.flatMap(_.split(" ")).map((_,1))
//5、wordcount
val reduceMapRdd: RDD[(String, Int)] = mapRdd.reduceByKey(_+_)
//6、打印
reduceMapRdd.foreach(println)
}
}
3)sortByKey([ascending], [numTasks]),作用:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
创建一个pairRDD,按照key的正序和倒序进行排序
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建list
val listRdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (1, 2), (1, 4), (2, 3), (3, 6), (3, 8)))
//4、升序
val sortAsc: Array[(Int, Int)] = listRdd.sortByKey(true).collect()
println("升序:" + sortAsc.mkString(","))
//5、降序
val sortDesc: Array[(Int, Int)] = listRdd.sortByKey(false).collect()
println("降序:" + sortDesc.mkString(","))
}
}
4)join(otherDataset, [numTasks]),在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
val r1: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val r2: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rsRdd: Array[(Int, (String, String))] = r1.join(r2).collect()
rsRdd.foreach(println)
}
}
6.4 Action
1)reduce,作用:通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
需求:创建一个RDD,将所有元素聚合得到结果。
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 5)
//4、reduce
val rs: Int = rdd.reduce(_+_)
//5、结果
println(rs)
}
}
2) collect(),在驱动程序中,以数组的形式返回数据集的所有元素。
创建一个RDD,并将RDD内容收集到Driver端打印
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val lisrRdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5))
val ints: Array[Int] = lisrRdd.collect()
println(ints.mkString(","))
}
}
3) count(),返回RDD中元素的个数
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 10)
val rddCount: Long = rdd.count()
println(rddCount)
}
}
4)first(),返回RDD中的第一个元素
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 10)
//4、返回RDD中第一个元素
val firstNum: Int = rdd.first()
println(firstNum)
}
}
5) take(n),返回一个由RDD的前n个元素组成的数组
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 10)
val top3: Array[Int] = rdd.take(3)
top3.foreach(println)
}
}
6) takeOrdered(n),返回该RDD排序后的前n个元素组成的数组
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(5, 3, 2, 1, 7, 6))
val top3: Array[Int] = rdd.takeOrdered(3)
println(top3.mkString(","))
}
7)countByKey(),针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)))
//4、统计key
val countKey: collection.Map[Int, Long] = rdd.countByKey()
//5、打印
println(countKey)
}
}
8)foreach打印
object SparkRdd {
def main(args: Array[String]): Unit = {
//1、创建本地spark配置文件
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2、创建Spark上下文对象
val sc = new SparkContext(config)
//3、创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5))
rdd.foreach(println)
}
}