欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RDD常用的transformation及分区详解

程序员文章站 2022-06-01 18:37:47
...

常用的transformation(转换,延迟加载)

创建RDD有两种方法:

1.通过driver端,也就是spark-shell端通过集合来创建。

2.可以通过集群上的数据来创建。


通过driver端创建的集合通过parallelize并行化集合可以转换为RDD

scala> val arr = Array(1,2,3,4,5)
arr: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd1 = sc.parallelize(arr)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[91] at parallelize at <console>:26

在RDD中存在分区,例如将一个集合并行化为一个RDD后,在经过一系列转化,最后存入hdfs。尽管最后存入的数据为空,但是在hdfs上会产生固定的文件数量,我的一直是4个。

scala> val arr = Array(1,2,3,4,5)
arr: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd1 = sc.parallelize(arr)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[91] at parallelize at <console>:26

scala> val rdd2 = rdd1.filter(_%2==0)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[92] at filter at <console>:28

scala> val rdd3 = rdd2.map(_ * 10)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[93] at map at <console>:30

scala> rdd1.collect
res11: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd2.collect
res12: Array[Int] = Array(2, 4)

scala> rdd3.collect
res13: Array[Int] = Array(20, 40)

scala> rdd3.saveAsTextFile("hdfs://slave1.hadoop:9000/spark/tmp")
                                                                                

这是存入hdfs的文件

[aaa@qq.com spark-2.2.0]# hadoop fs -ls /spark/tmp
Found 5 items
-rw-r--r--   2 root supergroup          0 2018-08-30 21:21 /spark/tmp/_SUCCESS
-rw-r--r--   2 root supergroup          0 2018-08-30 21:21 /spark/tmp/part-00000
-rw-r--r--   2 root supergroup          3 2018-08-30 21:21 /spark/tmp/part-00001
-rw-r--r--   2 root supergroup          0 2018-08-30 21:21 /spark/tmp/part-00002
-rw-r--r--   2 root supergroup          3 2018-08-30 21:21 /spark/tmp/part-00003

然后继续查看数据的分区,为什么是四个呢。

//查看分区
scala> rdd1.partitions.length
res15: Int = 4

scala> rdd2.partitions.length
res16: Int = 4

scala> rdd3.partitions.length
res17: Int = 4

其实这是跟执行运算的总核数有关系的,我的每台机器分配了两个核,一共两台worker,所以总核数就是4.

RDD常用的transformation及分区详解

RDD也可以指定分区,在RDD生成的时候可以指定。sc.parallelize(arr,2),这样生成的RDD就是有两个分区。如果不指定的话RDD分区数量就跟EXECUTOR的总核数有关。上面的例子是没有指定分区的情况,下面演示指定分区的情况:

scala> val arr = Array(1,2,3,4,5)
arr: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd1 = sc.parallelize(arr, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[103] at parallelize at <console>:26

scala> rdd1.partitions.length
res23: Int = 2

如果RDD是通过集群上的数据生成的,RDD的分区就跟文件的切片有关系。假如hdfs目录下有两个小文件,那么读取的数据转换成RDD就是有两个分区,因为文件较小,所以一个文件就是一个切片。

hdfs的spark文件夹下面有两个数据:

[aaa@qq.com ~]# hadoop fs -ls /spark
Found 2 items
-rwxrwxrwx   2 root supergroup         42 2018-08-24 20:11 /spark/a.txt
-rw-r--r--   2 root supergroup         44 2018-08-30 22:09 /spark/b.txt

那么读取该数据后生成的RDD为几个分区呢!以下例子进行展示:

scala> val rdd1 = sc.textFile("/spark")
rdd1: org.apache.spark.rdd.RDD[String] = /spark MapPartitionsRDD[105] at textFile at <console>:24

scala> rdd1.partitions.length
res26: Int = 2

scala> 

结果为2,跟想象中的一样。

相关标签: spark RDD