《Hadoop.The.Definitive.Guide.4th.Edition.2015.3》学习笔记
一.读后感
最近读完了《Hadoop.The.Definitive.Guide.4th.Edition.2015.3》英文第4版,个人感觉这本书是hadoop目前最权威、最全面、最靠谱的书籍,强烈建议大家好好研读。不建议大家去读hadoop权威指南第1版、第2版和第3版,第3版我也看完了,但是里面的知识已经与当前Apache hadoop 2.X严重脱节,比如第3版还在大篇幅的讲解hadoop1.X的jobtracker和tasktracker,这早就被抛弃。
《Hadoop.The.Definitive.Guide.4th.Edition.2015.3》全书从五个部分向读者诠释了hadoop相关知识,第一部分讲解了hadoop的基础知识(mapreduce、HDFS、YARN),第二部分详细讲解了mapreduce,第三部分讲解了hadoop集群的搭建管理和监控,第四部分讲解了hadoop生态圈一系列相关的子项目(Avro Parquet Flume Sqoop Pig Hive Crunch Spark Hbase Zookeeper),第五部分讲解了实战案例。给我感触最深的是第一部分中的YARN,从这里我学习到了FIFO CAPACITY FAIR3种调度器的详细知识,第二部分的mapreduce使我在对mapreduce进行优化时候有了明确的思路不至于那么迷茫,第三部分使我学会了如何站在公司的角度去思考搭建公司级hadoop要考虑的硬件和软件要求,hadoop管理和监控要求,总之,这本书值得购买,值得花时间去读,值得推荐给想了解hadoop又不知选啥资料的国人。
国人也在出hadoop相关书籍,和《Hadoop.The.Definitive.Guide.4th.Edition.2015.3》比起来,大部分可以说是一本文字垃圾,要么片面,要么不够深入,比如一本书就只讲一点YARN解析,另一本书讲mapreduce解析,不免让人觉得这在浪费广大读者的money,读者被动的接受着作者在金钱利益驱动下写下的垃圾,强烈建议这些作者好好学习下《Hadoop.The.Definitive.Guide.4th.Edition.2015.3》,建议出书的作者知识积累丰富点,所讲解的内容尽量囊括在一本书中(不要分成那么多子系列),建议作者要有持续更新版本的忍耐力,建议出书的作者资质再老点,不要是个人弄了几下hadoop就出书,国人很多技术书甚至作者的年龄都在20-35岁,不免让人觉得这些书籍是否可靠,也很少看到这些数据在随着时间的推移过程中持续更新,出第1版,第2版,第3版......可以说很多作者的书籍在第一版本发布后,就永远没了下文,随着时间推移,只会被淘汰。
看了很多hadoop相关的书籍,收获最大的就是《Hadoop.The.Definitive.Guide.4th.Edition.2015.3》,有感而发,在使用hadoop这条路上走过很多弯路,踩过很多坑,对于参考资料,只希望少一点垃圾,多一点经典。
二.HDFS相关问题
1.【HDFS】为何blocksize=128MB?是否需要调大?
答:读取HDFS的block需要做两件事,一是找到block在哪里,也就是寻址,这里消耗的时间为寻址时间,比如需要10毫秒;二是将这个block对应的数据传输到客户端,这里消耗的时间为磁盘传输时间,比如磁盘数据传输速度为128MB/秒,那么128MB数据需要1秒,也即1000毫秒。现在来做个试验,如下表:
文件大小 | blocksize | 寻址时间(10ms/block) | 传输时间(128MB/s) | 总计耗时 |
128MB | 128MB | 10ms/block*1block | (128MB*1block/(128MB/s))*1block | 1010毫秒 |
128MB | 1.28MB | 10ms/block*100block | (1.28MB*1block/(128MB/s))*100block | 2000毫秒 |
上面试验表明:在blocksize比较小的时候,频繁的在进行block的寻址操作,寻址操作越多,所消耗的时间就越多,最终总计耗时越大。试验充分说明了一个问题,要调大blocksize,设置多大好呢?hadoop目前推荐的设置是,寻址时间/传输时间=1%,所以官网上推荐的默认值为128MB,第一个问题回答完毕。
随着磁盘硬件性能的提升,你可以更改这个blocksize,让它大于128MB,比如256MB,不过不建议?这里说下我的理由,目前hadoop中很多地方都是据blocksize=128MB值之上对其他参数进行了默认设置,比如mapreduce.task.io.sort.mb=100MB,splitsize=128MB,hbase中memstore=128MB,这些与blocksize=128MB是有关的,当你变动了blocksize,这些参数你也要相应调整,除非你是hadoop高级掌握者,否则不要轻易的去改动。
2.【HDFS】HDFS中namenode维护什么数据?数据是固定的么?
答:namenode维护两类数据:(1)namespace image(2)edit log
来看下hdfs-site.xml中配置的dfs.namenode.name.dir目录的文件结构:
${dfs.namenode.name.dir}/ ├── current │ ├── VERSION │ ├── edits_0000000000033667872-0000000000033668028 │ ├── ... │ ├── edits_0000000000034678717-0000000000034678900 │ ├── edits_0000000000034678901-0000000000034679140 │ ├── edits_0000000000034679141-0000000000034679336 │ ├── edits_inprogress_0000000000034679337 │ ├── fsimage_0000000000034668015 │ ├── fsimage_0000000000034668015.md5 │ ├── fsimage_0000000000034675376 │ ├── fsimage_0000000000034675376.md5 │ └── seen_txid └── in_use.lock
- edit log文件名为edits_${start_transaction_ID}-${end_transaction_ID}
- 正在进行中的edit log文件名为edits_inprogress_${inprogress_transaction_ID}
- 上面记录的是start_transaction_ID=0000000000034678717,end_transaction_ID=0000000000034678900之间的edit log
- edits_inprogress_0000000000034679337记录的是inprogress_transaction_ID=0000000000034679337的正在进行中的edit log。
- namespace image文件名为fsimage_${merged_transaction_ID}
- fsimage_0000000000034675376说明当前已经完成对于transaction_ID=0000000000034675376的edit log合并后的fsimage
开始第二个问题,namenode维护的逻辑文件系统及其block分布信息不是固定不变的,信息是在变化的,也不会写死,里面的block分布信息在各个datanode启动时候会主动向namenode上报,随着datanode的关闭也会动态更新。
3.【HDFS】HDFS中dfs.namenode.name.dir、dfs.datanode.data.dir、dfs.namenode.checkpoint.dir、dfs.journalnode.edits.dir哪些需要设置多个?各自作用?
配置项 | 使用者 | 单节点是否设置多个目录 | 作用 |
dfs.namenode.name.dir | namenode | 可以多个 |
每个目录是一份独立完整数据, 多个目录就是多份完整独立数据COPY |
dfs.datanode.data.dir | datanode | 可以多个 |
每个目录中只会存储所有文件数据中的某一部分,所有节点所有目录中才构成一份完成数据的默认3份COPY |
dfs.namenode.checkpoint.dir | secondary namenode | 可以多个 |
每个目录是一份独立完整数据, 多个目录就是多份完整独立数据COPY |
dfs.journalnode.edits.dir | journalnode | 只能一个 | 存储的是此journalnode的状态和一部分edit log信息,这份edit log 在大多数其他journalnode上也有备份 |
4.【HDFS】HDFS中namenode+secondarynamenode的冷切换加载过程?HDFS中HA(Active namenode+Standby namenode)热切换和冷切换过程?
答:HDFS中namenode+secondarynamenode的冷切换加载过程:
- (1)namenode宕机
- (2)重新启动namenode或者通过secondarynamenode修复namenode然后启动namenode
- (3)将fsimage加载到内存
- (4)读取未合并的edit log,进行整合合并到fsimage
- (5)接收尽可能多的datanode上报block信息,最终退出safemode,完成namenode冷启动
HDFS中HA(Active namenode+Standby namenode)热切换过程:
- (1)active namenode停止
- (2)ZKFC发现namenode停止,将standby namenode切换为active namenode,完成热切换
HDFS中HA(Active namenode+Standby namenode)冷切换过程(效果比namenode+sencondarynamenode冷切换要好,因为journalnode上edit log不需加载):
- (1)active namenode和standby namenode都停止了
- (2)手动启动active namenode和standby namenode
- (3)将fsimage加载到内存
- (4)读取journalnode未合并的edit log,进行整合合并到fsimage
- (5)接收尽可能多的datanode上报block信息,最终退出safemode,完成namenode冷启动
5.【HDFS】实现HDFS HA的QJM和NFS方式对比?
答:
HDFS HA方式 | 启动 | 配置 | edit log |
QJM | 无差别 | 用journalnode存储edit log | 只有一个namenode可写edit log,容易控制 |
NFS | 无差别 | 用NFS共享edit log | 比较难控制只有一个namenode写edit log |
6.【HDFS】HDFS HA方实现客户端如何实现namenode故障切换的透明化?
答:HDFS HA方式下,服务端配置的hadoop集群名字是个虚拟名称,目前客户端需要配置上这个虚拟名称以及这个名称后面对应的多个namenode节点,从而客户端自己通过检测来实现连接处于active状态的namenode,一定要配置给客户端的必要参数如下:
dfs.nameservices=mycluster dfs.client.failover.proxy.provider.mycluster=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider dfs.ha.namenodes.mycluster=nn1,nn2 dfs.namenode.rpc-address.mycluster.nn1=192.168.77.38:9000 dfs.namenode.rpc-address.mycluster.nn2=192.168.77.39:9000
7.【HDFS】HDFS中网络距离计算原则?
答:网络距离依次递增,规则如下:
- 同节点上的进程,距离为0
- 同一机架上的不同节点,距离为2
- 同一数据中心中不同机架上的节点,距离为4
- 不同数据中心中的节点,距离为6
8.【HDFS】HDFS读数据和写数据剖析
答:(1)读数据剖析:
- client从namenode获取block信息;
- client按照顺序一个个先后读取block数据,读取block数据时候,根据上面7中网络距离计算最优block位置进行读取
(2)写数据剖析
- client从namenode获取创建的outputstream
- client只负责写主block,主block负责写第一个备份block,第一个备份block负责写第二个备份block
- 第二个备份block写完毕后ACK到第一个备份block,第一个备份block然后ACK到主block,主block返回ACK给client
9.【HDFS】HDFS中远程拷贝数据distcp如何调优?
答:distcp本质是个mapreduce,使用时候最好指定map和reduce个数,这样效率才会高。
10.【HDFS】HDFS中如何实现数据均衡?
答:
- HDFS默认提供了脚本start-balance.sh启动一个后台进程来均衡各个节点数据
- 节点之间数据传递的带宽默认dfs.data,balance=1MB/s太小,建议设置为10MB/s~50MB/s之间,太大会占有网络资源,太小balance执行的时间会很长
- 判断数据是否均衡的依据是各个节点上的磁盘使用空间/该节点上总的HDFS磁盘空间都不能超过默认值10%,这个比例值可以修改
11.【HDFS】Hadoop支持哪些文件系统?
文件系统 | URI前缀 | hadoop的具体实现类 |
Local | file:// | fs.LocalFileSystem |
HDFS | hdfs:// | hdfs.DistributedFileSystem |
WebHDFS | webhdfs:// | hdfs.web.WebHdfsFileSystem |
Secure WebHDFS | swebhdfs:// | hdfs.web.SWebHdfsFileSystem |
HAR | har:// | fs.HarFileSystem |
View | viewfs:// | viewfs.ViewFileSystem |
FTP | ftp:// | fs.ftp.FTPFileSystem |
S3 | s3a:// | fs.s3a.S3AFileSystem |
Azure | wasb:// | fs.azure.NativeAzureFileSystem |
Swift | swift:// | fs.swift.snative.SwiftNativeFileSystem |
hadoop是提供了对于以上各种文件系统的默认实现,稍作配置即可使用上述文件系统,比如S3 Support in Apache Hadoop 基于Amazon S3的文件系统
HDFS只是hadoop实现的文件系统之一,默认dfs.webhdfs.enabled=true,是开启了WebHDFS,如果觉得不安全,可以设置dfs.webhdfs.enabled=false来禁止WebHDFS。
12.【HDFS】hadoop的文件压缩有啥好处?选择文件压缩格式要考虑哪些因素?支持的文件压缩格式有哪些?不同后缀的文件HDFS是如何编解码读取的?为什么Apache hadoop默认不支持LZO和SNAPPY?LZO对应的文件后缀就是.lzo吗?
答:(1)hadoop的文件压缩好处:
- 减少了存储文件所需的空间;
- 加速了网络、磁盘或磁盘的数据传输
(2)选择文件压缩格式考虑三个因素:
- 文件压缩比例
- 文件解压缩效率
- 该压缩格式的文件是否可被切分,不能切分直接影响mapreduce的map的个数,从而导致mapreduce效率低下
(3)支持的文件压缩格式有哪些:
默认Apache hadoop的安装文件提供了Bzip2、Gzip、DEFLATE的支持,只需配置mapreduce.map.output.compress和mapreduce.map.output.compress.codec即可使用,但是在Apache hadoop安装文件里并没有提供对于snappy和LZO的支持,这两个需要自己去安装整合,关于snappy和LZO的安装整合参见如下地址参见我的另外一篇博客hadoop 压缩 gzip biz2 lzo snappy
(4)不同后缀的文件HDFS是如何编解码读取的?
答:HDFS是通过文件后缀名来找编解码的,所以只要你的文件的后缀和相应的编码格式里的文件后缀匹配上了,那么HDFS就能知道用何种编解码,不过前提是对应的编码你首先都集成到hadoop中了,默认Apache hadoop能处理.deflate、.gz、.bz2后缀的文件,因为默认Apache hadoop就不支持LZO和SNAPPY,所以无法读取.snappy、.lzo和.lzo_deflate为后缀的文件,关于LZO和SNAPPY怎么集成到hadoop请参见我的另外一篇博客hadoop 压缩 gzip biz2 lzo snappy
(5)为什么Apache hadoop默认不支持LZO和SNAPPY?
答:Apache hadoop官网提供的hadoop版本本身就支持Bzip2、Gzip、DEFLATE3种压缩格式,不支持Snappy和LZO格式,原因是Snappy和LZO的代码库拥有GPL开源协议许可,而不是Apache开源协议许可,关于Snappy和LZO需要hadoop运维或者hadoop提供商自己集成。关于开源代码协议GPL、BSD、MIT、Mozilla、Apache和LGPL的区别详见下图:
(6)LZO对应的文件后缀就是.lzo吗?
答:hadoop是根据文件的后缀去寻找编解码用哪个,之前认为com.hadoop.compression.lzo.LzoCodec对应的后缀为".lzo"是错误的,查看了hadoop-lzo源代码发现".lzo"对应的编解码为com.hadoop.compression.lzo.LzopCodec,查看了源代码发现".lzo_deflate"对应的编解码为com.hadoop.compression.lzo.LzoCodec
13.【HDFS】hadoop支持的结构化文件类型有哪些?
答:hadoop支持的结构化文件类型有如下:
- SequenceFile,是一个由二进制序列化过的key/value的字节流组成的文本存储文件;
- MapFile,是排序后的SequenceFile两部分组成,分别是data和index;
- Apache Avro,是一个基于二进制数据传输高性能的中间件,是一个数据序列化的系统,可以将数据结构或对象转化成便于存储或传输的格式,用来支持数据密集型应用,适合于远程或本地大规模数据的存储和交换;
- Apache ORC,(OptimizedRC File)存储源自于RC(RecordColumnar File)这种存储格式,RC是一种列式存储引擎,对schema演化(修改schema需要重新生成数据)支持较差,而ORC是对RC改进,但它仍对schema演化支持较差,主要是在压缩编码,查询性能方面做了优化。RC/ORC最初是在Hive中得到使用,最后发展势头不错,独立成一个单独的项目。Hive 1.x版本对事务和update操作的支持,便是基于ORC实现的(其他存储格式暂不支持)。ORC发展到今天,已经具备一些非常高级的feature,比如支持update操作,支持ACID,支持struct,array复杂类型。你可以使用复杂类型构建一个类似于parquet的嵌套式数据架构,但当层数非常多时,写起来非常麻烦和复杂,而parquet提供的schema表达方式更容易表示出多级嵌套的数据类型;
- Apache Parquet, 最初的设计动机是存储嵌套式数据,比如Protocolbuffer,thrift,json等,将这类数据存储成列式格式,以方便对其高效压缩和编码,且使用更少的IO操作取出需要的数据,这也是Parquet相比于ORC的优势,它能够透明地将Protobuf和thrift类型的数据进行列式存储,在Protobuf和thrift被广泛使用的今天,与parquet进行集成,是一件非容易和自然的事情。 除了上述优势外,相比于ORC, Parquet没有太多其他可圈可点的地方,比如它不支持update操作(数据写成后不可修改),不支持ACID等
三.YARN相关问题
1.【YARN】YARN运行application的流程?
(1)ResourceManager进行资源分配时,采用了和HDFS相同的网络距离计算算法,尽量把block所在节点让NodeManager启动containner;
(2)执行的具体流程如下:
- client请求执行application
- ResourceManager分配一个节点NodeManager启动containner,这个containner执行一个Application Master;
- Application Master判定当前client提交的application为小任务,无需请求ResourceManager去分派NodeManager启动额外的containner来并发处理,只在当前Application Master所在containner里执行整个application;
- Application Master判定当前client提交的application不是小任务,就请求ResourceManager去分派NodeManager启动额外的containner来并发处理
(3)判定client提交的application为小任务的依据:
- application对应的job的map的个数<10,mapreduce.job.ubertask.maxmaps可以更改默认值;
- application对应的job的reduce个数为1,mapreduce.job.ubertask.maxreduces可以更改默认值;
- application对应的文件数据大小<blocksize128MB,mapreduce.job.ubertask.maxbytes可以更改默认值;
- 默认mapreduce.job.ubertask.enable=false,默认没有开启小任务,要开启必须设置为true
2.【YARN】基于YARN简化YARN的其他框架
答:
(1)Apache Slider 是一个 Yarn 应用,它可以用来在 Yarn 上部署并监控分布式应用。Slider 可以在应用运行期随意扩展或者收缩应用。目前它是 Apache 的孵化项目。参见董西城的Apache Slider—将已有服务或者应用运行在YARN上
(2)Apache Twill(官方首页:Apache Twill)这个项目则是为简化YARN上应用程序开发而成立的项目,该项目把与YARN相关的重复性的工作封装成库,使得用户可以专注于自己的应用程序逻辑。参见董西城的Apache Twill—YARN上应用程序开发包
3.【YARN】YARN与MapReduce1的比较
答:
(1)hadoop version<=2.5.1,使用job historyserver,默认MapReduce Application Master将Job history存储在HDFS上,保留的期限是7天,存储的路径通过mapreduce.jobhistory.done-dir设置;
(2)hadoop version> 2.5.1,使用timelineserver,historyserver仍然保留。application history server支持MapReduce作业,引入timeline server之后,Application History Server变成了Timeline Server的一个应用,history服务基于timeline store建立,history可以存储在内存中或者采用leveldb数据库存储,采用leveldb数据库存储可以保证history在timeline server 重启后仍会保留。
4.【YARN】YARN提供的三种调度?
答:
|
FIFO |
CAPACITY |
FAIR |
描述 |
每一个job占有所有资源,第二个job等第一个job执行完毕才开始执行 |
预先按照设定的queue资源比例为每个queue预留资源,一个job在queue对应的资源下执行,这个job所在queue不会占有所有资源,同一queue里面不同job仍然按照FIFO执行 |
各个queue按照预设的比重获得资源,当只有一个job时候,独占所有资源,不过也可以设置最大资源来限制独占所有资源,此时如果有第二个queue里启动了job,那么会通过延迟调度等待资源或者通过资源抢占来让第一个job腾出资源 |
特点 |
先进先出, 资源独占 |
不同组按比例分配资源, 不同组的比例之和为100%, 同组内按照FIFO处理, 只有一个组时也不会独占资源,存在资源浪费 |
不同组按照权重对应比例分配总资源, 只有一个组时可以独占资源, 多个组运行时,会腾出资源, 同组内可设置FAIR、FIFO、DRF |
Apache hadoop | 默认调度策略 | ||
Cloudera hadoop | 默认调度策略 | ||
指定queue | 通过mapreduce.job.queuename指定使用哪个queue |
默认通过规则来自己判定用哪个queue。 1)specified意思是按照mapreduce.job.queuename值判断用哪个queue 2)primaryGroup意思是按照client所在用户组去找queue 3)user意思是按照client所在用户去找queue |
|
配置文件 |
capacity-scheludel.xml |
fair-scheduler.xml 可修改yarn.scheduler.fair.allocation.file修改配置文件名字 |
|
资源抢占 |
FAIR支持资源抢占: 1)minum.share.preemption.timeout时间未获得最小资源要求,进行资源抢占; 通过defaultMinSharePreemptionTimeout设置所有queue超时; 通过minSharePreemptionTimeout设置单个queue超时 2)fair.share.preemption.timeout未获得资源要求的50%,进行资源抢占; 通过defaultFairSharePreemptionTimeout设置所有queue超时; 通过fairSharePreemptionTimeout设置单个queue超时; 通过defaultFairSharePreemptionThreshold设置所有queue最低资源要求比例(默认0.5); 通过fairSharePreemptionThreshold设置单个queue最低资源要求比例(默认0.5) |
||
延迟调度 |
可以设置延迟等待,以便等待最优的资源被分配来执行job(比如尽量离block最近)
yarn.scheduler.capacity.node-locality-delay设置一个正整数N,代表错过N次调度机会后放弃延迟等待 |
可以设置延迟等待,以便等待最优的资源被分配来执行job(比如尽量离block最近)
yarn.scheduler.fair.locality.threshold.node设置一个浮点数,代表等待直到这个比例的节点资源处于空闲就开始执行job
yarn.scheduler.fair.locality.threshold.rack设置正整数,代表当应用程序请求某个机架上资源时,它可以接受的可跳过的最大资源调度机会 |
|
DRF(Dominant Resource Fairness) |
同时考虑CPU和MEMORY两个资源,默认DRF为开启,默认只考虑MEMORE不考虑CPU
通过设置yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator来在容量调度开启DRF |
同时考虑CPU和MEMORY两个资源,默认DRF为开启,默认只考虑MEMORE不考虑CPU
通过设置fair-scheduler.xml中的defaultQueueSchedulingPolicy=drf来在公平调度里开启DRF |
四.MapReduce相关问题
1.【MapReduce】MapReduce的配置Configuration的加载规则?
答:(1)后加载的配置信息会覆盖之前加载的信息;
(2)加了Final的配置项不可被覆盖;
(3)配置文件中定义在前的属性可通过${属性name}在后面进行引用;
(4)可以使用System设置配置属性;
(5)可以使用JDK的-Dproterty=value设置配置属性。
2.【MapReduce】MapReduce的maven开发环境如何设置?MapReduce默认提供了哪些工具类方便开发调试?
答:(1)maven中添加hadoop-client依赖jar,该jar包含所有客户端与HDFS和MAPREDUCE相关的所有类;maven中添加hadoop-minicluster依赖jar,该jar支持在单个JVM中模拟hadoop集群进行测试
(2)ToolRunner是hadoop提供的工具类以减轻shell脚本的工作,它里面的使用的是工具类GernericOptionsParser作用是与hadoop命令行配置信息交互并将这行配置设置到Configuration对象上;
MRUnit可以用来在本地用断言的方式测试mapreduce程序,比如MRUnit.MapDriver可以构造map的输入信息。
<project> <modelVersion>4.0.0</modelVersion> <groupId>com.hadoopbook</groupId> <artifactId>hadoop-book-mr-dev</artifactId> <version>4.0</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <hadoop.version>2.5.1</hadoop.version> </properties> <dependencies> <!-- Hadoop main client artifact --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <!-- Unit test artifacts --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.mrunit</groupId> <artifactId>mrunit</artifactId> <version>1.1.0</version> <classifier>hadoop2</classifier> <scope>test</scope> </dependency> <!-- Hadoop test artifact for running mini clusters --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> <version>${hadoop.version}</version> <scope>test</scope> </dependency> </dependencies> <build> <finalName>hadoop-examples</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.5</version> <configuration> <outputDirectory>${basedir}</outputDirectory> </configuration> </plugin> </plugins> </build> </project>
3.【MapReduce】MapReduce程序提交被运行时候的用户身份信息是什么?
答:
(1)默认hadoop是没有开启提交mapreduce程序的用户身份认证,因为hadoop.security.authorization=false
(2)默认hadoop的client所在用户组和用户信息会被作为参数传入到服务端与HDFS文件权限进行匹配
(3)如果client所在用户组和用户于HDFS权限体系不同,可通过HADOOP_USER_NAME或者hadoop.user.group.static.mapping.overides设置
(4)WebHdfs交互的用户权限可通过hadoop.http.staticuser.user设置
4.【MapReduce】MapReduce默认加载的配置文件的路径是啥?开发者如何知道哪些配置属性可被设置?开发者如何设置让hadoop优先加载自己使用的第三方jar
答:
(1)默认通过环境变量HADOOP_HOME和HADOOP_CONF_DIR寻找配置文件,另外一种方法是可以通过-conf指定配置文件;
(2)开发者可以通过工具类GernericOptionsParser类加载Configuration配置文件后,将所有配置信息打印出来,这是方法一;第二种方法是去hadoop官网进行查询
(3)开发者可以设置HADOOP_USER_CLASSPATH_FIRST=true和mapreduce.job.user.classpath.first=true来让hadoop优先加载用户自己的jar。
5.【MapReduce】MapReduce中Job、Task和AttemptTask的命名规则?
答:
(1)首先,所有在YARN resource manager上运行的application的命名规则是:application_${时间戳}_${第几个application程序},比如application_1410450250506_0003代表是在时间戳1410450250506经YARN resource manager运行的第0003个application程序;
(2)Job的命名规则是继承了部分application的信息,规则是:job_${时间戳}_${第几个application程序},比如job_1410450250506_0003代表是在时间戳1410450250506经YARN resource manager运行的第0003个application程序的job;
(3)Task的命令规则有继承了Job的信息,规则是:task_${时间戳}_${第几个application程序}_${m代表map,r代表reduce}_${第几个map程序或者第几个reduce程序},比如task_1410450250506_0003_m_000003代表是在时间戳1410450250506经YARN resource manager运行的第0003个application程序job里对应的第000003个map程序,task_1410450250506_0003_r_000001代表是在时间戳1410450250506经YARN resource manager运行的第0003个application程序job里对应的第000001个reduce程序;
(4)AttemptTask是在Task失败后尝试执行的任务,它的命名规则:attempt_${时间戳}_${第几个application程序}_${m代表map,r代表reduce}_${第几个map程序或者第几个reduce程序}_${尝试次数},比如attempt_1410450250506_0003_m_000003_0代表是在时间戳1410450250506经YARN resource manager运行的第0003个application程序job里对应的第000003个map程序失败后又被执行了一次
6.【MapReduce】hadoop提供哪些日志,这些日志放置在哪里?
日志类型 | 归属 | 描述 |
System daemon logs | Administrators |
(1)后台进程namenode、datanode、journalnode、zkfc、nodemanager、resoucemanager等产生的日志默认存放在HADOOP_HOME/logs,可以设置HADOOP_LOG_DIR改变日志目录位置,如果目录不存在hadoop会主动创建但必须要给权限; (2).log结尾的日志按天存储,所有日志不会主动删除,会永久保留; (3).out结尾的日志默认保留最近的5个,会自动清理过期的日志 |
HDFS audit logs | Administrators | 记录所有的HDFS的请求的日志,写入后台进程namenode的.log日志文件,默认是关闭的可以配置开启 |
MapReduce job history logs | Users | MapReduce Application Master将Job history存储在HDFS上,保留的期限是7天,存储的路径通过mapreduce.jobhistory.done-dir设置 |
MapReduce task logs | Users |
(1)记录每个Task相关程序产生的日志,日志有syslog stdout stderr3类 (2)存放的目录通过环境变量YARN_LOG_DIR设置,默认在linux机器的YARN_LOG_DIR/userlogs目录下面 |
7.【MapReduce】MapReduce的执行有哪些可供使用的工作流框架?
答:
(1)Apache oozie
(2)Apache Azkaban
8.【MapReduce】MapReduce Job如何进行优化?
答:通常的优化检查列表如下:
优化项 | 优化建议 |
map个数 |
(1)关注map个数以及每个map执行消耗的平均时间,使得每个map执行平均时间在一个合理范围(合理建议:map平均耗时在1分钟左右) (2)处理的数据文件的格式直接影响到map,不可分割的文件压缩格式比如SNAPPY使得大的数据文件无法被切分从而导致有些map消耗过多时间;合理的结构化文件比如Apache ORC可以提升hive的效率; (3)大量的小文件不仅增加寻址总耗时,也会加剧map任务量大小,建议是添加CombineFileInputFormat将小文件进行整合 |
reduce个数 |
(1)确保reduce个数大于1(合理建议:reduce平均耗时5分钟左右,每个reduce写的数据大小刚好一个blocksize=128MB大小) (2)reduce个数不能多不能少,个数的依据是尽量使得每个reduce刚好写一个blocksize=128MB大小的文件。 |
Combiners | (1)确保数据从map出来到达reduce之间使用了combiners来较少这之间shuffle传递的数据 |
Intermediate Compression |
(1)设置mapreduce中间值的压缩,减少了存储文件所需的空间,加速了网络、磁盘或磁盘的数据传输; (2)设置mapreduce.map.output.compress=true,默认mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaoultCodec,可以设置其它压缩格式比如LZO LZ4 Snappy |
Custom serialization | (1)自定义序列必须保证实现了RawComparator |
Shuffle tweaks |
(1)MapRduce的shuffle过程可以对一些内存管理的参数进行调整,以弥补性能的不足 (2)深入了解map端的shuffle过程,然后对其内存进行优化; (3)深入了解reduce端的shuffle过程,然后对其内存进行优化。 |
9.【MapReduce】MapReduce job参与实体有哪些?MapReduce job是如何运作的?
答:
(1)MapReduce job的参与实体有:
- Client 提交MapReduce job
- YARN resource manager 协调集群上计算资源的分配
- YARN node managers 负责集群中计算容器的启动和监视
- MapReduce application master 用于协调运行Map-Reduce job的MapReduce tasks。MapReduce application master和MapReduce tasks都运行在由resource manager分配由node managers管理的容器中。
- HDFS 用于job之间共享数据
(2)MapReduce job运作流程:
1)MapReduce job提交:client调用Job.submit()方法会产生一个JobSubmitter对象,然后调用JobSubmitter.submitJobInternal()提交Job,最后通过Job.waitForCompletion()每秒中获取Job的运行状态同时通知console状态变化,否则通知console错误日志信息。详细过程如下:
- 向YARN resource manager申请一个新的application ID,比如application_1410450250506_0003代表是在时间戳1410450250506经YARN resource manager运行的第0003个application程序,也即下图的“步骤2”
- 如果输出目录已经存在或者没有设置,MapReduce job不会被提交,直接产生一个错误通知client
- 如果输入数据不存在,那就无法计算输入数据切分信息,MapReduce job不会被提交,直接产生一个错误通知client
- 将MapReduce job相关的jar(默认在HDFS中的备份数mapreduce.client.submit.file.replication=10,以便nodemanager上的task共享MapReduce job的jar信息)、MapReduce job相关配置文件、输入数据切分信息拷贝到分布式文件系统HDFS上进行共享,也即下图的“步骤3”
- 调用submitApplicatioon()发送请求到YARN resource manager,也即下图中的“步骤4”
2)MapReduce job初始化
- YARN resource manager收到submitApplicatioon请求,传递给YARN scheduler获取一个YARN node manager下的container容器,然后YARN resource manager在这个容器containner里启动MapReduce application master,也即下图的“步骤5a和步骤5b”
- MapReduce application master对应的java主函数所在类class为MRAppMaster,MRAppMaster创建一系列跟踪对象来跟踪MapReduce job的工作进度,也即下图的“步骤6”
- MRAppMaster从分布式文件系统HDFS上获取输入数据切分信息,也即下图的“步骤7”,根据输入数据切分信息针对每个切分的block创建一个对应的map task(输入数据切分信息个数决定map task个数),根据方法setMumReduceTasks()设置的值或者mapreduce.job.reduces设置的值,创建对应个数的reduce task
- MRAppMaster判定MapReduce job是否为小任务【map的个数<mapreduce.job.ubertask.maxmaps的默认值10 && reduce个数为mapreduce.job.ubertask.maxreduces的默认值1 && 输入数据大小<blocksize的mapreduce.job.ubertask.maxbytes默认128MB,默认mapreduce.job.ubertask.enable=false,默认没有开启小任务,要开启必须设置为true】,在开启了小任务并且判定为小任务,那么MapReduce job直接在运行MapReduce application master的JVM对应的containner里运行,如果未开启小任务判断或者判定为非小任务,那么就行后续task分配
3)TASK分配
- 如果未开启小任务判断或者判定为非小任务,那么就行后续task分配,MapReduce application master向YARN resource manager请求执行map task或者reduce task任务所需分布在YARN node manager下的containner,也即下图的“步骤8”
- map task的优先级高于reduce task,map task最先被分配执行,sort task和shuffletask必须等待所有map task执行完毕后才可以开始,reduce task默认在5%的map task执行完毕后便开始启动执行。reduce task可以运行于任意节点之上,而map task则会尽量安排在block所在节点运行或者block所在rack 上运行,最后也有可能安排在其他rack运行。
- 上面的map task、sort task、shuffle task、reduce task的请求里也包含这对于CPU 和 MEMORY的特殊配置请求(默认mapreduce.map.memory.mb=1G,mapreduce.map.cpu.vcores=1 默认mapreduce.reduce.memory.mb=1G,mapreduce.reduce.cpu.vcores=1)
4)TASK运行
- 一旦TASK被YARN resource manager分配在某个YARN node manager下的containner执行,MapReduce application master便和这个YARN node manager取得联系并启动一个containner运行一个主类class为YarnChild(YarnChild运行在独立的JVM,并不是和TASK运行在同一个JVM,这样TASK的运行不会影响到YarnChild)的主程序,也即下图的“步骤9a和9b”
- YarnChild在运行TASK之前,先从分布式文件系统HDFS获取client提交的MapReduce job相关jar同步到本地、获取client提交的配置信息,也即下图中的“步骤10”
- 最后YarnChild开始运行TASK,也即下图中的“步骤11”
5)进度和状态更新
- map task通过输入数据已经处理的比例来定期更新其状态;
- reduce task比较复杂,reduce task首先将数据的处理分为三个阶段(COPY阶段占比1/3,SORT阶段占比1/3,REDUCE阶段占比1/3),完成了COPY阶段进度就是1/3,完成了SORT阶段进度就是2/3,完成了一半的REDUCE阶段进度就是1/3+1/3+1/3*1/2=5/6
- TASK还有一系列的计数器用来跟踪处理进度
- TASK运行的过程中,YarnChild时刻保持与MapReduce application master的联系,每隔3秒向MapReduce application master汇报该MapReduce job的TASK的进度和状态,YARN resource manager的WEB管理端通过链接到各个运行的MapReduce application master来展现它们负责的目前正在运行的MapReduce job的进度和状态。
- 在MapReduce job运行期间,client每秒(可设置mapreduce.client.progressmonitor.pollinterval来调整时间间隔)向MapReduce application master轮询MapReduce job的状态,client可以主动调用getStatus()来获取MapReduce job的状态信息,获取流程如下图所示:
6)MapReduce job完成
- 当MapReduce application master收到最后一个TASK完成的通知后,MapReduce application master负责将MapReduce job的status调整为successful,这样当client每秒向MapReduce application master轮询MapReduce job的状态就可以知道job已经完成,就会主动退出waitForCompletion(),然后在控制台console打印job统计信息和相关计数器信息
- 如果设置了mapreduce.job.end-notification.url,那么MapReduce application master在得知MapReduce job完成后,会将MapReduce job的信息(统计信息、计数器信息)通知到这个地址,这给了我们一个提示,我们是不是可以设置一个汇聚MapReduce job处理完成的管理中心
- MapReduce application master和相关容器containner清理各自状态(中间临时数据目录此刻会被删除)
- OutputCommitter.commitJob()会被调用,运行完成的MapReduce job信息会被存档到historyserver上一遍client所在用户进行信息查询追溯
10.【MapReduce】MapReduce job对于失败是如何处理的?如何进行重试?哪些参数与失败尝试相关?
答:
(1)MapReduce job失败相关处理:
- 第一种失败是由于MapReduce job里MapReduce tasks的用户代码导致的失败,MapReduce task所在JVM上报错误信息给MapReduce application master,错误信息会写入用户日志,MapReduce application master获知错误信息后尝试启动MapReduce AttemptTask重试,同时释放之前失败JVM所在containner资源以便可以执行其它MapReduce task
- 第二种失败是由于MapReduce tasks所在JVM的BUG导致的失败,此时YARN node manager发现该JVM进程退出后会通知MapReduce application master,MapReduce application master获知错误信息后尝试启动MapReduce AttemptTask重试
- 第三种失败是由于MapReduce task一直处于假死状态,MapReduce application master在有效的时间(mapreduce.task.timeout=6000000毫秒,也即10分钟,实际中不建议设置为0,0代表永远不过期,那么最终假死的MapReduce task会越来越多,最终使得集群被撑死)未收到进度更新,便标记该MapReduce task为失败
- 当MapReduce application master获知MapReduce task失败后,会启动MapReduce AttemptTask重试,并且尽量不安排在之前出问题的YARN node manager上执行MapReduce AttemptTask。总之,任何MapReduce task的最大重试次数为4次(可设置mapreduce.map.maxattempts更改默认的最大的map task尝试次数4次,可设置mapreduce.reduce.maxattempts更改默认的最大reduce task尝试次数4次),超过4次,不会再次尝试执行MapReduce AttemptTask,任何MapReduce job的MapReduce tasks尝试总次数不能大于4次,否则视MapReduce job失败
- 对于MapReduce job,在某些场合只要MapReduce tasks失败的比例控制在一定范围,仍然认为该MapReduce job是可信可用的,这时候可以通过参数mapreduce.map.failures.maxpercent和mapreduce.reduce.failures.maxpercent分别设置最大的允许map task和reduce task失败的比例
- MapReduce AttemptTask也可能存在失败的情况,比如在推测任务Speculative Execution(MapReduce job的MapReduce tasks有快有慢,针对那些假死或者耗时的task,会启动一个和该TASK一样的另外一个推测任务MapReduce task并行执行,当推测TASK在原TASK执行完之前完成那么原TASK被中止,当推测TASK在原TASK执行完之后完成那么推测TASK被中止;推测TASK只会在MapReduce job的所有MapReduce tasks都启动后才会开始被启动)被中止或者由于YARN node manager错误导致该YARN node manager上所有MapReduce tasks被标记为失败,此时的MapReduce AttemptTask失败不会导致之前map task或者reduce task的最大尝试次数计数器的累加,毕竟不是MapReduce AttemptTask的过错导致的问题。
(2)MapReduce application master的失败相关处理:
- MapReduce application master的最大失败尝试次数通过mapreduce.am.max-attempts(默认值为2)进行设置,大于2次则MapReduce job失败。YARN通过yarn.resourcemanager.am.max-attempts(默认值为2)设置了一个阈值对所有的MapReduce application master有效,而mapreduce.am.max-attempts的值必须小于此值,所以如果想增大mapreduce.am.max-attempts的值,必须先增大yarn.resourcemanager.am.max-attempts的值
- YARN resource manager未收到未收到MapReduce application master的定期心跳包判定MapReduce application master出错,YARN resource manager会在一个新的containner容器中启动新的MapReduce application master,通过job history来恢复(默认yarn.app.mapreduce.am.job.recovery.enable=true开启了恢复)在旧的MapReduce application master上运行的MapReduce tasks的状态
- client在MapReduce job初始化的时候将YARN resource manager反馈的MapReduce application master地址缓存在了本地,当MapReduce application master出错后,client发现一定期限内没有状态更新,就会向YARN resource manager请求新的MapReduce application master的地址
(3)YARN node manager的失败相关处理:
- YARN resource manager在有效的时间(yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms=600000毫秒)未收到YARN node manager的心跳包则视YARN node manager出错,此时运行于此YARN node manager上的MapReduce application master和MapReduce tasks按照之前的流程进行恢复或者重试。
- YARN node manager出错导致的不完整MapReduce job(所有map task已经执行完毕,但是map task的中间结果已经不可被reduce task读取),那么该MapReduce job对应的所有map task会被重新执行。
- 不管YARN node manager是否出错,只要YARN node manager上MapReduce application master出错的次数如果大于mapreduce.job.maxtaskfailures.per.tracker(默认值为3),则YARN node manager会被MapReduce application master列入黑名单,被列入黑名单后,MapReduce application master会尽可能让MapReduce tasks在其它YARN node manager上运行
(4)YARN resource manager的失败相关处理:
- YARN resource manager的失败是当前最严重的威胁,解决的最好办法是启动多个YARN resource manager,其中只有一个处于active状态,其他都处于standby状态;
- MapReduce job的运行信息都被存储在高可用的状态存储器(比如HDFS ZOOKEEPER).1)处于standby状态的YARN resource manager可以在active的YARN resource manager出错后马上从高可用的状态存储器(比如HDFS ZOOKEEPER)进行恢复。2)YARN node manager的信息并没有存储到高可用的状态存储器(比如HDFS ZOOKEEPER),因为新的active YARN resource manager可以从YARN node manager的心跳包中重新建立联系.3)MapReduce job的MapReduce tasks的状态只被对应的MapReduce application master管理,虽然没存储在高可用的状态存储器(比如HDFS ZOOKEEPER),但也不会丢失,先对mapreduce1是一大进步。
- 当新的YARN resource manager被active后,会从高可用的状态存储器(比如HDFS ZOOKEEPER)读取信息并重新启动所有的MapReduce application master,但是不会把计数器yarn.resourcemanager.am.max-attempts(默认为2)的值进行增1,因为毕竟不是MapReduce application master自己出问题。
- 我们知道HDFS的HA的高可用切换是有一个后台进程ZKFC进行控制,而这里YARN resource manager的HA的控制则并不是一个单独的后台进程控制,而是被嵌入到每一个YARN resource manager,用zookeeper的选举来维持永远只有一个YARN resource manager处于active状态。
- 默认yarn.resourcemanager.recovery.enabled=false并未启用YARN resource manager失败后的恢复功能
- 默认yarn.resourcemanager.store.class=org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore意思是YARN resource manager将状态信息存储到${hadoop.tmp.dir}/yarn/system/rmstore中,而这个目录是linux本地,在YARN resource manager恢复时候肯定是不可取的,一定要设置到HDFS打头的目录,比如:hdfs://localhost:9000/rmstore,这样才可以从HDFS高可用恢复数据
- 设置yarn.resourcemanager.store.class=org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore可将状态信息存储到zookeeper,此时通过yarn.resourcemanager.zk-state-store.parent-path=/rmstore设置zookeeper中数据存储目录,这样才可以从zookeeper中高可用恢复数据
11.【MapReduce】MapReduce中shuffle是什么?shuffle和sort的具体流程?从map task端shuffle和sort流程?reduce task端shuffle和sort流程?如何优化MapReduce job?
答:MapReduce会确保从map task产生的结果传输到reduce task时候数据都已经进行了排序处理,从map task到reduce task中间对于数据的排序处理和传输处理就是MapReduce的shuffle;
map task和reduce task是的执行可能在不同的机器节点上YARN node manager写下的containner,所以要了解shuffle和sort的具体流程需要分别站在map task和reduce task的角度去看待;
(1)map task端的shuffle处理
1)每个map task持有一个环形的内存缓冲区buffer用于存储map task输出。缓冲区默认大小为100M(可以通过mapreduce.task.io.sort.mb设置)。一旦缓冲的内容达到阈值(mapre
duce.map.sort.spill.percent,默认0.80),会开启一个后台线程,将buffer内容spill(溢出)到本地linux磁盘。map task的输出继续写到缓冲区buffer,但如果在此期间缓冲区buffer被填满,map task会被阻塞直到写磁盘过程完成。溢出写过程按轮询方式将缓冲区内容写到mapreduce.cluster.local.dir属性指定的linux目录中
2)map task的buffer数据在写入linux磁盘前,那个开启的后台线程首先根据最终要输出的reducer将数据分区。在每个分区中,后台线程会在内存中按key执行排序,如果client设置了combiner,它就在排序后的输出上运行。运行combine函数会使map输出更加紧凑,减少写到磁盘的数据和传递给reducer的数据
3)map task的buffer每次达到阈值后,都会新建一个spill文件。因此在map写完其最后一个输出记录后,会有几个溢出文件。在task完成之前,spil文件被合并成一个已分区且已排序的输出文件。mapreduce.task.io.sort.factor属性控制着一次最多能合并多少流,默认值为10
4)如果至少存在3个spill文件(mapreduce.map.combine.minspills可以配置),这output写入到磁盘之前会再次运行combiner。反复调用combiner不会对最终结果产生影响。如果只有一个或者两个spill 文件,那么就不值得再对map输出调用combiner,所以不会再运行combiner。
5)建议将map输出结果压缩后再写到磁盘,这样写入磁盘会更快,节约磁盘空间,并且减少传给reducer的数据量。在默认情况下,输出是不压缩的,需要将mapreduce.map.output.compress设置未true。使用的压缩库由mapreduce.map.output.compress.codec属性指定
6)reduce task通过HTTP方式得到map task输出文件的分区,文件分区的工作线程的数量是由属性mapre
duce.shuffle.max.threads(默认值为0,此时默认是处理器数量的2倍,这个应该是继承自netty的reactor线程池设置)控制的,此设置是针对每一个YARN nodemanager,不是针对每个map task
(2)reduce task端的shuffle处理
1)map task的输出文件都存储在linux上mapreduce.cluster.local.dir目录中,一个reduce task需要的是来自多个map task输出文件中的特殊parttion数据。每个map task的完成时间可能不同,因此只要有一个map task完成,reduce task就开始复制map task输出,reduce task有少量的copier线程来并行的取得map task的输出,默认为5个线程,并且可以通过mapreduce.reduce.shuffle.parallelcopies属性设置
2)reduce task如何知道要从哪台机器取得map输出的呢? map task成功完成后,它会通过heartbeat机制通知MapReduce application master。因此,对于一个MapReduce job,MapReduce application master知道 map output和hosts之间的映射。reduce task会有一个线程轮询MapReduce application master获取map task输出的位置,直至获取所有输出的位置
3)如果map task的输出足够小,会被拷贝到reduce task的JVM内存中(mapreduce.reduce.shuffle.input.buffer.percent设置用于此用途的内存占有堆空间JVM的百分比,默认为0.70,意思是JVM*0.70),否则,将会被复制到磁盘。一旦达到缓冲区大小的阈值时(mapreduce.reduce.shuffle.merge.percent,默认值0.66,此时阈值为JVM*0.70*0.66)或者达到map task输出的阈值(mapreduce.reduce.merge.inmem.threshold,意思是拷贝来自map task的输出文件个数大于1000则合并写到磁盘),则合并后spill到磁盘。如果指定combiner,则在合并期间运行它,降低写入硬盘的数据量
4)当从map task复制而来的副本在reduce task的本地linux上累积时,一个后台线程被启动来合并这些文件,如果之前map task的输出被压缩过此时会被解压然后在内存中进行合并。
5)复制完所有的map task输出后,reduce task进入sort阶段(准确的说是merge阶段,sort是在map端发生的),这个阶段维持map输出的顺序,合并map task输出,最后直接将结果写入HDFS,HDFS的第一个block一般会优先写入nodemanager所在机器上的datanode。举个例子:如果有50个map outputs,merge factor是10(mapreduce.task.io.sort.factor可以设置大小),合并将进行5次,每10个文件合并成一个文件,最后有5个中间文件。 最后将这个5个文件合并未一个文件是调用的reduce函数。
(3)熟悉了MapReduce job的map task和reduce task各自的shuffle之后,如何优化MapReduce job?
1)优化建议如下:
- 总的原则:尽可能多的分配内存给shuffle;
- 第二点是优化MapReduce task所在JVM内存,默认不管map task和reduce task,它们的JVM内存都是通过mapred.child.java.opts设置;
- 在map task里面,尽可能的优化mapreduce.task.io.sort.*属性来避免map task端spill数据到linux磁盘,比如增大mapreduce.task.io.sort.mb的设置,实战中可以通过观察hadoop的计数器SPILLED_RECORDS来调整,这个也用于reduce task;
- 在reduce task里面,尽可能的让reduce task的中间结果数据存储于内存中。比如对于内存需求低的reduce task,可以设置mapreduce.reduce.input.buffer.percent=1.0来尽可能让数据处于内存中,设置mapreduce.reduce.merge.inmem.threshold=0避免从map task输出文件个数大于1000时写磁盘文件。2008年4月,hadoop的基准测试在900个节点上运行1TB排序测试集仅需209秒,成为当时世界最快,其中一项优化就是尽可能的使得reduce task的中间结果数据存储于内存中;
- 默认hadoop访问文件的IO缓存大小io.file.buffer.size=4K,这个建议增大到128K。
2)从map task端优化MapReduce job
属性 | 类型 | 默认值 | 描述 |
mapreduce.task.io.sort.mb | int | 100 | 每个map task持有的一个环形内存缓冲区大小 |
mapreduce.map.sort.spill.percent | float | 0.80 | 当每个map task持有的一个环形内存缓冲区使用比例大于此值就spill写linux磁盘文件 |
mapreduce.task.io.sort.factor | int | 10 | 10个spill的linux本地文件会进行一次合并 |
mapreduce.map.combine.minspills | int | 3 | map task最后的输出文件个数必须小于此值,不然就触发合并 |
mapreduce.map.output.compress | boolean | false | map task的输出的压缩是否开启 |
mapreduce.map.output.compress.codec | Classname | org.apache.hadoop.io.compress.DefaultCodec | map task输出的压缩格式设置 |
mapreduce.shuffle.max.threads | int | 0 | 每一个nodemanager上开启多少个线程来将map task输出数据传输给reduce task,0表示处理器数量的2倍,此属性继承自netty设置 |
3)从reduce task端优化MapReduce job
属性 | 类型 | 默认值 | 描述 |
mapreduce.reduce.shuffle.parallelcopies | int | 5 | reduce task的COPY阶段,用来复制map task数据的线程数 |
mapreduce.reduce.shuffle.maxfetchfailures | int | 10 | reduce task尝试从map task获取数据失败后的最大尝试次数,尝试次数大于此值后直接跑错 |
mapreduce.task.io.sort.factor | int | 10 | 10个spill的linux本地文件会进行一次合并 |
mapreduce.reduce.shuffle.input.buffer.percent | float | 0.70 | 拷贝到reduce task的数据存储到内存缓冲区,该内存缓冲区大小为reduce task的JVM * 0.70 |
mapreduce.reduce.shuffle.merge.percent | float | 0.66 | 拷贝到reduce task的数据存储到内存缓冲区,该缓冲区数据达到一定比例就写文件到linux磁盘,该比例对应内存阈值为为reduce task的JVM * 0.70 *0.66 |
12.【MapReduce】MapReduce job的推测任务(speculative execution)?
答:(1)MapReduce job的MapReduce tasks有快有慢,针对那些假死或挂起或者耗时的task,会启动一个和该TASK一样的另外一个推测任务MapReduce task并行执行,当推测TASK在原TASK执行完之前完成那么原TASK被中止,当推测TASK在原TASK执行完之后完成那么推测TASK被中止;推测TASK只会在MapReduce job的所有MapReduce tasks都启动后才会开始被启动,这种任务就是推测任务(speculative execution);
(2)推测任务的相关设置属性如下:
属性 | 类型 | 默认值 | 描述 |
mapreduce.map.speculative | boolean | true | 是否开启针对map task的推测任务 |
mapreduce.reduce.speculative | boolean | true | 是否开启针对reduce task的推测任务 |
yarn.app.mapreduce.am.job.speculator.class | Class | org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator | 推测任务执行策略 |
yarn.app.mapreduce.am.job.task.estimator.class | Class | org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator | 推测任务使用的评估策略 |
(3)建议关闭推测任务(speculative execution),理由如下:
- 推测任务(speculative execution)好处是减少MapReduce job执行时间,但是会带来性能开销,特别是在一个本来就非常忙的集群上;
- 对于reduce task推测任务(speculative execution)会让更多的reduce task去获取map task的输出,增加网络开销;
- 对于non-idempotent task更应该关闭推测任务(speculative execution)
13.【MapReduce】MapReduce job的FileInputFormat默认的InputPathFilter排除了哪些文件?如何设置自己的InputPathFilter过滤文件?如何设置级联读取所给目录子目录下文件数据?
答:FileInputFormat默认自己有InputPathFilter,这个InputPathFilter会将所给输入数据目录下所有以(点号.和下划线_打头的隐藏文件进行排除,放置参与数据处理计算);
client通过setInputPathFilter()添加自己的InputPathFilter,这个InputPathFilter不会覆盖默认的InputPathFilter,只会基于其上做叠加;
默认hadoop MapReduce job的FileInputFormat不会级联读取所给目录下子目录中的数据文件,相反会把子目录当作数据文件进行读取,此时会直接向外抛出error;如果想让其级联读取目录下子目录的数据文件需要设置mapreduce.input.fileinputformat.input.dir.recursive=true
14.【MapReduce】MapReduce的一些其他有用特性
答:(1)计数器:计数器可以辅助MapReduce job的相关统计信息;计数器可以用来诊断MapReduce job出现的问题;计数器相对于通过日志去排查MapReduce job更容易。
计数器分类:
- hadoop自带的计数器
计数器类型 | 计数器实现类 |
MapReduce task counters | org.apache.hadoop.mapreduce.TaskCounter |
Filesystem counters | org.apache.hadoop.mapreduce.FileSystemCounter |
FileInputFormat counters | org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter |
FileOutputFormat counters | org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter |
Job counters | org.apache.hadoop.mapreduce.JobCounter |
- 用户定义的java计数器
- 用户定义的非java计数器(Streaming计数器)
(2)排序sort
- 部分排序:MapReduce Job使用输入数据的key的数据进行排序,排序的规则是:1)首先采用mapreduce.job.output.key.comparator.class设置的类;2)如果mapreduce.job.output.key.comparator.class没设置,则输入数据的key类型必须是WritableComparable的子类,找该子类对应的comparator;3)如果找不到comparator,就使用RawComparator
- 全排序:第一种实现是设置只产生一个结果文件,第二种是分别产生多个分区parrtion排序文件然后串联起来
- 分组排序(次要排序或者第二排序):默认MapReduce Job已经按照key排序,如果需要多个条件排序,一般流程如下:1)定义包括自然键和自然值的组合键2)根据组合键对记录进行排序,即同时用自然键和自然值进行排序3)针对组合键进行分区和分组时均只考虑自然键
(3)链接join
- map task端的连接join:两个大规模输入数据集之间的map端连接会在数据达到map函数之前就执行链接join操作,此时要求各个map的输入数据必须已经按照相同的key先分区并且以该key特定方式排序。适用于多个Mapreduce job的输出进行连接join,只要这些Mapreduce job的reduce数量相同和key相同并且输出文件是不可切分的。
- reduce task端的链接join:reduce端join比map端join常用,但是毕竟reduce端的join数据都要经过shuffle,效率比map端join低。
(4)分布式缓存(类似于spark的广播变量)
- Mapreduce job配置Configuration共享:通过Configuration以key-value形式共享数据
- 分布式缓存:使用辅助类GenericOptionsParser通过-files将文件在HDFS进行共享
五.hadoop cluster相关问题
1.【hadoop cluster】hadoop的版本有哪些?各自优缺点?
答:
版本 | 安装包 | 优点 | 缺点 |
Apache hadoop | Tar | 扩展性好,更灵活,开源免费 |
工作量大, 没有完整的管理和监控功能 |
Apache Bigtop | RPM |
Debian提供各种子项目的整合和测试, 比如整合hive |
扩展不灵活受制于人 |
Cloudera hadoop | Tar、RPM |
具备安装、管理和监控功能, 有开源版本和商用版本, 目前开源版本有使用限制 |
扩展不灵活受制于人, 需要root或者sudo权限, 一家独大时商业纠纷问题 |
Hortonworks hadoop | Tar、RPM |
具备安装、管理和监控功能, 有开源版本和商用版本, 目前开源版本有使用限制 |
扩展不灵活受制于人, 需要root或者sudo权限, 一家独大时商业纠纷问题 |
Ambari hadoop | Tar、RPM |
具备安装、管理和监控功能, 开源免费 |
扩展不灵活受制于人, 需要root或者sudo权限 |
2.【hadoop cluster】企业hadoop cluster集群配置要求?企业如何去评估集群硬件需求?
答:(1)hadoop被设计用来在商业硬件上运行,企业可以选择普通硬件供应商生产的标准化的、广泛有效的硬件来搭建集群,这里的商业硬件注意两个原则:一是商业硬件不等同于低端硬件,低端硬件故障概率高;二是商业硬件也不推荐使用大型的数据库级别的机器,因为这类机器性价比太低。
对于商业硬件的配置推荐如下(基于2014年的硬件市场和配置推荐):
- 处理器:2 * 6/8核CPU,主频3GHZ
- 内存 :64-512G
- 磁盘 :12-24 * 1-4TB SATA磁盘,不建议RAID(只会降低速度,备份对hadoop毫无意义)
- 网络 :具有链路聚合的千兆以太网
推荐的hadoop cluster网络拓扑结构如下:
- 使用之前的HDFS网络距离算法计算最优距离;
- 默认hadoop是没有机架感知能力,需要设置net.topology.node.switch.mapping.impl和net.topology.script.file.name来启用机架感知
(2)企业如何估算hadoop cluster硬件服务器需求量:
3.【hadoop cluster】hadoop cluster安装步骤和建议?
答:详见hadoop2.7.1安装准备 和 1.x和2.x都支持的集群安装
(1)安装JDK(详查官网每个版本对于JDK的要求)
(2)创建用于管理的linux用户(建议分别创建linux用户hdfs mapred yarn)
(3)安装hadoop(下载tar.gz文件直接解压,安装目录建议为/usr/local,其次可以安装在/opt)
(4)配置SSH(方便从一个节点SSH登录其他节点从而实现集群启动和停止)
(5)修改hadoop配置文件
(6)格式化HDFS文件系统
(7)启动和关闭hadoop(可以设置hadoop-env.sh中HADOOP_SLAVES来改变默认slaves文件位置)
(8)在HDFS上创建用户目录
#在HDFS上创建文件目录/user/username并授权给username:username hadoop fs -mkdir /user/username hadoop fs -chown username:username /user/username #设置HDFS文件目录/user/username的最大使用磁盘空间 hdfs dfsadmin -setSpaceQuota 1t /user/username
4.【hadoop cluster】hadoop cluster有哪些配置文件?如何使用工具同步配置?hadoop的环境初始化配置?其它配置信息?
答:(1)hadoop的配置文件列表如下:
(2)在CDH和Ambari中,默认使用dsh和pdsh来管理进群配置;集群节点个性化或者差异化配置工具参考Chef, Puppet, CFEngine, and Bcfg2
(3)hadoop的环境变量初始化脚本主要为hadoop-env.sh和yarn-env.sh,一些特别的设置建议如下:
- java:请设置JAVA_HOME,因为在hadoop-env.sh和yarn-env.sh都需要此值,也可设置在hadoop-env.sh和yarn-env.sh里面;
- 堆内存:默认hadoop-env.sh里HADOOP_HEAPSIZE=1G来设置后台进程namenode datanode journalnode resourcemanager nodemanager进程各自内存,也可单独设置,比如设置yarn-env.sh里的YARN_RESOURCEMANAGER_HEAPSIZE来单独设置resourcemanager内存,其它以此类推;namenode内存计算公式:datanode节点个数*单datanode上最大磁盘空间/(block大小*副本replication数),比如在一个200个datanode集群上,单datanode磁盘空间为24TB,block=128MB上需要配置的namenode堆内存为200 ×24*1024*1024⁄ (128 MB × 3)=12.5G
- 系统日志:默认日志目录为$HADOOP_HOME/logs,该日志目录不存在的话hadoop在启动时会主动创建,如果创建linux文件目录权限不够直接跑错;$HADOOP_HOME/logs/*.log的日志文件默认永久保留不会自动删除清理,$HADOOP_HOME/logs/*.out的日志文件只保留最近的5个会自动删除清理过期日志文件;默认在hadoop-env.sh里设置了export HADOOP_IDENT_STRING=$USER,使得日志文件名字里包含了该节点启动的用户的名字,可以修改HADOOP_IDENT_STRING来做个性化设置。
- ssh:可以在hadoop-env.sh里设置HADOOP_SSH_OPTS将ssh信息传入hadoop
(4)后台进程相关重要设置
<?xml version="1.0"?> <!-- core-site.xml --> <configuration> <property> <name>fs.defaultFS</name> <value>hdfs://namenode/</value> </property> </configuration>
<?xml version="1.0"?> <!-- hdfs-site.xml --> <configuration> <property> <!--设置多个,每个目录是一份完整数据的CPPY--> <name>dfs.namenode.name.dir</name> <value>/disk1/hdfs/name,/remote/hdfs/name</value> </property> <property> <!--设置多个,每个目录是一份完整数据的一部分--> <name>dfs.datanode.data.dir</name> <value>/disk1/hdfs/data,/disk2/hdfs/data</value> </property> <property> <!--设置多个,每个目录是一份完整数据的CPPY--> <name>dfs.namenode.checkpoint.dir</name> <value>/disk1/hdfs/namesecondary,/disk2/hdfs/namesecondary</value> </property> </configuration>
<?xml version="1.0"?> <!-- yarn-site.xml --> <configuration> <property> <name>yarn.resourcemanager.hostname</name> <value>resourcemanager</value> </property> <property> <!--nodenamager允许containner存储中间结果的本地linux目录,这个参数通常会配置多个目录,已分摊磁盘IO负载--> <name>yarn.nodemanager.local-dirs</name> <value>/disk1/nm-local-dir,/disk2/nm-local-dir</value> </property> <property> <!--在YARN中,此值只能为mapreduce_shuffle--> <name>yarn.nodemanager.aux-services</name> <value>mapreduce.shuffle</value> </property> <property> <!--默认nodemanager能分配总内存为8G,这里提升到16G--> <name>yarn.nodemanager.resource.memory-mb</name> <value>16384</value> </property> <property> <!--默认nodemanager能分配总CPU CORE为8,这里提升到16--> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>16</value> </property> </configuration>
在mapreduce1中,map个数和reduce的个数是固定的,默认2个map2个reduce,所以计算tasktracker节点上内存为:1G(datanode内存)+1G(tasktracker内存)+2(map个数)*200MB(map task JVM)+2(reduce个数)*200MB(reduce task JVM)
在mapreduce2也即YARN中,map和reduce个数不再固定,只默认设置了map和reduce能申请的总的资源(8G内存,8CPU CORE),所以一般在nodemanager节点上,内存开销为:1G(datanode内存)+1G(nodemanager内存)+x(map个数)*1Gmap task JVM)+y(reduce个数)*1G(reduce task JVM),但总量x(map个数)*1Gmap task JVM)+y(reduce个数)*1G(reduce task JVM)不会超过默认的8G;YARN支持JVM虚拟内存设置,设置的虚拟内存不超过2.1倍JVM内存,默认yarn.nodemanager.vmem-pmem-ratio=2.1;YARN通过yarn.nodemanager.resource.cpuvcores、mapreduce.map.cpu.vcores、mapreduce.reduce.cpu.vcores来控制对于CPU的使用;可以设置yarn.nodemanager.containerexecutor.class和yarn.nodemanager.linux-container-executor来让YARN启用linux的cgroups资源隔离。
hadoop默认使用了一批端口号,这里详见官网。
(5)其他设置
通过dfs.hosts、dfs.hosts.exclude、yarn.resourcemanager.nodes.include-path和yarn.resourcemanager.nodes.exclude-path来上线和下线节点
默认hadoop访问文件的IO缓存大小io.file.buffer.size=4K,这个建议增大到128K
blocksize=128MB,不建议修改,很多地方依赖次配置
设置dfs.datanode.du.reserved预留磁盘空间供非HDFS程序或进程使用
设置fs.trash.interval启用回收站以免HDFS文件误删除后可以恢复
YARN调度器在Apache hadoop中默认为容量调度,而在CDH默认为公平调度,对于Apache hadoop建议优化为公平调度
默认reduce task在5%的map task完成后即开始启动,对于数据量大的任务建议设置mapreduce.job.reduce.slowstart.completedmaps进行优化
建议如果client和数据block在同一节点,建议开启short-circuit,通过设置dfs.client.read.shortcircuit=true和dfs.domain.socket.path开启
5.【hadoop cluster】hadoop cluster如何开启安全控制?
答:2009年,雅虎把Kerberos组件运用到了Hadoop之中,在RPC连接等多个组件上进行认证。时至今日,Kerberos依然是hadoop使用比较广泛的安全机制之一。
kerberos认证的过程:
- 认证:client向authentication服务器发送一条报文,或获取一个含时间戳的票据(Ticket-Granting Ticket ,TGT,默认有效期10小时)
- 授权:client使用TGT向ticket granting服务器请求一个服务票据
- 服务请求:client向最终的服务提供服务器(HDFS等)初十服务票据,以证实自己的合法性,然后最终的服务提供服务器(HDFS等)想client提供服务
hadoop中开启kerberos认证:
- hadoop.security.authentication=kerberos,默认值为simple,simple表示hadoop只通过linux组和用户进行权限判断,kerberos表示启用kerberos权限控制;
- hadoop.security.authorization=true来启用服务级别的授权,通过hadoop-policy.xml设置的linux组和用户来进行ACL的控制
hadoop的委托令牌:
- hadoop使用委托令牌来支持后续认证访问,避免了多次请求KDC(kerbers key distribution center)
- HDFS的委托令牌认证是通过设置dfs.block.access.token.enable=true来开启;
- mapreduce的委托令牌认证因为其提交的mapreduce job的jar和配置都存储在HDFS上进行共享,所以也是设置dfs.block.access.token.enable=true来开启;
其它的安全加强机制:
- 设置yarn.nodemanager.containerexecutor.class=org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor和yarn.nodemanager.linux-container-executor来启用轻量级目录访问(Lightweight Directory Access Protocol,LDAP)和基于linux cgroups的资源隔离;
- 任务有提交作业的用户启动时,建议把所有用户均可读的文件放到共享缓存中,把私有文件放在私有缓存中;
- 设置mapreduce.cluster.acls.enabled=true,mapreduce.job.acl-view-job=逗号分隔用户名, mapreduce.job.acl-modify-job=逗号分隔用户名来控制用户只能查看和修改自己的作业;
- shuffle默认阻止恶意用户请求获取其他用户的map输出;
- 为了防止恶意的辅助namenode datanode resourcemanager nodemanager加入集群跑环集群数据,建议设置master节点对视图与之连接的守护进程进行认证。比如一个datanode要加入集群,那么步骤是:1)通过ktutil生成keytab;2)设置dfs.datanode.keytab.file指向生成的keytab,设置dfs.datanode.kerberos.principal=usernname来指定使用的datanode账户;3)最后datanode与namenode交互的linux账户必须已经在hadoop-policy.xml中已经设置;
- datanode最好运行在特定端口(端口号小于24),使客户端确信它是安全的;
- mapreduce task只与其对应的mapreduce application master通信;
- 所有的网络数据传输采用加密,针对RPC设置hadoop.rpc.protection,针对HDFS设置dfs.encrypt.data.transfer,针对mapreduce shuffle设置mapreduce.shuffle.ssl.enabled,针对WEB UI接口设置hadokop.ssl.enabled
6.【hadoop cluster】hadoop cluster提供的基准测试工具有哪些?
答:啊红斗篷自带若干基准测试工具,这些工具放置在hadoop-*-test.jar的文件中,安装开销小,运行方便。基准测试工具很多无需传递额外参数,基准测试工具有利于快速检查搭建的hadoop cluster的性能状况,以供下一步的性能调优或者集群扩容甚至问题发现。
基准测试工具 | 用途 | 特点 |
TeraSort | 测试数据排序能力 |
1TB排序通常用于衡量分布式数据处 理框架的数据处理能力。Terasort是 Hadoop中的的一个排序作业,在2008年, Hadoop在1TB排序基准评估中赢得 第一名,耗时209秒。 |
TestDFSIO | 测试HDFS的I/0性能 | 本质为MapReduce |
MRBench | 测试小型作业是否快速响应 | 多次运行一个小型作业 |
NNBench | 测试namenode的加载过程 | |
Gridmix | 测试集群负载情况 |
是一个基准测试程序集合, 通过模拟真实场景数据来 逼真的为一个集群负载建模。 |
SWIM (Statistical Workload Injector for MapReduce) |
测试mapreduce负载情况 |
是一个针对mapreduce 负载的测试程序 |
TPCx-HS |
基于TeraSort和事务处理TPPC (Transaction Processing Performance Council) |
推荐阅读
-
学习强国app本地频道积分怎么获取?
-
Jsp学习笔记(4)——分页查询
-
ifunk笔记本怎么样?iFunk翼超极本体验评测
-
笔记本键盘没有大小写指示灯怎么判断大小写状态?
-
联想发布ThinkPad T560笔记本 售价5916元
-
联想Yoga720笔记本怎么样?联想笔记本yoga720配置介绍
-
昂达小马41值得买吗?昂达小马41笔记本全面深度体验评测图解
-
联想小新Air Pro值得买吗?联想小新Air Pro鹿晗特别版笔记本全面详细评测图解
-
联想小新Air 12值得买吗?联想小新Air 12云笔记本全面深度评测图解
-
雷神笔记本g150TB怎么使用u盘安装win7系统 雷神g150TB笔记本使用u盘安装win7系统图文