欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RDD与算子

程序员文章站 2022-06-01 18:15:00
...

1.RDD

[1].概念

RDD(Resilient Distributed Dateset),弹性分布式数据集

RDD与算子

[2].RDD的五大特性:

①.RDD是由一系列的partition组成的。

②.函数是作用在每一个partition(split)上的。

③.RDD之间有一系列的依赖关系。

④.分区器是作用在K,V格式的RDD上。

⑤.RDD提供一系列最佳的计算位置。利于数据处理的本地化

[3].注意:

①.textFile方法底层封装的是读取MR读取文件的方式,读取文件之前先split,默认split大小是一个block大小。

②.RDD实际上不存储数据,Partition也不存数据。

③.什么是K,V格式的RDD?

如果RDD里面存储的数据都是二元组对象,那么这个RDD我们就叫做K,V格式的RDD。

④.哪里体现RDD的弹性(容错)?

i.partition数量,大小没有限制,体现了RDD的弹性。

ii.RDD之间依赖关系,可以基于上一个RDD重新计算出RDD。

⑤.哪里体现RDD的分布式?

RDD是由Partition组成,partition是分布在不同节点上的。

⑥.RDD提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。

2.Lineage(血统)

[1].可以把每个“猴”看做一个RDD

RDD与算子

[2].上图也可以理解为一个简单的DAG(有向无环图)

3.Spark任务执行原理

RDD与算子

[1].以上图中有四个机器节点,Driver和Worker是启动在节点上的进程,运行在JVM中的进程。

[2].Driver与集群节点之间有频繁的通信。

[3].Driver负责任务(tasks)的分发和结果(result)的回收、任务的调度。如果task的计算结果非常大就不要回收了,会造成oom。

[4].Worker是Standalone资源调度框架里面资源管理的从节点。是JVM进程。

[5].Master(未画出)是Standalone资源调度框架里面资源管理的主节点。也是JVM进程。

4.算子

[1].在spark中,RDD调用的方法称之为算子

[2].在spark中,算子大致分为3类(有些地方将持久化算子不列入分类之中):Transformation(转换)算子、Action(行动)算子、持久化(控制)算子

5.Spark代码流程

[1].创建SparkConf对象

①.可以设置Application name。

②.可以设置运行模式及资源需求。

[2].创建SparkContext对象

[3].基于Spark的上下文创建一个RDD,对RDD进行处理。

[4].应用程序中要有Action类算子来触发Transformation类算子执行。

[5].关闭Spark上下文对象SparkContext。

6.Transformations转换算子

[1].概念:

①.Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。

②.Transformations算子是延迟执行,也叫懒加载执行。

③.Transformations算子的运算结果是RDD类型(也即:RDD->RDD)。

[2].Transformation类算子举例:

①.filter

过滤符合条件的记录数,true保留,false过滤掉。

②.map

将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。

③.flatMap

先map后flat。与map类似,每个输入项可以映射为0到多个输出项。

④.sample

随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。

⑤.reduceByKey

将相同的Key根据相应的逻辑进行处理。

⑥.sortByKey/sortBy

作用在K,V格式的RDD上,对key进行升序或者降序排序。

7.Action行动算子

[1].概念:

①.Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。

②.Transformations类算子是延迟执行,Action类算子是触发执行。

③.一个application应用程序中有几个Action类算子执行,就有几个job运行。

④.Action算子的运算结果是非RDD类型(也即:RDD->非RDD)

[2]Action类算子举例

①.count

返回数据集中的元素数。会在结果计算完成后回收到Driver端。

②.take(n)

返回一个包含数据集前n个元素的集合。

③.first

first=take(1),返回数据集中的第一个元素。

④.foreach

循环遍历数据集中的每个元素,运行相应的逻辑。

⑤.collect

将计算结果回收到Driver端。

8.持久化(控制)算子

[1].概念:

控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。注意:cache、persist、checkpoint持久化单位都是partition

[2].cache

①.默认将RDD的数据持久化到内存中。cache是懒执行。

②.注意:chche() = persist() = persist(StorageLevel.Memory_Only)

③.测试

i.测试文件:NASA_access_log_Aug95

ii.程序

package com.w4xj.scala.spark.wordcount



import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD



/**

* Cache算子持久化测试

* cache()默认将RDD中的数据缓存在内存中,属于懒执行算子

* @author w4xj

* @date 2019年6月12日 14:44:02

*/

object ScalaCache {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("ScalaCache")

    val context: SparkContext = new SparkContext(conf)

    val lines: RDD[String] = context.textFile("./NASA_access_log_Aug95")



    lines.cache

    val startTime01: Long = System.currentTimeMillis

    //此次数据来自于磁盘

    val count01: Long = lines.count

    val endTime01: Long = System.currentTimeMillis

    println("共" + count01 + "条数据," + "初始化时间+cache时间+计算时间=" + (endTime01 - startTime01) + "ms")



    val startTime02: Long = System.currentTimeMillis

    //此次数据来自于缓存在内存中的数据

    val count02: Long = lines.count

    val endTime02: Long = System.currentTimeMillis

    println("共" + count02 + "条数据," + "初始化时间+cache时间+计算时间=" + (endTime02 - startTime02) + "ms")



    context.stop

  }

}    

iii.运行结果

共1569898条数据,初始化时间+cache时间+计算时间=11655ms

共1569898条数据,初始化时间+cache时间+计算时间=207ms

[3].persist:

①.可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。

②.查看StorageLevel持久化级别源码

i.我们直接点进去,发现虽然我们用maven构建,依然没有下载源码,显示的是IDEA反编译字节码(Eclipse就更不友好了)

RDD与算子

ii.解决:下载源码(IDEA直接点击下载然后自动关联,这也是为什么推荐IDEA的原因)然后关联源码

iii.再次查看持久化级别

RDD与算子

RDD与算子

iv.使用注意:

尽量避免使用DISK_ONLY级别

尽量避免使用xxx_2级别

③.cache和persist的注意事项:

i.cache和persist都是懒执行,必须有一个action类算子触发执行。

ii.cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。

iii.cache和persist算子后不能立即紧跟action算子。

错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。所以是不能持久化的

iv.当application执行完成后,持久化的数据将会被清除

[4].checkpoint

①.checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系

②.checkpoint 的执行原理:

i 当RDD的job执行完毕后,会从finalRDD从后往前回溯。

ii 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。

iii.Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到指定的checkpoint目录中。

iv.切断RDD的依赖关系

③.优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。

④.checkpoint与persist

i.persist也可使用磁盘存储,但是在application完成之后就会进行回收,checkpoint执行完成之后,磁盘上依然存在数据

ii.checkpoint会切断与前面RDD的关联关系

iii.某些特定场景必须用checkpoint

⑤.示例

i.

package com.w4xj.scala.spark.wordcount



import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}



/**

  * @Author by w4xj

  * @Classname ScalaCheckpoint

  * @Description TODO

  * @Date 2019/6/14 11:28

  * @Created by IDEA

  */

object ScalaCheckpoint {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("ScalaCheckpoint")

    val context = new SparkContext(conf)

    context.setCheckpointDir("./checkpointDir")

    //文件中加载数据

    //val parallelize: RDD[String] = context.textFile("./words")

    //模拟RDD数据

    val parallelize: RDD[Int] = context.parallelize(List(1,2,3))

    //懒执行,需要action触发

    parallelize.checkpoint

    println(parallelize.count)

    context.stop

  }

}

ii.运行结果

3

iii.生成持久化文件

RDD与算子