欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

Spark的算子:方法、函数

程序员文章站 2022-05-09 23:31:27
1、什么是RDD? 最核心 *)弹性分布式数据集,Resilent distributed DataSet (*)Spark中数据的基本抽象 (*)结...

1、什么是RDD? 最核心

*)弹性分布式数据集,Resilent distributed DataSet
        (*)Spark中数据的基本抽象
        (*)结合源码,查看RDD的概念
         * Internally, each RDD is characterized by five main properties:
         *
         *  - A list of partitions
            一组分区,把数据分成了的不同的分区,每个分区可能运行在不同的worker

         *  - A function for computing each split
            一个函数,用于计算每个分区中的数据
            RDD的函数(算子)
                (1)Transformation(延时加载)
                (2)Action(会触发计算)

         *  - A list of dependencies on other RDDs
            RDD之间存在依赖关系:(1)窄依赖  (2)宽依赖
            根据依赖的关系,来划分任务的Stage(阶段)

         *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
         *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)        

        如何创建一个RDD?有两种方式
        (1)使用sc.parallelize方法
               val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)

        (2)通过使用外部的数据源创建RDD:比如:HDFS
               val rdd2 = sc.textFile("hdfs://bigdata11:9000/input/data.txt")
               val rdd2 = sc.textFile("/root/temp/input/data.txt")
RDD的分区

Spark的算子:方法、函数

2、Transformation算子:不会触发计算、延时加载(lazy值)
        map(func):该操作是对原来的RDD进行操作后,返回一个新的RDD
        filter: 过滤操作、返回一个新的RDD
        flatMap:类似map
        mapPartitions:对每个分区进行操作
        mapPartitionsWithIndex: 对每个分区进行操作,带分区的下标
        union   并集
        intersection 交集
        distinct  去重
        groupByKey:   都是按照Key进行分组
        reduceByKey:  都是按照Key进行分组、会有一个本地操作(相当于:Combiner操作)

3、Action算子:会触发计算
        collect: 触发计算、打印屏幕上。以数组形式返回
        count:  求个数
        first: 第一个元素(take(1))
        take(n)
        saveAsTextFile: 会转成String的形式,会调用toString()方法
        foreach: 在RDD的每个元素上进行某个操作
7、RDD算子的基础例子
1、创建一个RDD(数字)
    val rdd1 = sc.parallelize(List(5,6,1,2,10,4,12,20,100,30))

    每个元素*2,然后排序
    val rdd2 = rdd1.map(_*2).sortBy(x=>x,true)

    完整
    val rdd2 = rdd1.map((x:Int)=>x*2)

    过滤出大于10的元素
    val rdd3 = rdd2.filter(_>10)
    rdd3.collect

2、创建一个RDD(字符)
   val rdd1 = sc.parallelize(Array("a b c","d e f","h i j"))
   val rdd2 = rdd1.flatMap(_.split(' '))
   rdd2.collect

3、集合运算、去重
    val rdd1 = sc.parallelize(List(5,6,7,8,1,2))
    val rdd2 = sc.parallelize(List(1,2,3,4))

    val rdd3 = rdd1.union(rdd2)
    rdd3.distinct.collect
    val rdd4 = rdd1.intersection(rdd2)

4、分组
    val rdd1 = sc.parallelize(List(("Tom",1000),("Jerry",3000),("Mary",2000)))
    val rdd2 = sc.parallelize(List(("Jerry",500),("Tom",3000),("Mike",2000)))

    并集
    val rdd3 = rdd1 union rdd2
    scala> val rdd4 = rdd3.groupByKey
    rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[27] at groupByKey at :30

    scala> rdd4.collect
    res8: Array[(String, Iterable[Int])] = Array((Tom,CompactBuffer(1000, 3000)),
                                                 (Jerry,CompactBuffer(3000, 500)), 
                                                 (Mike,CompactBuffer(2000)), 
                                                 (Mary,CompactBuffer(2000)))


按照Value排序
提示:交换了key-value的位置(两次)
sortByKey: 按照key进行排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
4、RDD的缓存机制:默认在内存中
        (*)提高效率
        (*)默认:缓存在Memory中
        (*)调用:方法:persist或者cache

            def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
            def cache(): this.type = persist()

        (*)缓存的位置:StorageLevel定义的
              val NONE = new StorageLevel(false, false, false, false)
              val DISK_ONLY = new StorageLevel(true, false, false, false)
              val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
              val MEMORY_ONLY = new StorageLevel(false, true, false, true)
              val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
              val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
              val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
              val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
              val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
              val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
              val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
              val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

        (*)示例:  
                测试数据:Oracle数据库的订单变 sales表(大概92万)
                步骤
                (1)读入数据
                     val rdd1 = sc.textFile("hdfs://bigdata11:9000/input/sales")

                (2)计算
                      rdd1.count      ---> Action,这一次没有缓存
                      rdd1.cache      ---> 缓存数据,但是不会触发计算,cache是一个Transformation
                      rdd1.count      ----> 触发计算,将结果缓存
                      rdd1.count      ----> ???会从哪里得到数据,从缓存中得到

5、RDD的容错机制:checkpoint检查点:两种类型 (1)本地目录  (2)HDFS目录
        (1)复习检查点:HDFS中,合并元信息
                         Oracle中,会以最高优先级唤醒数据库写进程(DBWn),来写内存中的脏数据---> 数据文件
        (2)RDD的检查点:容错机制,辅助Lineage(血统)---> 整个计算的过程
             如果lineage越长,出错的概率就越大(生成检查点,如果出错,就从之前的检查点开始计算)


        两种类型 (1)本地目录  : 需要将spark-shell运行在本地模式上


                 (2)HDFS目录: 需要将spark-shell运行在集群模式上
                    scala> sc.setCheckpointDir("hdfs://bigdata11:9000/spark/checkpoint")

                    scala> val rdd1 = sc.textFile("hdfs://bigdata11:9000/input/sales")
                    rdd1: org.apache.spark.rdd.RDD[String] = hdfs://bigdata11:9000/input/sales MapPartitionsRDD[41] at textFile at :24

                    scala> rdd1.checkpoint

                    scala> rdd1.count

        源码中的说明:
          /**
           * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
           * directory set with `SparkContext#setCheckpointDir` and all references to its parent
           * RDDs will be removed. This function must be called before any job has been
           * executed on this RDD. It is strongly recommended that this RDD is persisted in
           * memory, otherwise saving it on a file will require recomputation.
           */       


6、RDD的依赖关系、划分Spark任务的Stage(阶段)
        (*)窄依赖:每一个父RDD的分区最多被子RDD的一个分区使用
                    比方:独生子女


        (*)宽依赖:多个子RDD的分区会依赖同一个父RDD的分区
                   比方:超生