欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark知识点整理

程序员文章站 2024-02-25 10:23:46
...

Spark知识点整理

版本:Spark-2.1.1

Spark架构

Spark架构主要包含如下角色:

  • Driver: 主计算进程,Spark job的驱动器
  • Executor: 执行器,Worker上的计算进程
  • Cluster Master: 主节点,在standalone模式中为主节点,控制整个集群,监控Worker. 在Yarn模式中充当资源管理器(Resource Manager)
  • Worker: 从节点,负责控制计算节点,启动Executor或Driver

Driver

Spark的驱动器是执行main方法的进程,负责创建SparkContext,创建RDD,以及进行RDD的转化操作和行动操作的执行。Driver具有以下职能:

  1. 把用户程序转为Job
  2. 跟踪Executor运行状况
  3. 为执行器节点调度任务
  4. UI展示应用运行状况

Executor

Spark Executor是一个工作进程,负责在Spark作业中运行任务,任务间相互独立,Spark应用启动时,Executor节点同时启动,并且始终伴随着整个Spark应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark应用也可以继续执行,会将出错节点上的任务调度到其他Executor上继续执行。Executor具有以下职能:

  1. 负责运行Spark Task, 并将结果返回给Driver进程
  2. 通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储. RDD是直接存储在Executor进程内的,因此任务可以在运行时充分利用缓存加速运算

Driver与Executor关系

  • Executor: 接收任务并执行任务。RDD算子中的计算功能由Executor执行
  • Driver: 创建Spark Context对象的应用程序。Spark程序除了计算RDD计算以外的逻辑由Driver执行,Executor代码中引用的Driver部分的对象必须是可序列化的,因为可能需要网络传输

Spark运行流程

Spark知识点整理

运行模式

Local模式

本地模式,Master和Worker均为本机,可以断点调试

Standalone模式

使用Spark本身的资源管理和调度

Spark知识点整理

Yarn模式

使用Yarn作为Spark的资源管理和调度器,又分为Yarn-client模式和Yarn-cluster模式两种,涉及ResourceManager/NodeManager/ApplicationMaster/Container等组件

Yarn-client模式

  • driver运行在client客户端
  • client与Executor Container通信进行作业调度
  • 适用于调试,在客户端可以看到日志
  • client与Yarn集群的连接断开或client关闭,任务就挂了

Yarn-cluster模式

  • driver运行在ApplicationMaster中
  • ApplicationManager与Executor Container通信进行作业调度
  • 日志需要登录到Yarn集群的节点才能看到
  • client关闭或断开,任务不受影响,继续运行
  • 适用于生产环境

ResourceManager

  • 处理客户端请求
  • 监控NodeManager
  • 启动或监控ApplicationMaster
  • 资源的分配与调度

NodeManager

  • 管理单个节点上的资源
  • 处理来自ResourceManager的命令
  • 处理来自ApplicationMaster的命令

ApplicationMaster

  • 负责数据切分
  • 为应用程序申请资源并分配给内部的任务
  • 任务的监控与容错

Container

Yarn的资源抽象,封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络

工作流程

Spark知识点整理

RDD

什么是RDD

RDD (Resilient Distributed Dataset,分布式弹性数据集),是Spark的基本数据结构,在代码中是一个抽象类,代表不可变、可分区、里面的元素可并行计算的集合。RDD源码的设计模式类似Java IO,使用了装饰器模式,是对各种数据操作的封装。

RDD的属性

  • 一组分区(partition),即数据集的基本组成单位
  protected def getPartitions: Array[Partition]
  • 一个计算每个分区的函数
  def compute(split: Partition, context: TaskContext): Iterator[T]
  • RDD之间的依赖关系,也叫『血缘』
  protected def getDependencies: Seq[Dependency[_]] = deps
  • Partioner,RDD的分区函数
  @transient val partitioner: Option[Partitioner] = None
  • 一个列表,存储每个Partition的优先位置(preferred location)
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
  • 理念:移动数据不如移动计算

