RDD常用的transformation及分区详解
程序员文章站
2022-06-01 18:37:47
...
常用的transformation(转换,延迟加载)
创建RDD有两种方法:
1.通过driver端,也就是spark-shell端通过集合来创建。
2.可以通过集群上的数据来创建。
通过driver端创建的集合通过parallelize并行化集合可以转换为RDD
scala> val arr = Array(1,2,3,4,5)
arr: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val rdd1 = sc.parallelize(arr)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[91] at parallelize at <console>:26
在RDD中存在分区,例如将一个集合并行化为一个RDD后,在经过一系列转化,最后存入hdfs。尽管最后存入的数据为空,但是在hdfs上会产生固定的文件数量,我的一直是4个。
scala> val arr = Array(1,2,3,4,5)
arr: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val rdd1 = sc.parallelize(arr)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[91] at parallelize at <console>:26
scala> val rdd2 = rdd1.filter(_%2==0)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[92] at filter at <console>:28
scala> val rdd3 = rdd2.map(_ * 10)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[93] at map at <console>:30
scala> rdd1.collect
res11: Array[Int] = Array(1, 2, 3, 4, 5)
scala> rdd2.collect
res12: Array[Int] = Array(2, 4)
scala> rdd3.collect
res13: Array[Int] = Array(20, 40)
scala> rdd3.saveAsTextFile("hdfs://slave1.hadoop:9000/spark/tmp")
这是存入hdfs的文件
[aaa@qq.com spark-2.2.0]# hadoop fs -ls /spark/tmp
Found 5 items
-rw-r--r-- 2 root supergroup 0 2018-08-30 21:21 /spark/tmp/_SUCCESS
-rw-r--r-- 2 root supergroup 0 2018-08-30 21:21 /spark/tmp/part-00000
-rw-r--r-- 2 root supergroup 3 2018-08-30 21:21 /spark/tmp/part-00001
-rw-r--r-- 2 root supergroup 0 2018-08-30 21:21 /spark/tmp/part-00002
-rw-r--r-- 2 root supergroup 3 2018-08-30 21:21 /spark/tmp/part-00003
然后继续查看数据的分区,为什么是四个呢。
//查看分区
scala> rdd1.partitions.length
res15: Int = 4
scala> rdd2.partitions.length
res16: Int = 4
scala> rdd3.partitions.length
res17: Int = 4
其实这是跟执行运算的总核数有关系的,我的每台机器分配了两个核,一共两台worker,所以总核数就是4.
RDD也可以指定分区,在RDD生成的时候可以指定。sc.parallelize(arr,2),这样生成的RDD就是有两个分区。如果不指定的话RDD分区数量就跟EXECUTOR的总核数有关。上面的例子是没有指定分区的情况,下面演示指定分区的情况:
scala> val arr = Array(1,2,3,4,5)
arr: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val rdd1 = sc.parallelize(arr, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[103] at parallelize at <console>:26
scala> rdd1.partitions.length
res23: Int = 2
如果RDD是通过集群上的数据生成的,RDD的分区就跟文件的切片有关系。假如hdfs目录下有两个小文件,那么读取的数据转换成RDD就是有两个分区,因为文件较小,所以一个文件就是一个切片。
hdfs的spark文件夹下面有两个数据:
[aaa@qq.com ~]# hadoop fs -ls /spark
Found 2 items
-rwxrwxrwx 2 root supergroup 42 2018-08-24 20:11 /spark/a.txt
-rw-r--r-- 2 root supergroup 44 2018-08-30 22:09 /spark/b.txt
那么读取该数据后生成的RDD为几个分区呢!以下例子进行展示:
scala> val rdd1 = sc.textFile("/spark")
rdd1: org.apache.spark.rdd.RDD[String] = /spark MapPartitionsRDD[105] at textFile at <console>:24
scala> rdd1.partitions.length
res26: Int = 2
scala>
结果为2,跟想象中的一样。
上一篇: Spark RDD概念及常用算子创建过程
下一篇: 这个奇葩的八脚机器人是未来的交通工具