Spark经典题目 spark面试题
https://blog.csdn.net/xuefenxi/article/details/81083727
https://blog.csdn.net/lijiaqi0612/article/details/79384594
1.Spark中的RDD是什么,有哪些特性?
答:RDD(Resilient Distributed Dataset)叫做分布式数据集,是spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可以并行计算的集合
机制:
所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类,但是都可以进行互相转换。
rdd执行过程中会形成dag图,然后形成lineage 美 [ˈlɪniɪdʒ](血统,记住了它是如何从其它RDD中演变过来的)保证容错性等。 从物理的角度来看rdd存储的是block和node之间的映射。
RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错时的高效性。RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为.
当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。
Dataset:就是一个集合,用于存放数据的
Distributed:分布式,可以并行在集群计算
Resilient [rɪˈzɪliənt]:表示弹性的,弹性表示
1.RDD中的数据可以存储在内存或者磁盘中;
2.RDD中的分区是可以改变的;
五大特性:
1.A list of partitions:一个分区列表,RDD中的数据都存储在一个分区列表中
2.A function for computing each split:作用在每一个分区中的函数
3.A list of dependencies on other RDDs:一个RDD依赖于其他多个RDD,这个点很重要,RDD的容错机制就是依据这个特性而来的
4.Optionally,a Partitioner for key-value RDDs(eg:to say that the RDD is hash-partitioned):可选的,针对于kv类型的RDD才有这个特性,作用是决定了数据的来源以及数据处理后的去向
5.可选项,数据本地性,数据位置最优
RDD的弹性表现在哪几点?
1)自动的进行内存和磁盘的存储切换;
2)基于Lineage的高效容错;
3)task如果失败会自动进行特定次数的重试;
4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
5)checkpoint和persist 美 [pərˈsɪst] ,数据计算之后持久化缓存
6)数据调度弹性,DAG TASK调度和资源无关
7)数据分片的高度弹性,a.分片很多碎片可以合并成大的,b.par
讲解checkpoint容错机制
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,只需要计算丢失的部分,并不需要重算全部Partition。
但是,多次迭代后数据丢失的重新计算,会影响这个效率。因此,RDD的缓存容错机制,保证即使缓存丢失也能保证快速的恢复,而不是重新计算。
checkpoint保存的目录是在HDFS目录中,保证了存储的可靠性。
checkpoint和cache一样,是transformation。当遇到action时,checkpoint会启动另一个任务,将数据切割拆分,保存到设置的checkpoint目录中。
在Spark的checkpoint源码中提到,checkpoint会计算一次,所以一般我们先进行cache然后做checkpoint就会只走一次流程,checkpoint的时候就会从刚cache到内存中取数据写入hdfs中
- 当使用了checkpoint后,数据被保存到HDFS,此RDD的依赖关系也会丢掉,因为数据已经持久化到硬盘,不需要重新计算。
- 强烈推荐先将数据持久化到内存中(cache操作),否则直接使用checkpoint会开启一个计算,浪费资源。
RDD有哪些缺陷?
1)不支持细粒度的写和更新操作(如网络爬虫),spark写数据是粗粒度的
所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是说可以一条条的读
2)不支持增量迭代计算,Flink支持
RDD创建有哪几种方式?
1).使用程序中的集合创建rdd
2).使用本地文件系统创建rdd
3).使用hdfs创建rdd,
4).基于数据库db创建rdd
5).基于Nosql创建rdd,如hbase
6).基于s3创建rdd,
7).基于数据流,如socket创建rdd
rdd有几种操作类型?
1)transformation,rdd由一种转为另一种rdd
2)action,
3)crontroller,crontroller是控制算子,cache,persist,对性能和效率的有很好的支持
三种类型,不要回答只有2中操作
2、概述一下spark中的常用算子区别(map、mapPartitions、foreach、foreachPartition)
- map:用于遍历RDD,将函数f应用于每一个元素,返回新的RDD(transformation算子)。
- foreach:用于遍历RDD,将函数f应用于每一个元素,无返回值(action算子)。
- mapPartitions:用于遍历操作RDD中的每一个分区,返回生成一个新的RDD(transformation算子)。
-
foreachPartition: 用于遍历操作RDD中的每一个分区。无返回值(action算子)。
-
总结:一般使用mapPartitions或者foreachPartition算子比map和foreach更加高效,推荐使用。
3、谈谈spark中的宽窄依赖
- RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
- 宽依赖:指的是多个子RDD的Partition会依赖同一个父RDD的Partition,关系是一对多,父RDD的一个分区的数据去到子RDD的不同分区里面,会有shuffle的产生
- 窄依赖:指的是每一个父RDD的Partition最多被子RDD的一个partition使用,是一对一的,也就是父RDD的一个分区去到了子RDD的一个分区中,这个过程没有shuffle产生
区分的标准就是看父RDD的一个分区的数据的流向,要是流向一个partition的话就是窄依赖,否则就是宽依赖,窄依赖是一对一或者多对一, 宽依赖就是多对多或者一对多
-
窄依赖(narrow dependencies):
子RDD的每个分区依赖于常数个父分区(与数据规模无关)
输入输出一对一的算子,且结果RDD的分区结构不变。主要是map/flatmap(区别:map:对RDD每个元素转换,文件中的每一行数据返回一个数组对象;flatMap:对RDD每个元素转换,然后再扁平化将所有的对象合并为一个对象,文件中的所有行数据仅返回一个数组对象,会抛弃值为null的值).输入输出一对一的算子,但结果RDD的分区结构发生了变化,如union/coalesce
从输入中选择部分元素的算子,如filter、distinct、substract、sample -
宽依赖(wide dependencies):
子RDD的每个分区依赖于所有的父RDD分区
对单个RDD基于key进行重组和reduce,如groupByKey,reduceByKey
对两个RDD基于key进行join和重组,如join
经过大量shuffle生成的RDD,建议进行缓存。这样避免失败后重新计算带来的开销。
注意:reduce是一个action,和reduceByKey完全不同。
4、spark中如何划分stage
答:概念:Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分依据就是宽窄依赖,遇到宽依赖就划分stage,每个stage包含一个或多个task,然后将这些task以taskSet的形式提交给TaskScheduler运行,stage是由一组并行的task组成
1.spark程序中可以因为不同的action触发众多的job,一个程序中可以有很多的job,每一个job是由一个或者多个stage构成的,后面的stage依赖于前面的stage,也就是说只有前面依赖的stage计算完毕后,后面的stage才会运行;
2.stage 的划分标准就是宽依赖:何时产生宽依赖就会产生一个新的stage,例如reduceByKey(不是action),groupByKey,join的算子,会导致宽依赖的产生;
3.切割规则:从后往前,遇到宽依赖就切割stage;
5、spark-submit的时候如何引入外部jar包
- 在通过spark-submit提交任务时,可以通过添加配置参数来指定
- –driver-class-path 外部jar包
- –jars 外部jar包
6、spark 如何防止内存溢出
- driver端的内存溢出
- 可以增大driver的内存参数:spark.driver.memory (default 1g)
- 这个参数用来设置Driver的内存。在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。
- map过程产生大量对象导致内存溢出
- 这种溢出的原因是在单个map中产生了大量的对象导致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操作在rdd中,每个对象都产生了10000个对象,这肯定很容易产生内存溢出的问题。
- 针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。具体做法可以在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
面对这种问题注意,不能使用rdd.coalesce方法,这个方法只能减少分区,不能增加分区,不会有shuffle的过程。
- 数据不平衡导致内存溢出
- 数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区和修正分区异常的KEY
- shuffle后内存溢出
- shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism参数只对HashPartitioner有效,所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出,就需要从partitioner的代码增加partitions的数量。
-
standalone模式下资源分配不均匀导致内存溢出
- 在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 这两个参数,但是没有配置–executor-cores这个参数的话,就有可能导致,每个Executor的memory是一样的,但是cores的数量不同,那么在cores数量多的Executor中,由于能够同时执行多个Task,就容易导致内存溢出的情况。这种情况的解决方法就是同时配置–executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。
-
使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()
- rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。
Spark Rdd coalesce()方法和repartition()方法
Rdd是分区的,有时候需要重新设置Rdd的分区数量,比如Rdd的分区中,Rdd分区比较多,但是每个Rdd的数据量比较小,需要设置一个比较合理的分区。或者需要把Rdd的分区数量调大。还有就是通过设置一个Rdd的分区来达到设置生成的文件的数量。
有两种方法是可以重设Rdd的分区:分别是 coalesce()方法和repartition()。
coalesce()方法的作用是返回指定一个新的指定分区的Rdd。如果是生成一个窄依赖的结果,那么不会发生shuffle。比如:1000个分区被重新设置成10个分区,这样不会发生shuffle。
如果分区的数量发生激烈的变化,如设置numPartitions = 1,这可能会造成运行计算的节点比你想象的要少,为了避免这个情况,可以设置shuffle=true,那么这会增加shuffle操作.
当把父Rdd的分区数量增大时,比如Rdd的分区是100,设置成1000,如果shuffle为false,并不会起作用。
这时候就需要设置shuffle为true了,那么Rdd将在shuffle之后返回一个1000个分区的Rdd,数据分区方式默认是采用 hash partitioner。
最后来看看repartition()方法的源码:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
从源码可以看出,repartition()方法就是coalesce()方法shuffle为true的情况。
总之:如果shuffle为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDD分区数变多的。
7、spark中cache和persist的区别
- cache:缓存数据,默认是缓存在内存中,其本质还是调用persist(Storage.MEMORY_ONLY),如果存不下,就只存一部分分区的数据,没有存下的数据需要重新计算。
cache可以接其他算子,但是接了算子之后,起不到缓存应有的效果,因为会重新触发cache,cache不是action操作
- persist:缓存数据,有丰富的数据缓存策略,提供了一个用于指定存储级别的API,能更加灵活地控制缓存方式,数据可以保存在内存也可以保存在磁盘中,使用的时候指定对应的缓存级别就可以了。清除缓存使用unpersist()方法。
- executor执行的时候,默认60%做cache,40%做task操作,persist最根本的函数,最底层的函数
SparkSQL 缓存
-
SparkSQL缓存的表是以列式存储在内存中的,使得在做查询时,不需要对整个数据集进行全部扫描,仅需要对需要的列进行扫描,所以性能有很大提升。
如果数据是以列式存储的,SparkSQL就能按列自动选择最优的压缩编码器,对它调优以减少内存使用及垃圾回收压力。DataFrame本身也是rdd,所以其实也可以直接按rdd的缓存方式缓存DataFrame
-
dataFrame.persist()
-
#直接缓存dataFrame#建议采用以下方式缓存
-
dataFrame.registerTempTable("tab_name")
-
hiveCtx.cacheTable("tab_name")
RDD持久化在执行action操作时才会被持久化,而SparkSQL中缓存表则在请求表时被缓存。
为释放内存,需要手动从缓存中删除表
-
dataFrame.unpersist()
-
hiveCtx.uncacheTable("tab_name") #基于cacheTable 方法
-
-
(1) persist()可以指定的存储级别有:
1、仅存于内存:默认选项,RDD的(分区)数据直接以Java对象的形式存储于JVM的内存中,如果内存空间不足,某些分区的数据将不会被缓存,需要在使用的时候根据世代信息重新计算。
这个存储级别优点是读取速度快,缺点是占用内存大。使用这种缓存方式,代码如下:
splitDataRdd.cache();
splitDataRdd.persist() #默认就是内存,相当于splitDataRdd.persist(StorageLevel.MEMORY_ONLY());序列化形式仅存于内存:RDD的数据(Java对象)序列化之后存储于JVM的内存中(一个分区的数据为内存中的一个字节数组),相比于MEMORY_ONLY能够有效节约内存空间,同仅存于内存,如果内存中不够,就将一部分丢弃,使用时重新计算那部分。
优点是数据序列化后数据更紧凑,占用内存更小。缺点是每次读取都需要进行序列化或者反序列化,占用更多的CPU运算。使用这种缓存方式,代码如下:
splitDataRdd.persist(StorageLevel.MEMORY_ONLY_SER());
为什么序列化?
-
序列化可以减少数据的体积,减少存储空间,高效存储和传输数据,不好的是使用的时候要反序列化,非常消耗CPU
2、存于内存和磁盘:RDD的数据直接以Java对象的形式存储于JVM的内存中,如果内存空间不足,某些分区的数据会被存储至磁盘,使用的时候从磁盘读取。
优点是后续读取数据不会再重新计算,缺点是也要占据大量内存,另外因为要读取磁盘,所以会带来I/O操作。使用这种缓存方式,代码如下:
splitDataRdd.persist(StorageLevel.MEMORY_AND_DISK());
序列化形式存于内存和磁盘:相比于MEMORY_ONLY_SER,在内存空间不足的情况下,将序列化之后的数据存储于磁盘。
优点是序列化后数据更加紧凑,占用空间更少;缺点序列化和反序列化需要更多的计算。使用这样缓存方式,代码如下:
splitDataRdd.persist(StorageLevel.MEMORY_AND_DISK_SER());
3、仅存于磁盘:这种存储级别可以避免内存消耗,使用未序列化的Java对象格式,将数据全部写入磁盘文件中,但是数据读写过程涉及到磁盘I/O,CPU计算。使用这样的缓存方式,代码如下:
splitDataRdd.persist(StorageLevel.DISK_ONLY());
4、缓存在两个节点上:上面所有的存储级别都可以应用到集群的两个节点上,RDD数据都将复制到两个worker节点的内存或者磁盘里,使用这样的缓存方式,代码如下:
splitDataRdd.persist(StorageLevel.MEMORY_ONLY_2());
splitDataRdd.persist(StorageLevel.MEMORY_ONLY_SER_2());
splitDataRdd.persist(StorageLevel.MEMORY_AND_DISK_2());
splitDataRdd.persist(StorageLevel.MEMORY_AND_DISK_SER_2());
splitDataRdd.persist(StorageLevel.DISK_ONLY_2());5、堆外存储:这种存储级别,序列化的RDD将保存在Tachyon的堆外存储上,好处是可以在executor和其他应用之间共享一个内存池,减少垃圾回收带来的消耗,也能避免在executor崩溃时丢失内存内缓存数据的问题。代码如下:
splitDataRdd.persist(StorageLevel.OFF_HEAP());
如果要从缓存中移除RDD,要么等着它以最近最久未使用(LRU)方式被消除,要么调用unpersist()方法
my_rdd.unpersist()
Spark为什么要持久化,一般什么场景下要进行persist操作?
spark所有复杂一点的算法都会有persist身影,spark默认数据放在内存,spark很多内容都是放在内存的,非常适合高速迭代,但分布式系统风险很高,所以容易出错,rdd出错或者分片可以根据血统算出来,如果没有对父rdd进行persist 或者cache的化,就需要重头做。
以下场景会使用persist
1)某个步骤计算非常耗时,需要进行persist持久化
2)计算链条非常长,重新恢复要算很多步骤
3)checkpoint所在的rdd要持久化persist,
lazy级别,框架发现有checkpoint,checkpoint时单独触发一个job,需要重算一遍,checkpoint前要持久化,写个rdd.cache或者rdd.persist,将结果保存起来,再写checkpoint操作,这样执行起来会非常快,不需要重新计算rdd链条了。checkpoint之前一定会进行persist。
4)shuffle之后为什么要persist,shuffle要进性网络传输,风险很大,数据丢失重来,恢复代价很大
5)shuffle之前进行persist,框架默认将数据持久化到磁盘,这个是框架自动做的。
8、Spark有哪些优化方法 ?
spark调优比较复杂,但是大体可以分为三个方面来进行
1)平台层面的调优:防止不必要的jar包分发,提高数据的本地性,选择高效的存储格式如parquet
2)应用程序层面的调优:过滤操作符的优化降低过多小任务,降低单条记录的资源开销,处理数据倾斜,复用RDD进行缓存,作业并行化执行等等
3)JVM层面的调优:设置合适的资源量,设置合理的JVM,启用高效的序列化方法如kyro,增大off head内存等等
spark-submit提交参数说明
参数名格式参数说明
--masterMASTER_URL如spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local
--deploy-modeDEPLOY_MODEClient或者master,默认是client
--classCLASS_NAME应用程序的主类
--nameNAME应用程序的名称
--jarsJARS逗号分隔的本地jar包,包含在driver和executor的classpath下
--confPROP=VALUE固定的spark配置属性,默认是conf/spark-defaults.conf
--properties-fileFILE加载额外属性的文件
--driver-memory MEMDriver内存,默认1G
--driver-java-options传给driver的额外的Java选项
--driver-library-path传给driver的额外的库路径
--driver-class-path传给driver的额外的类路径
--executor-memoryMEM每个executor的内存,默认是1G
--driver-cores NUMDriver的核数,默认是1。这个参数仅仅在standalone集群deploy模式下使用
--superviseDriver失败时,重启driver。在mesos或者standalone下使用
--verbose打印debug信息
--total-executor-coresNUM所有executor总共的核数。仅仅在mesos或者standalone下使用
--executor-cores NUM每个executor的核数。在yarn或者standalone下使用
--driver-coresNUMDriver的核数,默认是1。在yarn集群模式下使用
--queueQUEUE_NAME队列名称。在yarn下使用
--num-executors NUM启动的executor数量。默认为2。在yarn下使用
task 数量=num-executors * executor-cores
9、spark中的数据倾斜的现象、原因、后果
(1)、数据倾斜的现象
多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败。
(2)如何解决spark中的数据倾斜问题
发现数据倾斜的时候,不要急于提高executor的资源,修改参数或是修改程序,首先要检查数据本身,是否存在异常数据。
1、数据问题造成的数据倾斜
找出异常的key
如果任务长时间卡在最后最后1个(几个)任务,首先要对key进行抽样分析,判断是哪些key造成的。
选取key,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个。
比如: df.select(“key”).sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k._2,k._1)).sortByKey(false).take(10)
如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。
经过分析,倾斜的数据主要有以下三种情况:
1、null(空值)或是一些无意义的信息()之类的,大多是这个原因引起。
2、无效数据,大量重复的测试数据或是对结果影响不大的有效数据。
3、有效数据,业务导致的正常数据分布。
解决办法
第1,2种情况,直接对数据进行过滤即可(因为该数据对当前业务不会产生影响)。
第3种情况则需要进行一些特殊操作,常见的有以下几种做法
(1) 隔离执行,将异常的key过滤出来单独处理,最后与正常数据的处理结果进行union操作。
(2) 对key先添加随机值,进行操作后,去掉随机值,再进行一次操作。
(3) 使用reduceByKey 代替 groupByKey(reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义.)
(4) 使用map join。
案例
如果使用reduceByKey因为数据倾斜造成运行失败的问题。具体操作流程如下:
(1) 将原始的 key 转化为 key + 随机值(例如Random.nextInt)
(2) 对数据进行 reduceByKey(func)
(3) 将 key + 随机值 转成 key
(4) 再对数据进行 reduceByKey(func)
案例操作流程分析:
假设说有倾斜的Key,我们给所有的Key加上一个随机数,然后进行reduceByKey操作;此时同一个Key会有不同的随机数前缀,在进行reduceByKey操作的时候原来的一个非常大的倾斜的Key就分而治之变成若干个更小的Key,不过此时结果和原来不一样,怎么破?
进行map操作,目的是把随机数前缀去掉,然后再次进行reduceByKey操作。(当然,如果你很无聊,可以再次做随机数前缀),这样我们就可以把原本倾斜的Key通过分而治之方案分散开来,最后又进行了全局聚合
注意1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行union即可。
注意2: 单独处理异常数据时,可以配合使用Map Join解决。
2、spark使用不当造成的数据倾斜
提高shuffle并行度
dataFrame和sparkSql可以设置spark.sql.shuffle.partitions参数控制shuffle的并发度,默认为200。
rdd操作可以设置spark.default.parallelism控制并发度,默认参数由不同的Cluster Manager控制。
局限性: 只是让每个task执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,如果某些key的大小非常大,即使一个task单独执行它,也会受到数据倾斜的困扰。
使用map join 代替reduce join
在小表不是特别大(取决于你的executor大小)的情况下使用,可以使程序避免shuffle的过程,自然也就没有数据倾斜的困扰了.(详细见http://blog.csdn.net/lsshlsw/article/details/50834858、http://blog.csdn.net/lsshlsw/article/details/48694893)
局限性: 因为是先将小数据发送到每个executor上,所以数据量不能太大。
10、 spark中groupByKey 、aggregateByKey、reduceByKey 有什么区别?使用上需要注意什么?
(1)aggregateByKey()是先对每个partition中的数据根据不同的Key进行aggregate,然后将结果进行shuffle,完成各个partition之间的aggregate。因此,和groupByKey()相比,运算量小了很多。
(2)groupByKey()是对RDD中的所有数据做shuffle,根据不同的Key映射到不同的partition中再进行aggregate。
(3)reduceByKey()也是先在单台机器中计算,再将结果进行shuffle,减小运算量
(4) distinct()也是对RDD中的所有数据做shuffle进行aggregate后再去重。
RDD中reduceBykey与groupByKey哪个性能好,为什么
reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行merge,有点类似于在MapReduce中的combiner。这样做的好处在于,在map端进行一次reduce之后,数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。
groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError。
通过以上对比可以发现在进行大量数据的reduce操作时候建议使用reduceByKey。不仅可以提高速度,还是可以防止使用groupByKey造成的内存溢出问题。
11、Spark为什么比mapreduce快?
1)基于内存计算,减少低效的磁盘交互;
2)高效的调度算法,基于DAG;
3)容错机制Linage,精华部分就是DAG和Lineage
12、简单说一下hadoop和spark的shuffle相同和差异?
1)从 high-level 的角度来看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作)。
2)从 low-level 的角度来看,两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作;如果你是Spark 1.1的用户,可以将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现。
3)从实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。
在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中。
如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read 的处理逻辑?以及两个处理逻辑应该怎么高效实现?
Shuffle write由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。
Mapreduce和Spark的都是并行计算,那么他们有什么相同和区别?
答:两者都是用mr模型来进行并行计算:
1)hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束。
2)spark用户提交的任务成为application,一个application对应一个sparkcontext,app中存在多个job,每触发一次action操作就会产生一个job。这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和app一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存进行计算。
3)hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系。
spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错。
13、Spark技术栈有哪些组件,每个组件都有什么功能,适合什么应用场景?
答:可以画一个这样的技术栈图先,然后分别解释下每个组件的功能和场景
1)Spark core:是其它组件的基础,spark的内核,主要包含:有向循环图、RDD、Lingage、Cache、broadcast等,并封装了底层通讯框架,是Spark的基础。
2)SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,将流式计算分解成一系列短小的批处理作业。
3)Spark sql:Shark是SparkSQL的前身,Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析
4)BlinkDB :是一个用于在海量数据上运行交互式 SQL 查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内。
5)MLBase是Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLbase。MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。
6)GraphX是Spark中用于图和图并行计算
14、spark有哪些组件?
答:主要有如下组件:
1)master:管理集群和节点,不参与计算。
2)worker:计算节点,进程本身不参与计算,和master汇报。
主要功能:管理当前节点内存,CPU的使用状况,接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务,worker就类似于包工头,管理分配新进程,做计算的服务,相当于process服务。需要注意的是:
1、worker会不会汇报当前信息给master,worker心跳给master主要只有workid,它不会发送资源信息以心跳的方式给master,master分配的时候就知道work,只有出现故障的时候才会发送资源。
2、worker不会运行代码,具体运行的是Executor是可以运行具体appliaction写的业务逻辑代码,操作代码的节点,它不会运行程序的代码的。
3)Driver:运行程序的main方法,创建spark context对象。主要功能:
1、一个Spark作业运行时包括一个Driver进程,也是作业的主进程,具有main函数,并且有SparkContext的实例,是程序的人口点;
2、功能:负责向集群申请资源,向master注册信息,负责了作业的调度,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler。
4)spark context:控制整个application的生命周期,包括dagsheduler和task scheduler等组件。
5)client:用户提交程序的入口。
SparkEnv内构建并包含如下一些重要组件的引用。 (1)MapOutPutTracker:负责Shuffle元信息的存储。 (2)BroadcastManager:负责广播变量的控制与元信息的存储。 (3)BlockManager:负责存储管理、创建和查找快。 (4)MetricsSystem:监控运行时性能指标信息。 (5)SparkConf:负责存储配置信息。
15、spark工作机制?
答:用户在client端提交作业后,会由Driver运行main方法并创建spark context上下文。
执行add算子,形成dag图输入dagscheduler,按照add之间的依赖关系划分stage输入task scheduler。 task scheduler会将stage划分为task set分发到各个节点的executor中执行。
16.Spark master使用zookeeper进行HA的,有哪些元数据保存在Zookeeper?
答:spark通过这个参数spark.deploy.zookeeper.dir指定master元数据在zookeeper中保存的位置,包括Worker,Driver和Application以及Executors。standby节点要从zk中,获得元数据信息,恢复集群运行状态,才能对外继续提供服务,作业提交资源申请等,在恢复前是不能接受请求的。另外,Master切换需要注意2点
1)在Master切换的过程中,所有的已经在运行的程序皆正常运行!因为Spark Application在运行前就已经通过Cluster Manager获得了计算资源,所以在运行时Job本身的调度和处理和Master是没有任何关系的!
2) 在Master的切换过程中唯一的影响是不能提交新的Job:一方面不能够提交新的应用程序给集群,因为只有Active Master才能接受新的程序的提交请求;另外一方面,已经运行的程序中也不能够因为Action操作触发新的Job的提交请求;
17、Spark常用算子讲解
能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。
Spark的算子的分类
从大方向来说,Spark 算子大致可以分为以下两类:
1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。
Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。
Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。
从小方向来说,Spark 算子大致可以分为以下三类:
1)Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。
2)Key-Value数据类型的Transfromation算子,这种变换并不触发提交作业,针对处理的数据项是Key-Value型的数据对。
3)Action算子,这类算子会触发SparkContext提交Job作业。
1)Value数据类型的Transformation算子
一、输入分区与输出分区一对一型
1、map算子
2、flatMap算子
3、mapPartitions算子
4、glom算子
二、输入分区与输出分区多对一型
5、union算子
6、cartesian算子
三、输入分区与输出分区多对多型
7、grouBy算子
四、输出分区为输入分区子集型
8、filter算子
9、distinct算子
10、subtract算子
11、sample算子
12、takeSample算子
五、Cache型
13、cache算子
14、persist算子
2)Key-Value数据类型的Transfromation算子
一、输入分区与输出分区一对一
15、mapValues算子
二、对单个RDD或两个RDD聚集
单个RDD聚集
16、combineByKey算子
17、reduceByKey算子
18、partitionBy算子
两个RDD聚集
19、Cogroup算子
三、连接
20、join算子
21、leftOutJoin和 rightOutJoin算子
3)Action算子
一、无输出
22、foreach算子
二、HDFS
23、saveAsTextFile算子
24、saveAsObjectFile算子
三、Scala集合和数据类型
25、collect算子
26、collectAsMap算子
27、reduceByKeyLocally算子
28、lookup算子
29、count算子
30、top算子
31、reduce算子
32、fold算子
33、aggregate算子
上一篇: 初识Spark Spark
下一篇: 【赵强老师】搭建Hadoop环境