RDD的特点

  • 分区

    • Spark的数据是分区的,分区使得并行计算成为可能
    • 只有K-V数据才有分区器
    • Spark支持Hash分区器和Range分区器
  • 只读

    • RDD不能修改,只能通过已有的RDD经过算子变换成新的RDD
    • 两类算子
      • Transformations: 构建RDD的血缘,不立即执行计算
      • Actions: 触发RDD计算,将结果写入外部系统
  • 依赖

    • 窄依赖:上下游RDD之间的依赖是一一对应的
    • 宽依赖:一个下游RDD依赖多个上游RDD
    • 通过依赖关系可以将一个Spark任务描述为一个DAG(有向无环图)
    • 宽依赖是划分stage的依据
    • 通过rdd.dependencies查看依赖的类型(OneToOneDependencySuffleDependency
    • 通过rdd.toDebugString查看依赖血缘
  • 缓存

    • 可以将RDD缓存到内存,只有第一次根据RDD的血缘计算,之后的计算直接从内存中取缓存
    • 使用RDD的persistcache方法进行缓存,persist默认会把数据以序列化的形式存储在JVM的堆空间中
  • 持久化

    • 对长时间的任务,将中间RDD持久化到checkpoint,加速容错恢复
    • 在SparkContext中设置检查点保存目录(sc.setCheckpointDir(dir),通常dir设置为HDFS路径),然后调用RDD的checkpoint方法
    • checkpoint会打断血缘,checkpoint后的RDD的血缘会从checkpoint开始

创建RDD

从内存中创建RDD

  • parallelize

  • makeRDD,底层实现使用parallelize

从外部存储创建RDD

  • textFile

Spark任务划分

  • Application: 初始化一个SparkContext即生成一个Application

  • Job: 一个Action算子即生成一个Job

  • Stage: 根据RDD之间的依赖关系将Job划分成不同的Stage, 遇到一个宽依赖则划分一个新的Stage

  • Task: 将Stage最后一个RDD的分区分发到不同的Executor,每个Executor上执行一个Task

广播变量和累加器

  • RDD: 分布式数据集
  • 广播变量: 分布式只读变量(需要注册)
    • 使用广播变量可以减少网络传输和内存使用:从每个task存储一份变量到每个Executor存储一份变量
  • 累加器: 分布式只写变量(多个task共同操作同一个变量)

SparkSQL

DataFrame

  • since Spark 1.3

  • 类似传统数据库的二维表

  • SparkSession是创建DataFrame和执行SQL的入口

DataSet

  • since Spark 1.6

  • DataFrame的一个扩展,类似Hibernate对数据表的封装

Spark Streaming

  • Spark Streaming 架构

Spark知识点整理

  • Spark Streaming编程的核心对象是DStream,是对一个时间片(时间片长度可以设置)的数据的封装. DStream的常用方法与RDD类似

  • 有状态转化操作:保存了过去的时间片的状态, 例如updateStateByKey, mapWithStates

Spark shuffle

Spark的shuffle发生在宽依赖算子,是划分两个stage的依据。在Spark 1.6之前,Spark的shuffle主要采用HashShuffleWriter;从Spark 1.6之后,Spark的shuffle主要采用SortShuffleWriter,但是当输出分区数较少时会采用BypassMergeSortShuffleWriter.

HashShuffleWriter

基于Hash的shuffle在每个map task/executor会为每一个输出分区形成一个文件,然后reduce task去取对应的文件。这样做的缺点是shuffle过程中产生很多小文件读写,并且这些小文件还要进行网络传输,效率低,容易OOM,因此在Spark 1.6中放弃了基于Hash的shuffle作为主要shuffle方式的设计思路。

SortShuffleWriter

Spark 1.6之后主要的shuffle方式,非常类似Hadoop Mapreduce的shuffle. map task的计算结果写入一个内存数据结构(可以类比Mapreduce的环形缓冲区),内存数据结构满了就会发生溢写,map task执行完后把溢写的多个小文件合用MergeSort归并成一个index file和一个data file,reduce task根据index file去fetch自己对应的data.

BypassMergeSortShuffleWriter

在Spark 1.6之后,当不存在map side combine(即aggregator)且输出分区数量小于spark.shuffle.sort.bypassMergeThreshold指定的阈值(默认为200)时,使用BypassMergeSortShuffleWriter替代SortShuffleWriter. BypassMergeSortShuffleWriter的思路类似HashShuffleWriter,但是每个map task为所有reduce task创建各自的输出文件后,在map task执行完后会将各个reduce task对应的文件合并成一个文件,并建立索引文件。