欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark的转换操作基础知识和一些使用技巧!!!(Unit3)

程序员文章站 2022-06-01 18:14:48
...

在阅读完最早之前的帖子(点击打开链接)说了如何理解RDD和什么是RDD和对一些基础的术语的解读示例,然后我又发了一份如何创建RDD(点击打开链接)我们这节课来学学习Spark的一些对RDD的转换操作,转换操作就是不会真的进行分布式计算,而是将RDD从一种状态转换到另外一种状态,延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作,转换操作大致分为以下两种形式我们着重介绍第一种

(1)非KV转换操作即基础转换操作        

1.map(func)数据集中的每个元素经过用户自定义的函数转换形成一个新的RDD 

对应图解析

Spark的转换操作基础知识和一些使用技巧!!!(Unit3)

object Map {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10)  //创建RDD
    val map = rdd.map(_*2)             //对RDD中的每个元素都乘于2
    map.foreach(x => print(x+" "))
    sc.stop()
  }
}


输出结果:

2 4 6 8 10 12 14 16 18 20

2.flatMap(func):与map类似,但每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出个人觉得所谓的扁平化处理就是对map之后的的单个集合在进行一次分散把RDD的数据变成不存在

对应图解析:

Spark的转换操作基础知识和一些使用技巧!!!(Unit3)

实列

object Map {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
      val rdd = sc.parallelize(1 to 5)
   val fm = rdd.flatMap(x => (1 to x)).collect()
   fm.foreach( x => print(x + " ")))
    sc.stop()
  }
}    

输出:
1 1 2 1 2 3 1 2 3 4 1 2 3 4 5
如果是map那么他的输出应该是:
Range(1) Range(1, 2) Range(1, 2, 3) Range(1, 2, 3, 4) Range(1, 2, 3, 4, 5)

3.mapPartitions(func):类似与map,map作用于每个分区的每个元素,但mapPartitions作用于每个分区工
func的类型:Iterator[T] => Iterator[U]
假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,当在映射的过程中不断的创建对象时就可以使用mapPartitions比map的效率要高很多,比如当向数据库写入数据时,如果使用map就需要为每个元素创建connection对象,但使用mapPartitions的话就需要为每个分区创建connetcion对象

图解:

Spark的转换操作基础知识和一些使用技巧!!!(Unit3)

实例

object MapPartitions {
//定义函数 
  def partitionsFun(/*index : Int,*/iter : Iterator[(String,String)]) : Iterator[String] = {
    var woman = List[String]()
    while (iter.hasNext){
      val next = iter.next()
      next match {
        case (_,"female") => woman = /*"["+index+"]"+*/next._1 :: woman
        case _ =>
      }
    }
    return  woman.iterator
  }
 
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("mappartitions")
    val sc = new SparkContext(conf)
    val l = List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female"))
    val rdd = sc.parallelize(l,2)
    val mp = rdd.mapPartitions(partitionsFun)
    /*val mp = rdd.mapPartitionsWithIndex(partitionsFun)*/
    mp.collect.foreach(x => (print(x +" ")))   //将分区中的元素转换成Aarray再输出
  }
}

结果:

kpop lucy
4.union(ortherDataset):将两个RDD中的数据集进行合并,最终返回两个RDD的并集,若RDD中存在相同的元素也不会去重

示例

object Map {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
   val rdd1 = sc.paralleliz(1 to 3)
   val rdd2 = sc.parallelize(3 to 5)
   val unionRDD = rdd1.union(rdd2)
   unionRDD.collect.foreach(x => print(x + " "))
   sc.stop 
   
  }
}

结果:

1 2 3 3 4 5

5.intersection(otherDataset):返回两个RDD的交集

实例:

object Map {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(1 to 3)
    val rdd2 = sc.parallelize(3 to 5)
    val unionRDD = rdd1.intersection(rdd2)
    unionRDD.collect.foreach(x => print(x + " "))
    sc.stop 

输出

3

6.distinct([numTasks]):对RDD中的元素进行去重

实例:

object Map {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
val list = List(1,1,2,5,2,9,6,1)
val distinctRDD = sc.parallelize(list)
val unionRDD = distinctRDD.distinct()
unionRDD.collect.foreach(x => print(x + " ")) 
结果:
1 6 9 5 2

7.cartesian(otherDataset):对两个RDD中的所有元素进行笛卡尔积操作

图解:

Spark的转换操作基础知识和一些使用技巧!!!(Unit3)

实例:

object Map {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(2 to 5)
val cartesianRDD = rdd1.cartesian(rdd2)
cartesianRDD.foreach(x => println(x + " "))       

结果:

(1,2)
(1,3)
(1,4)
(1,5)
(2,2)
(2,3)
(2,4)
(2,5)
(3,2)
(3,3)
(3,4)
(3,5)
7.coalesce(numPartitions,shuffle):对RDD的分区进行重新分区,shuffle默认值为false,当shuffle=false时,不能增加分区数目,但不会报错,只是分区个数还是原来的,所以就是这个方法适合将分区数减小
shuffle=false
val rdd = sc.parallelize(1 to 16,4)
val coalesceRDD = rdd.coalesce(3) //当suffle的值为false时,不能增加分区数(即分区数不能从5->7)
println("重新分区后的分区个数:"+coalesceRDD.partitions.size) 
结果:
重新分区后的分区个数:3
//分区后的数据集
List(1, 2, 3, 4)
List(5, 6, 7, 8)
List(9, 10, 11, 12, 13, 14, 15, 16) 
8.repartition(numPartition):是函数coalesce(numPartition,true)的实现,效果和coalesce(numPartition,true)的一样


(2)KV转换操作即键值转换     

Spark为包含键值对类型的RDD提供了一些专有的操作。这些RDD被称为pairRDD。提供并行操作各个节点或跨界点重新进行数据分组的操作接口。

1、聚合操作
pair RDD上择优相应的针对键的转化操作。
Scala中使用mapValues()和reduceByKey()计算每个键对应的平均值:

2、并行度调优
Spark提供了repartion()函数。他会把数据通过网络进行混洗。并创建出新的分区集合。对数据分区是代价比较大。
优化版的repartion()函数叫做coalesce()。java或者Scala可以使用rdd.partitions.size查看RDD的分区数。并确保调用coalesce()时将RDD合并到比现在的分区更少的分区中。
3、数据分组
groupByKey()使用RDD中的键对数据进行分组。对于一个由类型k的键和类型v的值组成的RDD,所得到的结果RDD类型会是[K,Iterable[V]];groupByKey可以用于未成对的数据上,也可以根据除键相同以外的条件进行分组。它可以接受一个函数,对源RDD中的每个元素使用该函数,将返回结果作为键在进行分组。
cogroup()函数对多个共享同一个键的RDD进行分组。对两个键类型均为K而值的类型分别为V和W的RDD进行cogroup()时,得到的结果RDD类型为[(K,(Iterable[V],Iterable[W]))].
4、连接
普通的join连接是内链接,只有在两个Pair RDD中都存在的键菜叫输出。当一个输入对应的某个键有多个值时生成的Pair RDD会包括来自两个输入RDD的每一组相对应的记录。
leftOuterJoin()源RDD的每一个键都有对应的记录。每个键相对应的值是由一个源RDD的值与一个包含第二个RDD的值的option对象组成的二元数组。而rightOuterJoin()的结果与其相反。
5、数据排序
使用sortByKey()函数接受一个参数,表示是否让结果按升序排列(默认是true)