【面试】Spark大数据进阶篇
0、问题大纲
二、Spark 核心原理
2、Spark 运行原理
1、Spark 总体架构?
- 追问1:SparkContext作用
2、Spark程序的运行流程(*3)
- 追问1:关于分配资源,即作业调度,能不能再细说下?
- 追问2:DAGscheduler干了什么活?(*2)
- 追问3:Job、Task、Stage分别说下?
- 追问4:特别大的数据,怎么发送到excutor中?
- 追问5:Hadoop中job和Tasks的区别是什么?
3、宽依赖和窄依赖(*4)概念
- 追问1:哪些算子会导致宽依赖,哪些会导致窄依赖?(*3)
4、Spark中如何划分Stage?(*2)
- 追问1:划分Stage依据是什么?(*2)
- 追问2:Spark判断Shuffle的依据?
5、Spark shuffle过程是什么,shuffle的概念?
6、Spark 运行模式?(x3)其中哪几种是集群模式?
7、Spark on Yarn作业执行流程?(在集群中怎么执行程序?)
- 追问1:Yarn-Client和Yarn-Cluster有什么区别?
- 追求2:如果有聚合操作呢?
8、Spark中广播变量是什么,作用?
- 追问1:共享变量和累加器是什么?
二、Spark 核心原理
2、Spark 核心原理
1、Spark 总体架构?
答:
模块 | 简介 | 作用 |
---|---|---|
Driver | 每个应用任务控制节点 | 运行Spark Application的main()函数,创建SparkContext。 |
SparkContext | 应用入口,和整个集群交互 | 和Cluster Manager通信,进行资源申请、任务分配和监控,如创建RDD。(*2) |
Cluster Manager | 集群资源管理器 | 申请和管理Worker Node上运行所需资源 。 |
Worker Node | 工作节点 | 一个Application可以有多个运行作业任务的工作节点。 |
Executor | Worker Node的执行进程 | 运行Task,负责将数据存到内存/磁盘。 |
Task | 任务 | Executor进程中工作单元,多个Task组成一个Stage。 |
2、Spark程序的运行流程(*3)
- 构建运行环境。由Driver创建SparkContext,进行资源申请、任务分配与监控;
- 分配资源。资源管理器(Standalone/Mesos/YARN)为Executor分配资源、启动进程;
- 分解Stage,申请Task。 SparkContext构建DAG图,DAG图分解为Stage,并把TaskSet发送给TaskScheduler。Executor向SparkContext申请Task,Task Schedule将Task发放给Executor运行;
- 运行 & 注销。Task在Executor上运行,执行结果反馈给TaskScheduler与DAGScheduler,运行完释放所有资源。
追问1:关于分配资源,即作业调度,能不能再细说下?
- 1)DAGScheduler分析各个RDD间的转换依赖关系,得到DAG;
- 2)通过DAG把Job划分成多个stage;
- 3)每个stage会生成一个TaskSet并提交给TaskScheduler,调度权转交给TaskScheduler,由它来负责分发task到worker执行。
追问2:DAGscheduler干了什么活?(*2)
根据Job构建基于Stage的DAG(RDD间依赖关系),将Stage提交给TaskScheduler。
追问3:Job、Task、Stage分别说下?
名称 | 功能 |
---|---|
Job | 多个Task组成的并行计算阶段,Action算子触发 |
Stage | 每个Job被划分多个Task作为一个TaskSet(小的计算任务组),名为Stage |
Task | Executor上计算单元,多个Task组成一个Stage |
1 App = n Job
1 Job = n Stage = n TaskSet
1 Stage = n Task
1 App = n1 job = n1*n2 Stage = n1*n2*n3 Task
追问4:特别大的数据,怎么发送到excutor中?
……
追问5:Hadoop中job和Tasks的区别是什么?
答:Job是我们对一个完整MapReduce程序的抽象封装,Task是job运行时,每一个处理阶段的具体实例,如map task,reduce task,maptask和reduce task都会有多个并发运行的实例
3、宽依赖和窄依赖(*4)概念
答:Transformation操作会形成RDD依赖关系,其中一个父RDD节点最多只会被子RDD一个分区使用称为窄依赖,会被多个子RDD使用称为宽依赖。
窄依赖,1 父RDD <= 1 子RDD
宽依赖,1 父RDD == n 子RDD
追问1:哪些算子会导致宽依赖,哪些会导致窄依赖?(*3)
- 窄依赖:map、filter、union
- 宽依赖:groupbykey、sortByKey [会产生Shuffle过程]
- 看情况:If(分区器 == HashPartitioner), then (join、reduceByKey)为窄依赖。
说明:
- 默认两种划分器:HashPartitioner和RangePartitioner,当分区器HashPartitioner时就是窄依赖。
答:同一个key进去,通过HashPartitioner得到的分区id是一样的,计算前后同一个key得到的分区都一样,
父RDD的分区就只被子RDD的一个分区依赖,就不需要移动数据。所以是窄依赖。
4、Spark中如何划分Stage?(*2)
从执行的最后一个RDD往前推,遇到宽依赖就切割stage。
追问1:划分Stage依据是什么?(*2)
答:通过RDD间依赖关系找出开销最小的调度方法。
追问2:Spark判断Shuffle的依据?
答: 父RDD的一个分区中的数据有可能被分配到子RDD的多个分区中,这里会有Shuffle Write和Shuffle Read操作。
【标注RDD】
【分析Shuffle】
5、Spark shuffle过程是什么,shuffle的概念?
MapReduce Shuffle — Hash Shuffle V1:
MR中,Shuffle连接Map和Reduce,该阶段设计磁盘读写和网络传输,影响整个程序性能和吞吐量。
严重问题:
1、生成大量文件,消耗内存,IO操作低效。
2、Reduce Task合并操作会把数据放在HashMap合并,可能引发OOM。
Spark Shuffle - Hash Shuffle V2:
针对第一个问题,引入File Consolidation机制:一个Executor上所有Map Task生成分区文件只有一份,即将所有的Map Task相同的分区文件合并。这样每个Executor最多生成N个分区文件。
问题:这样减少了文件数,但加入下游Stage分区数N很大,还是会在每个Executor上生成N个文件,同样,若1个Executor上有K个Core,还是会有K*N个Writer Handler,这里容易导致OOM。
Spark Sort Shuffle V1:
为了解决上面问题,Spark参考了MR中Shuffle处理方式,引入基于排序的Shuffle写操作机制。
每个Task不会为后续每个Task创建单独文件,而是将所有对结果写入同一个文件,该文件首先按照Partition Id排序,每个Partition内部按照Key进行排序,Map Task运行期间会顺序写每个Partition数据,同时生成一个索引文件记录每个Partition的大小和偏移量。
Reduce阶段,Reduce Task拉取数据做Combine时不再采用HashMap,而是采用ExternalAppendOnlyMap,该数据结构做Combine时,若内存不足会刷写磁盘,很大程度上保证鲁棒性,避免大多数情况下的OOM。
看待:Sort Shuffle解决了Hash Shuffle的所有弊端,但是需要其Shuffle过程需要对记录进行排序,所以性能上有所损失。
Tungsten-Sort Based Shuffle / Unsafe Shuffle:
Spark 1.5.0开始钨丝计划,优化内存和CPU使用。由于使用了堆外内存,基于JDK Sun Unsafe API,所以也称为Unsafe Shuffle。
做法是将数据记录用二进制方式存储,直接在序列化的二进制数据上Sort而不是Java对象,一方面可减少内存使用和GC开销,另一方面避免Shuffle过程频繁的序列化和反序列化。排序过程中提供cache-efficient sorter,使用8 bytes指针,把排序转化为一个指针数据排序,极大优化排序性能。
问题:使用Tungsten-Sort Based Shuffle不能有aggregate操作,分区数不能超过一定大小,所以reduceByKey这类aggregate操作算子不能使用Tungsten-Sort Based Shuffle,会退化采用Sort Shuffle。
SortShuffleWriter 实现细节
我们可以先考虑一个问题,假如我有 100亿条数据,但是我们的内存只有1M,但是我们磁盘很大, 我们现在要对这100亿条数据进行排序,是没法把所有的数据一次性的load进行内存进行排序的,这就涉及到一个外部排序的问题,我们的1M内存只能装进1亿条数据,每次都只能对这 1亿条数据进行排序,排好序后输出到磁盘,总共输出100个文件,最后怎么把这100个文件进行merge成一个全局有序的大文件。我们可以每个文件(有序的)都取一部分头部数据最为一个 buffer, 并且把这 100个 buffer放在一个堆里面,进行堆排序,比较方式就是对所有堆元素(buffer)的head元素进行比较大小, 然后不断的把每个堆顶的 buffer 的head 元素 pop 出来输出到最终文件中, 然后继续堆排序,继续输出。如果哪个buffer 空了,就去对应的文件中继续补充一部分数据。最终就得到一个全局有序的大文件。
如何聚合:
Shuffle Write:上一个stage的每一个map task就必须保证将自己处理的当前分区中的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件中。
Shuffle Read:reduce task就会从上一个stage的所有task所在的机器上寻找属于自己的那些分区文件,这样就可以保证每一个key所对应的value都会汇聚在同一个节点上去处理和聚合。
Spark中有两种Shuffle管理类型,HashShuffleManager和SortShuffleManager,Spark1.2之前是HashShuffleManager,Spark1.2引入SortShuffleManager,在Spark2.0+版本中已经将HashShuffleManager丢弃。
Sort Shuffle V2
Spark-1.6.0 把 Sort Shuffle 和 Tungsten-Sort Based Shuffle 全部统一到 Sort Shuffle 中,如果检测到满足 Tungsten-Sort Based Shuffle 条件会自动采用 Tungsten-Sort Based Shuffle,否则采用 Sort Shuffle。
Spark-2.0.0 把 Hash Shuffle 移除,目前 Spark-2.0 中只有一种 Shuffle,即 Sort Shuffle。
6、Spark 运行模式?(x3)其中哪几种是集群模式?
- 1)本地Local:单机运行,一般用于开发测试;
- 2)Standalone:构建Master+Slave构成的Spark集群,Spark运行在集群中;
- 3)Spark on Yarn:Spark客户直接连接Yarn,不需要额外构建Spark集群;
- 4)Spark on Mesos:Spark客户端直接连接Mesos,不需要额外构建Spark集群。
其中,Standalone,Spark on Yarn,Spark on Mesos模式是集群模式
7、Spark on Yarn作业执行流程?(在集群中怎么执行程序?)
7.1 简介
Yarn负责统一资源管理,可以运行多套计算框架,如MapReduce、Storm。Spark基于历史或性能的考量开发了Spark on Yarn运行模式,Yarn良好的弹性资源管理机制使得Application部署更方便,Application资源也完全隔离。
Spark on Yarn根据Driver在集群中的位置分为Yarn-Client模式和Yarn-Cluster(亦称Yarn-Standalone模式)。
7.2 Yarn-Client
该模式下Driver在客户端本地运行,Spark Application和客户端进行交互。因为Driver在客户端,所以可通过WebUI访问Driver状态,默认http://xxx:4040,Yarn通过http://xxx:8088访问。
工作流程:
-
Spark Yarn Client向ResourceManager申请启动Application Master。同时在SparkContext初始化中将创建DAGScheduler和TASKScheduler等。(Yarn-Client模式下,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend)
-
ResourceManager收到请求后选择一个NodeManager,为该应用程序分配第一个Container,然后启动ApplicationMaster。(VS YARN-Cluster区别在于该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派)
-
Client中SparkContext初始化完毕后,与ApplicationMaster建立通讯,向ResourceManager注册后,并根据任务信息申请资源(Container);
-
一旦ApplicationMaster申请到资源(Container)后,便与对应的NodeManager通信,要求在获得的Container中启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;
-
Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行状态,方便让Client掌握运行信息,可以在任务失败时重新启动任务;
-
应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己。
7.3 Yarn-Cluster
该模式中YARN将分两个阶段运行应用程序:
- 1、把Spark的Driver作为一个ApplicationMaster在YARN集群中先启动;
- 2、ApplicationMaster创建应用程序,然后为它向ResourceManager申请资源,并启动Executor来运行Task,同时监控它的整个运行过程,直到运行完成。
工作流程:
-
Spark Yarn Client向YARN提交应用程序,包括ApplicationMaster程序及其启动命令、在Executor中运行的程序等;
-
ResourceManager收到请求后在集群中选择一个NodeManager并分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster进行SparkContext等的初始化;
-
ApplicationMaster向ResourceManager注册,这样用户可直接通过ResourceManage查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控运行状态直到结束;
-
一旦ApplicationMaster申请到资源(Container)后,便与对应的NodeManager通信,要求在Container中启动CoarseGrainedExecutorBackend,启动后会向ApplicationMaster中SparkContext注册并申请Task。
(和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler进行任务调度,其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增加了对Executor的等待逻辑等) -
ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务运行状态,从而可以在任务失败时重新启动任务;
-
应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。
追求1:Yarn-Client和Yarn-Cluster有什么区别?如果有聚合操作呢?
ApplicationMaster简介:YARN中,每个Application实例都有一个ApplicationMaster进程。ApplicationMaster进程是Application启动的第一个容器,负责和ResourceManager交互与请求资源,获取资源后告诉NodeManager为其启动Container。
YARN-Cluster VS YARN-Client模式区别—>ApplicationMaster进程差异:
- 1)YARN-Client模式下,Application Master仅仅向YARN请求Executor,Client会和请求的Container通信来调度他们工作,也就是说Client不能离开;
- 2)YARN-Cluster模式下,Driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行,因而YARN-Cluster模式不适合运行交互类型的作业.
追求2:如果有聚合操作呢?
……
8、Spark中广播变量是什么,作用?
广播变量用来高效分发较大的对象。使用广播变量,每个 Executor 的内存中,只驻留一份变量副本,而不是对 每个 task 都传输一次大变量,省了很多的网络传输, 对性能提升具有很大帮助, 而且会通过高效的广播算法来减少传输代价。
追问1:共享变量和累加器是什么?
累加器(accumulator)是 Spark 中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。而广播变量用来高效分发较大的对象。
共享变量出现的原因:
通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。
Spark 的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。
三、参考
1、2020大数据面试题真题总结(附答案)
2、Spark总体架构和运行流程
3、Spark运行流程
4、Spark笔记(七)-Spark运行流程
5、理解spark中的job、stage、task
6、Spark作业调度中stage的划分
7、Spark Stage划分依据:Spark中的Stage调度算法
8、Spark源码阅读:DAGScheduler Stage划分与Task最佳位置计算
9、Spark Shuffle 详解
10、彻底搞懂 Spark 的 shuffle 过程(shuffle write)
11、Spark Shuffle基础
12、Spark的四种运行模式
13、Spark(五)Spark任务提交方式和执行流程
本文地址:https://blog.csdn.net/HeavenDan/article/details/112483627