RDD与算子
1.RDD
[1].概念
RDD(Resilient Distributed Dateset),弹性分布式数据集。
[2].RDD的五大特性:
①.RDD是由一系列的partition组成的。
②.函数是作用在每一个partition(split)上的。
③.RDD之间有一系列的依赖关系。
④.分区器是作用在K,V格式的RDD上。
⑤.RDD提供一系列最佳的计算位置。利于数据处理的本地化
[3].注意:
①.textFile方法底层封装的是读取MR读取文件的方式,读取文件之前先split,默认split大小是一个block大小。
②.RDD实际上不存储数据,Partition也不存数据。
③.什么是K,V格式的RDD?
如果RDD里面存储的数据都是二元组对象,那么这个RDD我们就叫做K,V格式的RDD。
④.哪里体现RDD的弹性(容错)?
i.partition数量,大小没有限制,体现了RDD的弹性。
ii.RDD之间依赖关系,可以基于上一个RDD重新计算出RDD。
⑤.哪里体现RDD的分布式?
RDD是由Partition组成,partition是分布在不同节点上的。
⑥.RDD提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。
2.Lineage(血统)
[1].可以把每个“猴”看做一个RDD
[2].上图也可以理解为一个简单的DAG(有向无环图)
3.Spark任务执行原理
[1].以上图中有四个机器节点,Driver和Worker是启动在节点上的进程,运行在JVM中的进程。
[2].Driver与集群节点之间有频繁的通信。
[3].Driver负责任务(tasks)的分发和结果(result)的回收、任务的调度。如果task的计算结果非常大就不要回收了,会造成oom。
[4].Worker是Standalone资源调度框架里面资源管理的从节点。是JVM进程。
[5].Master(未画出)是Standalone资源调度框架里面资源管理的主节点。也是JVM进程。
4.算子
[1].在spark中,RDD调用的方法称之为算子
[2].在spark中,算子大致分为3类(有些地方将持久化算子不列入分类之中):Transformation(转换)算子、Action(行动)算子、持久化(控制)算子
5.Spark代码流程
[1].创建SparkConf对象
①.可以设置Application name。
②.可以设置运行模式及资源需求。
[2].创建SparkContext对象
[3].基于Spark的上下文创建一个RDD,对RDD进行处理。
[4].应用程序中要有Action类算子来触发Transformation类算子执行。
[5].关闭Spark上下文对象SparkContext。
6.Transformations转换算子
[1].概念:
①.Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。
②.Transformations算子是延迟执行,也叫懒加载执行。
③.Transformations算子的运算结果是RDD类型(也即:RDD->RDD)。
[2].Transformation类算子举例:
①.filter
过滤符合条件的记录数,true保留,false过滤掉。
②.map
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。
③.flatMap
先map后flat。与map类似,每个输入项可以映射为0到多个输出项。
④.sample
随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。
⑤.reduceByKey
将相同的Key根据相应的逻辑进行处理。
⑥.sortByKey/sortBy
作用在K,V格式的RDD上,对key进行升序或者降序排序。
7.Action行动算子
[1].概念:
①.Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。
②.Transformations类算子是延迟执行,Action类算子是触发执行。
③.一个application应用程序中有几个Action类算子执行,就有几个job运行。
④.Action算子的运算结果是非RDD类型(也即:RDD->非RDD)
[2]Action类算子举例
①.count
返回数据集中的元素数。会在结果计算完成后回收到Driver端。
②.take(n)
返回一个包含数据集前n个元素的集合。
③.first
first=take(1),返回数据集中的第一个元素。
④.foreach
循环遍历数据集中的每个元素,运行相应的逻辑。
⑤.collect
将计算结果回收到Driver端。
8.持久化(控制)算子
[1].概念:
控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。注意:cache、persist、checkpoint持久化单位都是partition
[2].cache
①.默认将RDD的数据持久化到内存中。cache是懒执行。
②.注意:chche() = persist() = persist(StorageLevel.Memory_Only)
③.测试
i.测试文件:NASA_access_log_Aug95
ii.程序
package com.w4xj.scala.spark.wordcount
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
/**
* Cache算子持久化测试
* cache()默认将RDD中的数据缓存在内存中,属于懒执行算子
* @author w4xj
* @date 2019年6月12日 14:44:02
*/
object ScalaCache {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("ScalaCache")
val context: SparkContext = new SparkContext(conf)
val lines: RDD[String] = context.textFile("./NASA_access_log_Aug95")
lines.cache
val startTime01: Long = System.currentTimeMillis
//此次数据来自于磁盘
val count01: Long = lines.count
val endTime01: Long = System.currentTimeMillis
println("共" + count01 + "条数据," + "初始化时间+cache时间+计算时间=" + (endTime01 - startTime01) + "ms")
val startTime02: Long = System.currentTimeMillis
//此次数据来自于缓存在内存中的数据
val count02: Long = lines.count
val endTime02: Long = System.currentTimeMillis
println("共" + count02 + "条数据," + "初始化时间+cache时间+计算时间=" + (endTime02 - startTime02) + "ms")
context.stop
}
}
iii.运行结果
共1569898条数据,初始化时间+cache时间+计算时间=11655ms
共1569898条数据,初始化时间+cache时间+计算时间=207ms
[3].persist:
①.可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。
②.查看StorageLevel持久化级别源码
i.我们直接点进去,发现虽然我们用maven构建,依然没有下载源码,显示的是IDEA反编译字节码(Eclipse就更不友好了)
ii.解决:下载源码(IDEA直接点击下载然后自动关联,这也是为什么推荐IDEA的原因)然后关联源码
iii.再次查看持久化级别
iv.使用注意:
尽量避免使用DISK_ONLY级别
尽量避免使用xxx_2级别
③.cache和persist的注意事项:
i.cache和persist都是懒执行,必须有一个action类算子触发执行。
ii.cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
iii.cache和persist算子后不能立即紧跟action算子。
错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。所以是不能持久化的
iv.当application执行完成后,持久化的数据将会被清除
[4].checkpoint
①.checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。
②.checkpoint 的执行原理:
i 当RDD的job执行完毕后,会从finalRDD从后往前回溯。
ii 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
iii.Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到指定的checkpoint目录中。
iv.切断RDD的依赖关系
③.优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
④.checkpoint与persist
i.persist也可使用磁盘存储,但是在application完成之后就会进行回收,checkpoint执行完成之后,磁盘上依然存在数据
ii.checkpoint会切断与前面RDD的关联关系
iii.某些特定场景必须用checkpoint
⑤.示例
i.
package com.w4xj.scala.spark.wordcount
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author by w4xj
* @Classname ScalaCheckpoint
* @Description TODO
* @Date 2019/6/14 11:28
* @Created by IDEA
*/
object ScalaCheckpoint {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("ScalaCheckpoint")
val context = new SparkContext(conf)
context.setCheckpointDir("./checkpointDir")
//文件中加载数据
//val parallelize: RDD[String] = context.textFile("./words")
//模拟RDD数据
val parallelize: RDD[Int] = context.parallelize(List(1,2,3))
//懒执行,需要action触发
parallelize.checkpoint
println(parallelize.count)
context.stop
}
}
ii.运行结果
3
iii.生成持久化文件
上一篇: Spark 算子
下一篇: ReduceByKey算子理解