Spark知识点整理
Spark知识点整理
版本:Spark-2.1.1
Spark架构
Spark架构主要包含如下角色:
- Driver: 主计算进程,Spark job的驱动器
- Executor: 执行器,Worker上的计算进程
- Cluster Master: 主节点,在standalone模式中为主节点,控制整个集群,监控Worker. 在Yarn模式中充当资源管理器(Resource Manager)
- Worker: 从节点,负责控制计算节点,启动Executor或Driver
Driver
Spark的驱动器是执行main方法的进程,负责创建SparkContext,创建RDD,以及进行RDD的转化操作和行动操作的执行。Driver具有以下职能:
- 把用户程序转为Job
- 跟踪Executor运行状况
- 为执行器节点调度任务
- UI展示应用运行状况
Executor
Spark Executor是一个工作进程,负责在Spark作业中运行任务,任务间相互独立,Spark应用启动时,Executor节点同时启动,并且始终伴随着整个Spark应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark应用也可以继续执行,会将出错节点上的任务调度到其他Executor上继续执行。Executor具有以下职能:
- 负责运行Spark Task, 并将结果返回给Driver进程
- 通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储. RDD是直接存储在Executor进程内的,因此任务可以在运行时充分利用缓存加速运算
Driver与Executor关系
- Executor: 接收任务并执行任务。RDD算子中的计算功能由Executor执行
- Driver: 创建Spark Context对象的应用程序。Spark程序除了计算RDD计算以外的逻辑由Driver执行,Executor代码中引用的Driver部分的对象必须是可序列化的,因为可能需要网络传输
Spark运行流程
运行模式
Local模式
本地模式,Master和Worker均为本机,可以断点调试
Standalone模式
使用Spark本身的资源管理和调度
Yarn模式
使用Yarn作为Spark的资源管理和调度器,又分为Yarn-client模式和Yarn-cluster模式两种,涉及ResourceManager/NodeManager/ApplicationMaster/Container等组件
Yarn-client模式
- driver运行在client客户端
- client与Executor Container通信进行作业调度
- 适用于调试,在客户端可以看到日志
- client与Yarn集群的连接断开或client关闭,任务就挂了
Yarn-cluster模式
- driver运行在ApplicationMaster中
- ApplicationManager与Executor Container通信进行作业调度
- 日志需要登录到Yarn集群的节点才能看到
- client关闭或断开,任务不受影响,继续运行
- 适用于生产环境
ResourceManager
- 处理客户端请求
- 监控NodeManager
- 启动或监控ApplicationMaster
- 资源的分配与调度
NodeManager
- 管理单个节点上的资源
- 处理来自ResourceManager的命令
- 处理来自ApplicationMaster的命令
ApplicationMaster
- 负责数据切分
- 为应用程序申请资源并分配给内部的任务
- 任务的监控与容错
Container
Yarn的资源抽象,封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络
工作流程
RDD
什么是RDD
RDD (Resilient Distributed Dataset,分布式弹性数据集),是Spark的基本数据结构,在代码中是一个抽象类,代表不可变、可分区、里面的元素可并行计算的集合。RDD源码的设计模式类似Java IO,使用了装饰器模式,是对各种数据操作的封装。
RDD的属性
- 一组分区(partition),即数据集的基本组成单位
protected def getPartitions: Array[Partition]
- 一个计算每个分区的函数
def compute(split: Partition, context: TaskContext): Iterator[T]
- RDD之间的依赖关系,也叫『血缘』
protected def getDependencies: Seq[Dependency[_]] = deps
- Partioner,RDD的分区函数
@transient val partitioner: Option[Partitioner] = None
- 一个列表,存储每个Partition的优先位置(preferred location)
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
- 理念:移动数据不如移动计算
RDD的特点
-
分区
- Spark的数据是分区的,分区使得并行计算成为可能
- 只有K-V数据才有分区器
- Spark支持Hash分区器和Range分区器
-
只读
- RDD不能修改,只能通过已有的RDD经过算子变换成新的RDD
- 两类算子
- Transformations: 构建RDD的血缘,不立即执行计算
- Actions: 触发RDD计算,将结果写入外部系统
-
依赖
- 窄依赖:上下游RDD之间的依赖是一一对应的
- 宽依赖:一个下游RDD依赖多个上游RDD
- 通过依赖关系可以将一个Spark任务描述为一个DAG(有向无环图)
- 宽依赖是划分stage的依据
- 通过
rdd.dependencies
查看依赖的类型(OneToOneDependency
和SuffleDependency
) - 通过
rdd.toDebugString
查看依赖血缘
-
缓存
- 可以将RDD缓存到内存,只有第一次根据RDD的血缘计算,之后的计算直接从内存中取缓存
- 使用RDD的
persist
或cache
方法进行缓存,persist
默认会把数据以序列化的形式存储在JVM的堆空间中
-
持久化
- 对长时间的任务,将中间RDD持久化到checkpoint,加速容错恢复
- 在SparkContext中设置检查点保存目录(
sc.setCheckpointDir(dir)
,通常dir
设置为HDFS路径),然后调用RDD的checkpoint
方法 - checkpoint会打断血缘,checkpoint后的RDD的血缘会从checkpoint开始
创建RDD
从内存中创建RDD
-
parallelize
-
makeRDD,底层实现使用parallelize
从外部存储创建RDD
- textFile
Spark任务划分
-
Application: 初始化一个SparkContext即生成一个Application
-
Job: 一个Action算子即生成一个Job
-
Stage: 根据RDD之间的依赖关系将Job划分成不同的Stage, 遇到一个宽依赖则划分一个新的Stage
-
Task: 将Stage最后一个RDD的分区分发到不同的Executor,每个Executor上执行一个Task
广播变量和累加器
- RDD: 分布式数据集
- 广播变量: 分布式只读变量(需要注册)
- 使用广播变量可以减少网络传输和内存使用:从每个task存储一份变量到每个Executor存储一份变量
- 累加器: 分布式只写变量(多个task共同操作同一个变量)
SparkSQL
DataFrame
-
since Spark 1.3
-
类似传统数据库的二维表
-
SparkSession是创建DataFrame和执行SQL的入口
DataSet
-
since Spark 1.6
-
DataFrame的一个扩展,类似Hibernate对数据表的封装
Spark Streaming
- Spark Streaming 架构
-
Spark Streaming编程的核心对象是DStream,是对一个时间片(时间片长度可以设置)的数据的封装. DStream的常用方法与RDD类似
-
有状态转化操作:保存了过去的时间片的状态, 例如
updateStateByKey
,mapWithStates
Spark shuffle
Spark的shuffle发生在宽依赖算子,是划分两个stage的依据。在Spark 1.6之前,Spark的shuffle主要采用HashShuffleWriter
;从Spark 1.6之后,Spark的shuffle主要采用SortShuffleWriter
,但是当输出分区数较少时会采用BypassMergeSortShuffleWriter
.
HashShuffleWriter
基于Hash的shuffle在每个map task/executor会为每一个输出分区形成一个文件,然后reduce task去取对应的文件。这样做的缺点是shuffle过程中产生很多小文件读写,并且这些小文件还要进行网络传输,效率低,容易OOM,因此在Spark 1.6中放弃了基于Hash的shuffle作为主要shuffle方式的设计思路。
SortShuffleWriter
Spark 1.6之后主要的shuffle方式,非常类似Hadoop Mapreduce的shuffle. map task的计算结果写入一个内存数据结构(可以类比Mapreduce的环形缓冲区),内存数据结构满了就会发生溢写,map task执行完后把溢写的多个小文件合用MergeSort归并成一个index file和一个data file,reduce task根据index file去fetch自己对应的data.
BypassMergeSortShuffleWriter
在Spark 1.6之后,当不存在map side combine(即aggregator)且输出分区数量小于spark.shuffle.sort.bypassMergeThreshold
指定的阈值(默认为200)时,使用BypassMergeSortShuffleWriter
替代SortShuffleWriter
. BypassMergeSortShuffleWriter
的思路类似HashShuffleWriter
,但是每个map task为所有reduce task创建各自的输出文件后,在map task执行完后会将各个reduce task对应的文件合并成一个文件,并建立索引文件。