Spark的转换操作基础知识和一些使用技巧!!!(Unit3)
在阅读完最早之前的帖子(点击打开链接)说了如何理解RDD和什么是RDD和对一些基础的术语的解读示例,然后我又发了一份如何创建RDD(点击打开链接)我们这节课来学学习Spark的一些对RDD的转换操作,转换操作就是不会真的进行分布式计算,而是将RDD从一种状态转换到另外一种状态,延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作,转换操作大致分为以下两种形式我们着重介绍第一种
(1)非KV转换操作即基础转换操作
1.map(func):数据集中的每个元素经过用户自定义的函数转换形成一个新的RDD
对应图解析
object Map {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("map")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 10) //创建RDD
val map = rdd.map(_*2) //对RDD中的每个元素都乘于2
map.foreach(x => print(x+" "))
sc.stop()
}
}
输出结果:
2 4 6 8 10 12 14 16 18 20
2.flatMap(func):与map类似,但每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出个人觉得所谓的扁平化处理就是对map之后的的单个集合在进行一次分散把RDD的数据变成不存在
对应图解析:
实列
object Map {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("map")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 5)
val fm = rdd.flatMap(x => (1 to x)).collect()
fm.foreach( x => print(x + " ")))
sc.stop()
}
}
输出:
1 1 2 1 2 3 1 2 3 4 1 2 3 4 5
如果是map那么他的输出应该是:
Range(1) Range(1, 2) Range(1, 2, 3) Range(1, 2, 3, 4) Range(1, 2, 3, 4, 5)
图解:
实例
object MapPartitions {
//定义函数
def partitionsFun(/*index : Int,*/iter : Iterator[(String,String)]) : Iterator[String] = {
var woman = List[String]()
while (iter.hasNext){
val next = iter.next()
next match {
case (_,"female") => woman = /*"["+index+"]"+*/next._1 :: woman
case _ =>
}
}
return woman.iterator
}
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("mappartitions")
val sc = new SparkContext(conf)
val l = List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female"))
val rdd = sc.parallelize(l,2)
val mp = rdd.mapPartitions(partitionsFun)
/*val mp = rdd.mapPartitionsWithIndex(partitionsFun)*/
mp.collect.foreach(x => (print(x +" "))) //将分区中的元素转换成Aarray再输出
}
}
结果:
kpop lucy
4.union(ortherDataset):将两个RDD中的数据集进行合并,最终返回两个RDD的并集,若RDD中存在相同的元素也不会去重示例
object Map {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("map")
val sc = new SparkContext(conf)
val rdd1 = sc.paralleliz(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val unionRDD = rdd1.union(rdd2)
unionRDD.collect.foreach(x => print(x + " "))
sc.stop
}
}
结果:
1 2 3 3 4 5
5.intersection(otherDataset):返回两个RDD的交集
实例:
object Map {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("map")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val unionRDD = rdd1.intersection(rdd2)
unionRDD.collect.foreach(x => print(x + " "))
sc.stop
输出
3
6.distinct([numTasks]):对RDD中的元素进行去重
实例:
object Map {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("map")
val sc = new SparkContext(conf)
val list = List(1,1,2,5,2,9,6,1)
val distinctRDD = sc.parallelize(list)
val unionRDD = distinctRDD.distinct()
unionRDD.collect.foreach(x => print(x + " "))
结果:1 6 9 5 2
7.cartesian(otherDataset):对两个RDD中的所有元素进行笛卡尔积操作
图解:
实例:
object Map {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("map")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(2 to 5)
val cartesianRDD = rdd1.cartesian(rdd2)
cartesianRDD.foreach(x => println(x + " "))
结果:
(1,2)
(1,3)
(1,4)
(1,5)
(2,2)
(2,3)
(2,4)
(2,5)
(3,2)
(3,3)
(3,4)
(3,5)
val rdd = sc.parallelize(1 to 16,4)
val coalesceRDD = rdd.coalesce(3) //当suffle的值为false时,不能增加分区数(即分区数不能从5->7)
println("重新分区后的分区个数:"+coalesceRDD.partitions.size)
结果:重新分区后的分区个数:3
//分区后的数据集
List(1, 2, 3, 4)
List(5, 6, 7, 8)
List(9, 10, 11, 12, 13, 14, 15, 16)
8.repartition(numPartition):是函数coalesce(numPartition,true)的实现,效果和coalesce(numPartition,true)的一样(2)KV转换操作即键值转换
Spark为包含键值对类型的RDD提供了一些专有的操作。这些RDD被称为pairRDD。提供并行操作各个节点或跨界点重新进行数据分组的操作接口。
1、聚合操作
pair RDD上择优相应的针对键的转化操作。
Scala中使用mapValues()和reduceByKey()计算每个键对应的平均值:
2、并行度调优
Spark提供了repartion()函数。他会把数据通过网络进行混洗。并创建出新的分区集合。对数据分区是代价比较大。
优化版的repartion()函数叫做coalesce()。java或者Scala可以使用rdd.partitions.size查看RDD的分区数。并确保调用coalesce()时将RDD合并到比现在的分区更少的分区中。
3、数据分组
groupByKey()使用RDD中的键对数据进行分组。对于一个由类型k的键和类型v的值组成的RDD,所得到的结果RDD类型会是[K,Iterable[V]];groupByKey可以用于未成对的数据上,也可以根据除键相同以外的条件进行分组。它可以接受一个函数,对源RDD中的每个元素使用该函数,将返回结果作为键在进行分组。
cogroup()函数对多个共享同一个键的RDD进行分组。对两个键类型均为K而值的类型分别为V和W的RDD进行cogroup()时,得到的结果RDD类型为[(K,(Iterable[V],Iterable[W]))].
4、连接
普通的join连接是内链接,只有在两个Pair RDD中都存在的键菜叫输出。当一个输入对应的某个键有多个值时生成的Pair RDD会包括来自两个输入RDD的每一组相对应的记录。
leftOuterJoin()源RDD的每一个键都有对应的记录。每个键相对应的值是由一个源RDD的值与一个包含第二个RDD的值的option对象组成的二元数组。而rightOuterJoin()的结果与其相反。
5、数据排序
使用sortByKey()函数接受一个参数,表示是否让结果按升序排列(默认是true)
上一篇: 仿Js的二级菜单
下一篇: 原生JS制作二级菜单+轮播图效果