大数据面试题整理 --持续更新中
Hadoop 常见面试题
1.mr 工作原理
mr 将得到的split 分配对应的 task,每个任务处理相对应的 split,将 split 以 line 方式读取每一行数据,将数据依次读取到100M(maprdeuce.task.io.sort.mb)的环形缓冲区读取过程中一旦到达阈值(mapreduce.map.sort.spill.percent)80M进行溢写操作,spiller线程溢写到磁盘(mapreduce.cluster.local.dir)目录中,期间会进行kv分区(分区数由reduce数来决定)默认使用hashpartition,再将分区中数据进行key的排序(默认排序规则是字典和升序),如果设置了setCombinerClass 则会对每个分区中的数据进行 combiner 操作,如果设置了output.compress压缩格式会对溢写的数据进行压缩,最后merge根据分区规则将数据归并到同一个文件中等待reduce的拉取,nodemanger将启动一个mapreduce_shuffle服务将数据以http方式拉取到reduce端,reduce处理阶段当达到阈值(默认0.66)或map输出数的阈值(默认100)会进行merge(同一分区的一组数据会先进行归并)|sort(将归并好的数据进行排序)|group(判断迭代器中的元素是否可以迭代),处理完成mr将同一个分区内的数据,在hdfs中以文件形式体现出来,几个分区就会创建几个文件。
其中reduce端的merge达到阈值会触发,而sort和group是设置后才会触发。
有效的理解mr工作流程可大大提升程序运行效率,因此mr 的 shuffle 也被称为奇迹开始的地方
2. split 机制
spilit是在mr 处理的map端之前产生的概念,split切片大小,默认等于block*1.1,在FileInputFormat中计算切片大小的逻辑:
-
blocksize:默认是 128M,可通过 dfs.blocksize 修改
-
minSize:默认是 1,可通过 mapreduce.input.fileinputformat.split.minsize 修改
-
maxsize:默认是 Long.MaxValue,可通过 mapreduce.input.fileinputformat.split.maxsize 修改
Hadoop FileInputFormat 源码:
public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";
public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
mr 是通过split切片来决定 task 的并行度。
为什么split不是与block 一一对应的?
大量小文件场景,map进程造成资源严重浪费。
针对大小文件场景可以手动配置。
3. namenode,datanode,secondaryNameNode分别是干什么的?
namenode,在基于主从架构的hdfs文件系统中是主节点,其主要职责就是对hdfs中的文件的元信息,副本数,文件目录树,block 数据节点信息;
datanode,它是从节点也是数据节点,基于本地磁盘存储 block(文件的形式),有相关数据块的长度、效验和、时间戳,与namnode保持心跳,汇报 block 状态。
secondaryNameNode,检查点节点,namenode 日志高可用的关键,其主要作用就是将namenode的元数据日志信息合并后备份,防止元数据丢失。
元信息:是数据文件的block大小,文件副本存储位置,副本数量,block 数量,主要体现在edits文件和fsimage文件。
副本数:hdfs 中同一个文件在多个节点中所存储的总数量,也是实现持久化和保证安全性的关键。
文件目录树:hdfs提供了一个可以维护的文件目录,该文件目录下存储着有关所有hdfs的文件。
block 数据节点信息:如a文件在01和02节点中存储,该信息称为数据节点信息。
edits:记录 client
执行创建,移动,修改文件的信息,同时体现了 HDFS
的最新的状态(二进制文件)。
它分布在磁盘上的多个文件,名称由前缀 edits
及后缀组成.后缀值是该文件包含的事务 ID,同一时刻只有一个文件处于可读写状态.为避免数据丢失,事务完成后 client
端在执行成功前,文件会进行更新和同步,当 NN
向多个目录写数据时,只有在所有操作更新并同步到每个副本之后执行才成功。
fsimage:记录的是数据块的位置信息、数据块的冗余信息(二进制文件)
由于 edits
文件记录了最新状态信息,并且随着操作越多,edits
文件就会越大,把 edits
文件中最新的信息写到 fsimage
文件中就解决了 edits
文件数量多不方便管理的情况。
没有体现 HDFS
的最新状态。
每个 fsimage
文件都是文件系统元数据的一个完整的永久性的检查点。
为什么引入 secondaryNameNode?
由于只有在重启时 fsimage
和 edits
才会进行合并,得到一个新的 fsimage
文件,但是在实际生产环境中很少会重启集群,NN
的重启需要花费很长时间,因为会有很多改动需要合并到 fsimage 文件上,如果 NN
挂掉,fsimage
文件没有更新内容,从而丢失很多改动。
但 editlog 日志大小会随着时间变的越来越大,导致系统重启,根据日志恢复元数据的时间会越来越长;
为了避免这种情况,引入检查点机制checkpoint,命名空间镜像 fsimage 就是 HDFS 元数据的持久性检查点,即将内存中的元数据落磁盘生成的文件;
了解详细可以访问我另一个博客:hdfs详细
https://blog.csdn.net/qq_43259670/article/details/105882983
4. mr on yarn 工作原理
1、向client端提交MapReduce job.
2、随后yarn的ResourceManager进行资源的分配.
3、由NodeManager进行加载与监控containers.
4、通过applicationMaster与ResourceManager进行资源的申请及状态的交互,由NodeManagers进行MapReduce运行时job的管理.
5、通过hdfs进行job配置文件、jar包的各节点分发。
5.fsimage 和 edits 是干什么的?为什么要使用?
edits:记录 client
执行创建,移动,修改文件的信息,同时体现了 HDFS
的最新的状态(二进制文件)。
它分布在磁盘上的多个文件,名称由前缀 edits
及后缀组成.后缀值是该文件包含的事务 ID,同一时刻只有一个文件处于可读写状态.为避免数据丢失,事务完成后 client
端在执行成功前,文件会进行更新和同步,当 NN
向多个目录写数据时,只有在所有操作更新并同步到每个副本之后执行才成功。
fsimage:记录的是数据块的位置信息、数据块的冗余信息(二进制文件)
由于 edits
文件记录了最新状态信息,并且随着操作越多,edits
文件就会越大,把 edits
文件中最新的信息写到 fsimage
文件中就解决了 edits
文件数量多不方便管理的情况。
没有体现 HDFS
的最新状态。
每个 fsimage
文件都是文件系统元数据的一个完整的永久性的检查点。
为什么使用?
NN
使用了 FsImage
+EditLog
整合的方案;
滚动将增量的 EditLog
更新到 FsImage
,以保证更近时点的 FsImage
和更小的 EditLog
体积
6. hdfs 工作原理
一般就是读写的工作流程,因为hdfs 主要还是对文件存储与读写。
读流程:
client端创建一个代理对象与namenode进行rpc通信,拿到namenode对象后请求获取文件的元信息,namenode效验无误后将元信息返回,client获取到元信息之后根据元信息读取相应datanode的block块,将block合并成一个文件进行返回。
写流程:
client端创建一个代理对象与namenode进行rpc通信,拿到namenode对象后请求创建文件的元信息,namenode触发副本放置策略,返回元数据信息,client和datanode建立piepline连接,client将packet放入一个队列中,并向第一个datanode发送packet这一过程中上游节点同时发送下一个packet,当 block
传输完成,DN
们各自向 NN
汇报,同时 Client
继续传输下一个 block
所以,Client
的传输和 block
的汇报也是并行的
7. block 副本放置策略
1.x
- 第一个副本:放置在上传文件的
DataNode
;如果时集群外提交,则随机挑选一台磁盘不太满,CPU 不太忙的节点。 - 第二个副本:放置在于第一个副本不同的机架的节点上。
- 第三个副本:与第一个副本相同机架的节点。
- 更多副本:随机节点
可能产生的问题是前两个副本在同一机架当机架出现问题时会丢失两个副本
2.x
- 第一个副本:放置在上传文件的
DataNode
;如果时集群外提交,则随机挑选一台磁盘不太满,CPU 不太忙的节点。 - 第二个副本:放置在于第一个副本不同的机架的节点上。
- 第三个副本:与第二个副本相同机架的节点。
- 更多副本:随机节点
了解详细可以访问我另一个博客:hdfs详细
https://blog.csdn.net/qq_43259670/article/details/105882983
Hive 常见面试题
1.简述Hive工作原理
1、执行查询:Hive接口,命令行或 web UI发送查询驱动程序
2、get Plan:驱动程序查询编译器
3、词法分析/语法分析
4、语义分析
5、逻辑计划产生
6、逻辑计划优化
7、物理计划生成
8、物理计划优化
9、物理计划执行
10、查询结果返回
提示:以上是hive的大致工作原理流程,一般面试问到这里就算比较深入了
2.hive 内部表和外部表区别
创建表时:创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径, 不对数据的位置做任何改变。
删除表时:在删除表的时候,内部表的元数据和数据会被一起删除, 而外部表只删除元数据,不删除数据。这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据。
提示:内部表与外部表的区别一定要掌握,通常情况下我们都会使用外部表保证数据安全性,但是像中间表,结果表这种我们就会考虑使用内部表(管理表)
3.分区和分桶的区别
分区
是指按照数据表的某列或某些列分为多个区,区从形式上可以理解为文件夹,比如我们要收集某个大型网站的日志数据,一个网站每天的日志数据存在同一张表上,由于每天会生成大量的日志,导致数据表的内容巨大,在查询时进行全表扫描耗费的资源非常多。
那其实这个情况下,我们可以按照日期对数据表进行分区,不同日期的数据存放在不同的分区,在查询时只要指定分区字段的值就可以直接从该分区查找
分桶
分桶是相对分区进行更细粒度的划分。
分桶将整个数据内容安装某列属性值得hash值进行区分,如要按照name属性分为3个桶,就是对name属性值的hash值对3取摸,按照取模结果对数据分桶。
如取模结果为0的数据记录存放到一个文件,取模为1的数据存放到一个文件,取模为2的数据存放到一个文件
总结:分区就是在hdfs上分目录(文件夹),分桶就是分文件。
4.将数据直接上传到分区目录(hdfs)上,让分区表和数据产生关联有哪些方式?
方案一:上传数据后修复表
dfs -mkdir -p 分区目录
dfs -put 分区目录
msck repair table 表名
方案二:上传数据后添加分区
dfs -mkdir -p 分区目录
dfs -put 分区目录
alter table 表名 add partition();
提示:这里我们如果直接将新的分区文件上传到hdfs上,因为hive没有对应的元数据所以是无法查询到数据的,所以我们要进行表修复或者添加分区。
5.桶表是否可以通过直接load将数据导入?
不可以,因为load数据的话hdfs下只会有一个文件无法完成分桶的效果,分桶和mapredue中的分区是一样的道理,所以我们要借助中间表导入数据。
6.order by,sort by,distribute by,cluster by的区别?
-
order by会对所给的全部数据进行全局排序 ,不管来多少数据,都只启动一个reducer来处理 。
-
sort by是局部排序 ,sort by会根据数据量的大小启动一到多个reducer来干活,并且,它会在进入reduce之前为每个reducer都产生一个排序文件 。
-
distribute by 控制map结果的分发,它会将具有相同字段的map输出分发到一个reduce节点上做处理 。
-
cluster by 可以理解为一个特殊的distribute by和sort by的结合,当distribute by和sort by后面所跟的列名相同时,就等同于直接使用cluster by 跟上该列名。但是被cluster by指定的列最终的排序结果只能是降序,而且无法指定asc和desc。
提示:这个问题面试问的频率很高,大家一定要注意区分以下。
7.聚合函数是否可以写在order by后面,为什么?
不可以。
原因:执行顺序!!!order by的执行顺序在select之后,所以需使用重新定义的列名进行排序。
提示:理解sql的执行顺序更加有利于大家写sql
(1)from
(2)join
(3)on
(4)where
(5)select
(6)group by
(7)having
(8)order by
(9)limit
8.导致数据倾斜的原因有哪些,有什么解决的方案?
什么是数据倾斜?
数据倾斜就是数据的分布不平衡,某些地方特别多,某些地方又特别少,导致的在处理数据的时候,有些很快就处理完了,而有些又迟迟未能处理完,导致整体任务最终迟迟无法完成,这种现象就是数据倾斜。
针对mapreduce的过程来说就是,有多个reduce,其中有一个或者若干个reduce要处理的数据量特别大,而其他的reduce处理的数据量则比较小,那么这些数据量小的reduce很快就可以完成,而数据量大的则需要很多时间,导致整个任务一直在等它而迟迟无法完成。
跑mr任务时常见的reduce的进度总是卡在99%,这种现象很大可能就是数据倾斜造成的。
造成数据倾斜的原因
比如某些业务数据作为key的字段本就很集中,那么结果肯定会导致数据倾斜啊。
还有其他的一些原因,但是,根本原因还是key的分布不均匀,而其他的原因就是会造成key不均匀,进而导致数据倾斜的后果,所以说根本原因是key的分布不均匀。
既然有数据倾斜这种现象,就必须要有数据倾斜对应的处理方案啊。
简单地说数据倾斜这种现象导致的任务迟迟不能完成,耗费了太多时间,极大地影响了性能,所以我们数据倾斜的解决方案设计思路就是往如何提高性能,即如何缩短任务的处理时间这方面考虑的,而要提高性能,就要让key分布相对均衡,所以我们的终极目标就是考虑如何预处理数据才能够使得它的key分布均匀。
解决办法
1 合理设置Map数
-
1) 通常情况下,作业会通过input的目录产生一个或者多个map任务。
主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。 举例: a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数。 b) 假设input目录下有3个文件a,b,c大小分别为10m,20m,150m,那么hadoop会分隔成4个块(10m,20m,128m,22m),从而产生4个map数。即,如果文件大于块大小(128m),那么会拆分,如果小于块大小,则把该文件当成一个块。
-
2) 是不是map数越多越好?
答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。
-
3) 是不是保证每个map处理接近128m的文件块,就高枕无忧了?
答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。 针对上面的问题2和3,我们需要采取两种方式来解决:即减少map数和增加map数;
2 小文件合并
-
在map执行前合并小文件,减少map数:
-
CombineHiveInputFormat 具有对小文件进行合并的功能(系统默认的格式)
set mapred.max.split.size=112345600; set mapred.min.split.size.per.node=112345600; set mapred.min.split.size.per.rack=112345600; set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
这个参数表示执行前进行小文件合并,前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),进行合并。
3 复杂文件增加Map数
-
当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。
-
增加map的方法为
- 根据 computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))公式
- 调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。
mapreduce.input.fileinputformat.split.minsize=1 默认值为1 mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默认值Long.MAXValue因此,默认情况下,切片大小=blocksize maxsize(切片最大值): 参数如果调到比blocksize小,则会让切片变小,而且就等于配置的这个参数的值。 minsize(切片最小值): 参数调的比blockSize大,则可以让切片变得比blocksize还大。
- 例如
--设置maxsize大小为10M,也就是说一个fileSplit的大小为10M set mapreduce.input.fileinputformat.split.maxsize=10485760;
4 合理设置Reduce数
-
1、调整reduce个数方法一
-
1)每个Reduce处理的数据量默认是256MB
set hive.exec.reducers.bytes.per.reducer=256000000;
-
2)每个任务最大的reduce数,默认为1009
set hive.exec.reducers.max=1009;
-
3)计算reducer数的公式
N=min(参数2,总输入数据量/参数1)
-
-
2、调整reduce个数方法二
--设置每一个job中reduce个数 set mapreduce.job.reduces=3;
-
3、reduce个数并不是越多越好
-
过多的启动和初始化reduce也会消耗时间和资源;
-
同时过多的reduce会生成很多个文件,也有可能出现小文件问题
-
总的来说就是,数据倾斜的根源是key分布不均匀,所以应对方案要么是从源头解决(不让数据分区,直接在map端搞定),要么就是在分区时将这些集中却无效的key过滤(清洗)掉,或者是想办法将这些key打乱(给key加上标签),让它们进入到不同的reduce中。
9. Hive的四种排序
order by
order by 会对输入做全局排序,因此只有一个reducer(多个reducer无法保证全局有序)
只有一个reducer,会导致当输入规模比较大时,需要较长的时间。
set hive.mapred.mode=nonstrict; (default value / 默认值)
set hive.mapred.mode=strict;
order by 和数据库中的order by功能一致按照某一项&几项排序输出。
与数据库中order by的区别在于hive.mapred.mode = strict模式下 必须指定limit否则执行会报错
原因:在order by状态下所有的数据都会到一台服务器进行reduce操作也就是只有一个reduce, 如果在数据量大的情况下会出现无果的情况,如果进行limit n,那只有n * map
number 条记录而已。只有一个reduce也可以出来里过来
sort by
sort by不是全局排序,其在数据进入reducer前完成排序
因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1,则sort by只保证每个reducer 的输出有序,不保证全局有序。
sort by 不受hive.mapred.mode是否为strict,nostrict的影响。
sort by的数据只能保证在同一个reduce中的数据可以按指定字段排序。
使用sort by你可以指定执行的reduce个数(set mapred.reduce.tasks=),对输出的 数据在执行归并排序,即可以得到全部结果。
注意:可以用limit子句大大减少数据量。使用limit n后,传输到reduce端(单机)的数据记录就 减少到n*(map个数)。否则由于数据过大可能出不了结果。
distribute by
按照指定的字段对数据进行划分到不同的输出reduce / 文件中。
insert overwrite local directory ‘/home/hadoop/out’ select * from test order by name
distribute by length(name);
此方法会根据name的长度划分到不同的reduce中,最终输出到不同的文件中。
length 是内建函数,也可以指定其他的函数或这使用自定义函数。
Cluster By
cluster by 除了具有 distribute by 的功能外还兼具 sort by 的功能。
但是排序只能是倒序排序,不能指定排序规则为asc 或者desc。
10. Hive的分区分桶
我们发现其实桶的概念就是MapReduce的分区的概念,两者完全相同。物理上每个桶就是目录里的一个文件,一个作业产生的桶(输出文件)数量和reduce任务个数相同。
而分区表的概念,则是新的概念。分区代表了数据的仓库,也就是文件夹目录。每个文件夹下面可以放不同的数据文件。通过文件夹可以查询里面存放的文件。但文件夹本身和数据的内容毫无关系。
桶则是按照数据内容的某个值进行分桶,把一个大文件散列称为一个个小文件。 这些小文件可以单独排序。如果另外一个表也按照同样的规则分成了一个个小文件。两个表join的时候,就不必要扫描整个表,只需要匹配相同分桶的数据即可。效率当然大大提升。
同样,对数据抽样的时候,也不需要扫描整个文件。只需要对每个分区按照相同规则抽取一部分数 据即可。
• 分区表
如果在建表时使用了 PARTITIONED BY,表即为分区表。分区表下的数据按分区键的值(或值的范围)放在HDFS下的不同目录中,可以有效减少查询时扫描的数据量,提升查询效率。
• 非分区表
非分区表即除分区表之外的表。
按表是否分桶分类
按表是否分桶可以将表分为两类:分桶表和非分桶表。
• 分桶表
如果在建表时使用了 CLUSTERED BY … INTO … BUCKETS,表即为分桶表。分桶表下的数据按
分桶键的哈希值放在HDFS下的不同目录中,可以有效减少查询时扫描的数据量,提升查询效率。
• 非分桶表
非分桶表即除分桶表之外的表
11. hive中分区可以提高查询效率,分区是否越多越好,为什么?
1.hive如果有过多的分区,由于底层是存储在HDFS上,HDFS上只用于存储大文件 而非小文件,因为过多的分区会增加namenode的负担。
2.hive会转化为mapreduce,mapreduce会转化为多个task。过多小文件的话,每个文件一个task,每个task一个JVM实例,JVM的开启与销毁会降低系统效率。
注意:合理的分区不应该有过多的分区和文件目录,并且每个目录下的文件应该足够大
12. hive 调优
12.1 hive-site.xml参考
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://bd01:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<!--开启 spark 引擎-->
<property>
<name>hive.execution.engine</name>
<value>spark</value>
</property>
<!-- 合并 block 减少 task 数量-->
<property>
<name>ngmr.partition.automerge</name>
<value>true</value>
</property>
<!-- 表示将 n 个 block 安排给单个线程处理。 -->
<property>
<name>ngmr.partition.mergesize.mb</name>
<value>3</value>
</property>
<!-- 开启小文件合并-->
<property>
<name>hive.merge.sparkfiles</name>
<value>true</value>
</property>
<!-- 开启小文件合并-->
<property>
<name>hive.map.agg</name>
<value>true</value>
</property>
<!-- 使用向量化查询-->
<property>
<name>hive.vectorized.execution.enabled</name>
<value>true</value>
</property>
<!-- cbo可以优化hive的每次查询-->
<property>
<name>hive.cbo.enable</name>
<value>true</value>
</property>
<property>
<name>hive.stats.fetch.column.stats</name>
<value>true</value>
</property>
<property>
<name>hive.stats.fetch.partition.stats</name>
<value>true</value>
</property>
<property>
<name>hive.compute.query.using.stats</name>
<value>true</value>
</property>
<!-- 开启数据压缩-->
<property>
<name>hive.exec.compress.intermediate</name>
<value>true</value>
</property>
<property>
<name>hive.exec.compress.output</name>
<value>true</value>
</property>
<!-- 使简单的sql语句不执行spark引擎-->
<property>
<name>hive.fetch.task.conversion</name>
<value>more</value>
</property>
<!-- 有数据倾斜的时候进行负载均衡
group by操作是否允许数据倾斜,默认是false,当设置为true时,执行计划会生成两个map/reduce作业,第一个MR中会将map的结果随机分布到reduce中,达到负载均衡的目的来解决数据倾斜,
-->
<property>
<name>hive.groupby.skewindata</name>
<value>true</value>
</property>
<!-- 列裁剪,默认开启true,在做查询时只读取用到的列,这个是个有用的优化;-->
<property>
<name>hive.optimize.cp</name>
<value>true</value>
</property>
<!--jvm重用-->
<property>
<name>mapreduce.job.jvm.numtasks</name>
<value>10</value>
<description>How many tasks to run per jvm. If set to -1, there is no limit.</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
<description>password to use against metastore database</description>
</property>
</configuration>
hive cli 中设置调优参数
// 合并 block 减少 task 数量
set ngmr.partition.automerge = true;
// jvm 重用
set mapreduce.job.jvm.numtasks=10;
// 表示将 n 个 block 安排给单个线程处理。
set ngmr.partition.mergesize.mb =3;
// 开启小文件合并
set hive.merge.sparkfiles = true;
// 开启小文件合并
set hive.map.agg = true;
// 使用向量化查询
set hive.vectorized.execution.enabled = true;
// cbo可以优化hive的每次查询
set hive.cbo.enable = true;
set hive.stats.fetch.column.stats = true;
set hive.stats.fetch.partition.stats = true;
set hive.compute.query.using.stats = true;
// 开启数据压缩
set hive.exec.compress.intermediate = true;
set hive.exec.compress.output = true;
// 有数据倾斜的时候进行负载均衡group by操作是否允许数据倾斜,默认是false,当设置为true时,执行计划会生成两个map/reduce作业,第一个MR中会将map的结果随机分布到reduce中,达到负载均衡的目的来解决数据倾斜,
set hive.groupby.skewindata = true;
// 列裁剪,默认开启true,在做查询时只读取用到的列,这个是个有用的优化;
set hive.optimize.cp = true;
12.2 数据压缩
1、数据的压缩说明
-
压缩模式评价
- 可使用以下三种标准对压缩方式进行评价
- 1、压缩比:压缩比越高,压缩后文件越小,所以压缩比越高越好
- 2、压缩时间:越快越好
- 3、已经压缩的格式文件是否可以再分割:可以分割的格式允许单一文件由多个Mapper程序处理,可以更好的并行化
- 可使用以下三种标准对压缩方式进行评价
-
常见压缩格式
压缩方式 | 压缩比 | 压缩速度 | 解压缩速度 | 是否可分割 |
---|---|---|---|---|
gzip | 13.4% | 21 MB/s | 118 MB/s | 否 |
bzip2 | 13.2% | 2.4MB/s | 9.5MB/s | 是 |
lzo | 20.5% | 135 MB/s | 410 MB/s | 是 |
snappy | 22.2% | 172 MB/s | 409 MB/s | 否 |
- Hadoop编码/解码器方式
压缩格式 | 对应的编码/解码器 |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
Gzip | org.apache.hadoop.io.compress.GzipCodec |
BZip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compress.lzo.LzopCodec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
压缩性能的比较
压缩算法 | 原始文件大小 | 压缩文件大小 | 压缩速度 | 解压速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
http://google.github.io/snappy/
On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.
2、压缩配置参数
要在Hadoop中启用压缩,可以配置如下参数(mapred-site.xml文件中):
参数 | 默认值 | 阶段 | 建议 |
---|---|---|---|
io.compression.codecs (在core-site.xml中配置) | org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec, org.apache.hadoop.io.compress.Lz4Codec | 输入压缩 | Hadoop使用文件扩展名判断是否支持某种编解码器 |
mapreduce.map.output.compress | false | mapper输出 | 这个参数设为true启用压缩 |
mapreduce.map.output.compress.codec | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 | 使用LZO、LZ4或snappy编解码器在此阶段压缩数据 |
mapreduce.output.fileoutputformat.compress | false | reducer输出 | 这个参数设为true启用压缩 |
mapreduce.output.fileoutputformat.compress.codec | org.apache.hadoop.io.compress. DefaultCodec | reducer输出 | 使用标准工具或者编解码器,如gzip和bzip2 |
mapreduce.output.fileoutputformat.compress.type | RECORD |
12.3 文件压缩
Hive支持的存储数的格式主要有:TEXTFILE(行式存储) 、SEQUENCEFILE(行式存储)、ORC(列式存储)、PARQUET(列式存储)。
1、列式存储和行式存储
上图左边为逻辑表,右边第一个为行式存储,第二个为列式存储。
行存储的特点: 查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。select *
列存储的特点: 因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。 select 某些字段效率更高
TEXTFILE和SEQUENCEFILE的存储格式都是基于行存储的;
ORC和PARQUET是基于列式存储的。
2 、TEXTFILE格式
默认格式,数据不做压缩,磁盘开销大,数据解析开销大。可结合Gzip、Bzip2使用(系统自动检查,执行查询时自动解压),但使用这种方式,hive不会对数据进行切分,从而无法对数据进行并行操作。
3 、ORC格式
Orc (Optimized Row Columnar)是hive 0.11版里引入的新的存储格式。
可以看到每个Orc文件由1个或多个stripe组成,每个stripe250MB大小,这个Stripe实际相当于RowGroup概念,不过大小由4MB->250MB,这样能提升顺序读的吞吐率。每个Stripe里有三部分组成,分别是Index Data,Row Data,Stripe Footer:
一个orc文件可以分为若干个Stripe
一个stripe可以分为三个部分
indexData:某些列的索引数据
rowData :真正的数据存储
StripFooter:stripe的元数据信息
1)Index Data:一个轻量级的index,默认是每隔1W行做一个索引。这里做的索引只是记录某行的各字段在Row Data中的offset。
2)Row Data:存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个Stream来存储。
3)Stripe Footer:存的是各个stripe的元数据信息
每个文件有一个File Footer,这里面存的是每个Stripe的行数,每个Column的数据类型信息等;每个文件的尾部是一个PostScript,这里面记录了整个文件的压缩类型以及FileFooter的长度信息等。在读取文件时,会seek到文件尾部读PostScript,从里面解析到File Footer长度,再读FileFooter,从里面解析到各个Stripe信息,再读各个Stripe,即从后往前读。
4 、PARQUET格式
Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,2015年5月从Apache的孵化器里毕业成为Apache*项目。
Parquet文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此Parquet格式文件是自解析的。
通常情况下,在存储Parquet数据的时候会按照Block大小设置行组的大小,由于一般情况下每一个Mapper任务处理数据的最小单位是一个Block,这样可以把每一个行组由一个Mapper任务处理,增大任务执行并行度。Parquet文件的格式如下图所示。
上图展示了一个Parquet文件的内容,一个文件中可以存储多个行组,文件的首位都是该文件的Magic Code,用于校验它是否是一个Parquet文件,Footer length记录了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和该文件存储数据的Schema信息。除了文件中每一个行组的元数据,每一页的开始都会存储该页的元数据,在Parquet中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引,目前Parquet中还不支持索引页。
存储文件的查询速度测试:
1)TextFile
hive (default)> select count(*) from log_text;
_c0
100000
Time taken: 21.54 seconds, Fetched: 1 row(s)
2)ORC
hive (default)> select count(*) from log_orc;
_c0
100000
Time taken: 20.867 seconds, Fetched: 1 row(s)
3)Parquet
hive (default)> select count(*) from log_parquet;
_c0
100000
Time taken: 22.922 seconds, Fetched: 1 row(s)
存储文件的查询速度总结:
ORC > TextFile > Parquet
Spark 常见面试题
1. 说出几个常见的rdd算子?
1.1 transformation算子
map,filter,flatmap,groupbykey,repartition
1.2 action 算子
reduce,collect,count,take,saveAsTextFile
2. rdd是什么?rdd的创建方式有几种?分别是怎么分区的?
2.1 rdd是什么?
rdd即弹性分布式数据集,是spark中最基本的数据抽象,它代表一个不可变的、可分区的内部元素可并行计算 的集合。
2.2 rdd的创建方式
使用makeRDD通过集合创建。由本地核数来决定分区数量
使用外部数据源创建如hdfs。由block的数量来决定的,通常默认为2个分区最低也是2个。
由另一个rdd得出的结果创建,即转换时创建。根据父rdd的 reduceTask数量
3. RDD的依赖关系
3.1 宽依赖:多个子RDD的分区依赖同一个父RDD的Partition
3.2 窄依赖:每一个父RDD的Partition最多被子RDD的 一个Partition使用
3.3 为什么要划分依赖关系?
简单的说,有效分别各个算子之间的关系有利于生成dag图形,以及程序运行过程中产生的多次依赖变化的监察,窄依赖就是将依赖较为单一依赖视为一种方式,如果将宽窄依赖混为一谈,处理、区分、运行都会导致效率的分配不均。
4. task,stage,job分别是什么?
4.1 task
即 stage 下的一个任务执行单元,一般来说,一个 rdd 有多少个 partition,就 会有多少个 task,因为每一个 task 只是处理一个partition 上的数据。
4.2 stage
一个Job会被拆分为多组Task,每组任务被称为一个stage,每一次数据的shuffle都会产生一个stage。
4.3 job
每触发一次action操作就会生成一个job,
4.4 为什么要划分 stage?
由于划分完 stage 之后,在同一个stage中只有窄依赖,没有宽依赖,可以实现流水线计算,stage中的每一个分区对应一个task,在同一个stage中就有很多可以并行运行的task。
5. RDD的缓存持久化机制?
主要通过cache,persist,checkpoint来实现RDD 的缓存持久化机制
5.1 cache与persist:
- cache默认将数据存储在内存中,底层实现是persist
- persist定义了相关多种的数据存储策略,如多副本,磁盘,内存等,将数据存储在内存中就会产生相应的oom内存溢出问题,以及内存如果不够数据放置,并不能保证数据准确,安全性。
- 它不会改变rdd的依赖关系,程序运行完成后对应的缓存数据就自动消失
- 后续要触发 cache 和 persist 持久化操作,需要有一个action操作
- 它不会开启其他新的任务,一个action操作就对应一个job
5.2 checkpoint:
- 可以把数据持久化写入到hdfs上
- 后续要触发checkpoint持久化操作,需要有一个action操作,后续会开启新的job执行checkpoint操作
- 它会改变rdd的依赖关系,后续数据丢失了不能够在通过血统进行数据的恢复。程序运行完成后对应的checkpoint数据就不会消失
6. spark常见调优
6.1 修改序列化机制有效压缩数据量,通过使用Kryo优化序列化性能
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer ")
6.2 在实际的生产环境中,提交spark任务时,使用spark-submit shell脚本,在里面调整对应的参数。
提交任务的脚本:
spark-submit \
--master spark://node1:7077 \
--class com.kaikeba.WordCount \
--num-executors 3 \ 配置executor的数量
--driver-memory 1g \ 配置driver的内存(影响不大)
--executor-memory 1g \ 配置每一个executor的内存大小
--executor-cores 3 \ 配置每一个executor的cpu个数
/export/servers/wordcount.jar
6.3 提高并行度
-
设置task的数量
spark.defalut.parallelism
默认是没有值的,如果设置了值为10,它会在shuffle的过程才会起作用。
比如 val rdd2 = rdd1.reduceByKey(+)
此时rdd2的分区数就是10
可以通过在构建SparkConf对象的时候设置,例如:
new SparkConf().set("spark.defalut.parallelism","500")
-
给RDD重新设置partition的数量
使用rdd.repartition 来重新分区,该方法会生成一个新的rdd,使其分区数 变大。
此时由于一个partition对应一个task,那么对应的task个数越多,通过这 种方式也可以提高并行度。
-
通过设置参数sql.shuffle.partitions=500 默认为200;
可以适当增大,来提高并行度。 比如设置为 spark.sql.shuffle.partitions=500
6.4 RDD 的重用和持久化
- 适当的重用和持久化RDD能有效减少同一段代码重复与调用的次数,从而提高 程序的运行效率和性能,通常使用persist,cache方法。
- 在持久化过程中适当的使用序列化可减少数据的size从而降低内存和cpu的计 算耗时和存储空间。
- 序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。但是可 以减少占用的空间和便于网络传输。
- 如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式, 内存+磁盘的普通方式(无序列化)。
- 为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化
持久化的双副本机制,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;
持久化的每个数据单元,存储一份副本,放在其他节点上面,从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。这种方式,仅仅针对你的内存资源极度充足。
比如: StorageLevel.MEMORY_ONLY_2
6.5 适当将那些多次使用变量广播出去
比如一个任务需要50个executor,1000个task,共享数据为100M。
- 在不使用广播变量的情况下,1000个task,就需要该共享数据的1000个副本, 也就是说有1000份数需要大量的网络传输和内存开销存储。耗费的内存大小 1000*100=100G.
- 使用了广播变量后,50个executor就只需要50个副本数据,而且不一定都是 从Driver传输到每个节点,还可能是就近从最近的节点的executor的 blockmanager上拉取广播变量副本,网络传输速度大大增加;内存开销 50*100M=5G
- 如何使用?
1.通过sparkContext的broadcast方法把数据转换成广播变量,类型为Broadcast, val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6)) sc.broadcast(Array(1,2,3,4,5,6)) 2.然后executor上的BlockManager就可以拉取该广播变量的副本获取具体的数据。 获取广播变量中的值可以通过调用其value方法 val array: Array[Int] = broadcastArray.value
总结:
不使用广播变量的内存开销为100G,使用后的内存开销5G,这里就相差了20 倍左右的网络传输性能损耗和内存开销,使用广播变量后对于性能的提升和影响,还是很可观的。
广播变量的使用不一定会对性能产生决定性的作用。比如运行30分钟的spark作业,可能做了广播变量以后,速度快了2分钟,或者5分钟。但是一点一滴的调优,积少成多。最后还是会有效果的。
注意事项:
- 能不能将一个RDD使用广播变量广播出去?不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。
- 广播变量只能在Driver端定义,不能在Executor端定义。
- 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
- 如果executor端用到了Driver的变量,如果不使用广播变量在Executor 有多少task就有多少Driver端的变量副本。
- 如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
6.6 尽量避免shuffle
有效的避免shuffle可以减少网络间io和各个分区之间传输。
如何避免?
案例:
//错误的做法:
// 传统的join操作会导致shuffle操作。
// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)
//正确的做法:
// Broadcast+map的join操作,不会导致shuffle操作。
// 使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
6.7 使用map-side预聚合的shuffle操作
如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。
所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。
map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需 要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。
通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来 替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户 自定义的函数对每个节点本地的相同key进行预聚合。
而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分 发和传输,性能相对来说比较差。
比如如下两幅图,就是典型的例子,分别基于reduceByKey和groupByKey进行单 词计数。其中第一张图是groupByKey的原理图,可以看到,没有进行任何本地聚 合时,所有数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,可以看到,每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。
groupByKey进行单词计数原理:
reduceByKey单词计数原理:
6.8 使用高性能的算子
6.8.1 使用reduceByKey/aggregateByKey替代groupByKey
reduceByKey/aggregateByKey 可以进行预聚合操作,减少数据的传输量,提升性能
groupByKey 不会进行预聚合操作,进行数据的全量拉取,性能比较低
6.8.2 使用mapPartitions替代普通map
mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不 是一次函数调用处理一条,性能相对来说会高一些。
但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函 数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法 回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!
6.8.3 使用foreachPartitions替代foreach
原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比 如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能可以提升30%以上。
6.8.4 使用filter之后进行coalesce操作
通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。
因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。
因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition 之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于 性能的提升会有一定的帮助。
6.8.5 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需
要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。
因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。
6.8.6 使用fastutil优化数据格式
fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、 HashSet)的类库,提供了特殊类型的map、set、list和queue;
fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来替代自己平时使用的JDK的原生的Map、List、Set.
Spark中应用fastutil的场景和使用
算子函数使用了外部变量
1.你可以使用Broadcast广播变量优化;
2.可以使用Kryo序列化类库,提升序列化性能和效率;
3.如果外部变量是某种比较大的集合,那么可以考虑使用fastutil改写外部 变量;
首先从源头上就减少内存的占用(fastutil),通过广播变量进一步减少内存占用,再通过Kryo序列化类库进一步减少内存占用。
算子函数里使用了比较大的集合Map/List
在你的算子函数里,也就是task要执行的计算逻辑里面,如果有逻辑中,出现,要创建比较大的Map、List等集合,
可能会占用较大的内存空间,而且可能涉及到消耗性能的遍历、存取等集合操作;那么此时,可以考虑将这些集合类型使用fastutil类库重写,使用了fastutil集合类以后,就可以在一定程度上,减少task创建出来的集合类型的内存占用。
避免executor内存频繁占满,频繁唤起GC,导致性能下降。
fastutil的使用
第一步:在pom.xml中引用fastutil的包
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
第二步:平时使用List (Integer)的替换成IntList即可。
List的list对应的到fastutil就是IntList类型
使用说明:
基本都是类似于IntList的格式,前缀就是集合的元素类型;
特殊的就是Map,Int2IntMap,代表了key-value映射的元素类型。
reduce,collect,count,take,saveAsTextFile
-------------------------------------------end----------------------------------------------------
文章部分引用
开课吧大数据开发工程师课程如侵权请联系删除。
https://mkt.kaikeba.com/vipcourse/bde
上一篇: 苹果也要搞万物互联!homeOS剑指华为HarmonyOS
下一篇: Linux中断及处理方式