hadoop面试题
文章目录
- hadoop面试题
- 入门
- HDFS
- MapReduce
- 1.谈谈Hadoop序列化和反序列化及自定义bean对象实现序列化?
- 2.FileInputFormat切片机制
- 3.(问的少)自定义InputFormat流程
- 4.如何决定一个job的map和redue的数量?
- 5.Maptask的个数由什么决定?(与上题相似)
- 6.MapTask工作机制
- 截屏2020-04-23下午8.46.51
- 7.ReduceTask工作机制
- 8.请描述mapReduce有几种排序及排序发生的阶段
- 9.(shuffle工作流程必问)请描述mapreduce中suffle阶段的工作流程,如何优化shuffle阶段?
- 10.请描述mapReduce中combiner的作用是什么,一般使用情景,哪些情况不需要,及和reduce的区别?
- 11.Mapreduce的工作原理,请举例子说明mapreduce是怎么运行的?
- 12.如果没有定义partitioner,那数据在被送达reducer前是如何被分区的?
- 13.Mapreduce怎么实现TopN?
- 14.有可能使Hadoop任务输出到多个目录中么?如果可以,怎么做?
- 15.简述hadoop实现join的几种方法及每种方法的实现
- 16.请简述hadoop怎样实现二级排序?
- 17.参考下面的MR系统场景:
- 18.Hadoop中RecordReader的作用是什么?
- 19.(京东面试题)给你1G的数据文件。分别有id,name,mark,source四个字段,按照mark分组,id排序,手写一个MapReduce?其中有几个Mapper?
- yarn
- 优化
hadoop面试题
入门
1.简要描述如何配置apache的一个开源Hadoop?
1.准备服务器(配置ssh免密码登入)
2.安装JDK
3.解压hadoop安装包
4.配置hadoop核心文件 hadoop-env.sh,core-site.xml,mapred-site.xml,hdfs-site.xml
5.配置hadoop环境变量
6.格式化hadoop namenode-formate
7.启动节点 start-all.sh
2.Hadoop中需哪些需要配置文件,作用是?
core-site.xml:
fs.defaultFS: hdfs//cluster(域名),默认的HDFS路径
Hadoop.tmp.dir: /export/data/hadoop_tmp,默认是NameNode、DataNode、secondaryNamenode等存放数据的公共目录。用户可以自己单独制定三类节点的目录
hadoop-env.sh:设置jdk的安装路径,如:export JAVA_HOME=/user/local/jdk
hdfs-site.xml:
dfs.repilication: 文件块备份个数,默认为3
dfs.data.dir: datanode节点存储在文件形同的目录
dfs.name.dir: namenode节点存储hadoop文件系统信息的本地系统路径
mapred-site.xml:指mr运行在yarn上 mapreduce.framework.name:yarn
3.请列出正常工作的Hadoop集群中Hadoop都分别需要启动哪些进程,他们的作用?
NameNode:存储文件的元数据
DataNode:在本地文件系统存储文件块数据,以及块数据的校验和。
SecondaryNameNode:每隔一段时间对NN元数据备份。
ResourceManager:管理集群中所有资源
NodeManager:管理单个节点的资源
ApplicationMaster:每个MR任务会启动一个MrAppMaster,负责为MR向RM申请资源
Container:对资源的封装,防止资源被侵占
4.简述Hadoop的几个默认端口及其含义
hdfs:NN: 9820(内部通信) 9870(web端)
2NN:9868(web端)
yarn: ResourceManager 8088(web端)
HistoryServer 19888(web端)
HDFS
1.HDFS的存储机制(读写流程)?
写入:
1)客户端通过Distributed FileSystem 模块向NameNode请求上传文件,NameNode检查目标文件是否存在,父目录是否存在
2)NameNode返回是否可以上传
3)客户端请求是第一个block上传到哪几个DataNode服务器上
4)NameNode返回3个Datanode节点,分别为dn1,dn2,dn3
5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求后会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
6)dn1,dn2,dn3逐一应答客户端
7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到本地内存缓存),以Packet为单位,dn1收到一个packet就会传给dn2,dn2传给dn3,dn1每传一个packet会放入一个应答队列等待应答
8)当第一个block传输完后,客户端再次请求NameNode上传第二块Block的服务器,重复3-7
读数据:
1)客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
2)挑选一台DataNode(就近、然后随机)服务器请求读取数据。
3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以packet为单位做校验)
4)客户端以packet为单位接受,现在本地缓存,然后写入目标文件
2.SecondaryNameNode工作机制?
第一阶段:NameNode启动
1)第一次启动Name格式化后,创建Fsimage和Edits文件,如果不是第一次启动,直接加载编辑日志和镜像文件到内存
2)客户端对元数据进行增删改的请求
3)NameNode记录操作日志,更新滚动日志
4)NameNode在内存中对元数据进行增删改
第二阶段:SecondaryNameNode启动
1)2NN询问NN是否需要CheckPoint,直接带回结果
2)2nn请求执行checkpoint
3)nn滚动正在写的Edits日志
4)将滚动前的编辑日志和镜像文件拷贝到2nn
5)2nn加载编辑日志和镜像文件到内存,并合并
6)生成新的镜像文件
7)拷贝fsimage.chkpoint到nn
8)nn将fsimage.chkpoint重命名为fsimage
3.NN与2NN区别?
nn负责管理整个文件系统的元数据
2nn负责替nn进行编辑日志和镜像文件的合并工作
当nn挂掉的时候,可以从2nn进行恢复
4.服役新节点和退役旧节点步骤?
服役新节点:
1)克隆一台主机
2)修改ip地址和主机名称
3)删除原来hdfs系统留存的文件(data和logs)
4)source 一下配置文件
5)直接启动DataNode,即可关联到集群。
退役旧节点:
1)添加白名单和黑名单,黑名单写入要退役的节点
2)在hdfs-site.xml配置dfs.hosts和dfs.hosts.exclude 把黑白名单加入进去
3)重新启动集群( 节点数据迁移完毕后 进行退役)
5.NameNode挂了怎么办?
方式一:将2NN中的数据拷贝到namenode存储数据目录
方式二:使用importCheckpoint选项启动namenode守护进程,从而将2nn中的数据拷贝到namenode目录中
MapReduce
1.谈谈Hadoop序列化和反序列化及自定义bean对象实现序列化?
序列化和反序列化:
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于持久化存储和网络传输
反序列化就是把收到的字节序列或是硬盘中持久化数据,转换成内存中的对象
java的序列化是重量级序列化框架(Serializable),一个对象被序列化了后,会附带很多额外的新的(各种校验信息、header、继承体系),不便与在网络中高效传输,所以hadoop自己开发了一套序列化机制Writable,精致高效
1.必须实现Writable接口
2.反序列化时,需要反射调用空参的构造函数,所以必须有空参构造器
3.重写序列化方法
4.重写反序列化方法
5.反序列化的顺序要和序列化的顺序完全一致
6.如果需要把结果显示在文件中需要重写toString()
2.FileInputFormat切片机制
(1)简单的按照文件的内容长度切片
(2)切片大小,默认等于Block大小,(数据倾斜1.1倍切片)
(3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
job提交流程源码:
waitForCompletion()
submit();
// 1建立连接
connect();
// 1)创建提交Job的代理
new Cluster(getConfiguration());
// (1)判断是本地yarn还是远程
initialize(jobTrackAddr, conf);
// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的Stag路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取jobid ,并创建Job路径
JobID jobId = submitClient.getNewJobID();
// 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
获得:xml、切片、jar包
切片:块大小、1.1倍
3.(问的少)自定义InputFormat流程
(1)自定义类继承FileInputFormat
(2)改写RecordReader,实现一次读取一个完整文件封装为KV
4.如何决定一个job的map和redue的数量?
map数量:
splitSize = max(minSize,min(maxSize,blockSize) );
由客户端提交job时的切片个数决定,默认是block的数量
reduce数量:
默认是1,可根据业务逻辑来设定job.setNumReduceTask(x) ,x为reduce的数量
5.Maptask的个数由什么决定?(与上题相似)
由客户端提交job时的切片个数决定
6.MapTask工作机制
(1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
7.ReduceTask工作机制
(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。
8.请描述mapReduce有几种排序及排序发生的阶段
1)排序分类
(1)部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
(2)全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
(3)辅助排序:(GroupingComparator分组)
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
(4)二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
2)自定义排序WritableComparable
bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序
WritableComparable排序案例实操(全排序)
排序发生的阶段:
二次排序、全排序、部分排序发生在map阶段
辅助排序发生在reduce阶段
9.(shuffle工作流程必问)请描述mapreduce中suffle阶段的工作流程,如何优化shuffle阶段?
分区、排序、溢写、拷贝到对应的reduce机器上,增加combiner,压缩溢写的文件
调整环形缓冲区大小 比率 、增大溢写文件、io采用压缩
10.请描述mapReduce中combiner的作用是什么,一般使用情景,哪些情况不需要,及和reduce的区别?
combiner是对每一个maptask的输出进行局部汇总,以减小网络传输量
combiner能勾应用的前提是不能影响最终的业务逻辑,而且输出的kv应该跟reducer的输入的kv类型对应
combiner和reducer的区别在于运行的位置
combiner在每一个maptask所在的节点
reducer接收全局所有的Mapper输出结果
11.Mapreduce的工作原理,请举例子说明mapreduce是怎么运行的?
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4Xb19ttM-1587653018694)(/Users/vanas/Library/Application Support/typora-user-images/截屏2020-04-23下午9.20.57.png)]
(1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。
在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
12.如果没有定义partitioner,那数据在被送达reducer前是如何被分区的?
默认的partition算法 key的hashcode值取模运算(%)reduceTask的数量,得到的数字就是 分区号 0
13.Mapreduce怎么实现TopN?
可以自定义gropingComparator,或者在map端对数据进行排序,然后再reduce输出时,控制只输出前n个数。
14.有可能使Hadoop任务输出到多个目录中么?如果可以,怎么做?
可以,采用自定义OutputFormat
步骤:自定义OutputFormat 改写recordwriter 具体改写输出数据的方法write()
15.简述hadoop实现join的几种方法及每种方法的实现
reduce join
map端主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
reduce端主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
map join
在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端数据的压力,尽可能减少数据倾斜
具体办法:采用distributedcache
在mapper 的setup阶段,将文件读取到缓存集合中
在驱动函数中加载缓存
job.addCacheFile(new URI(“file:/…”)). //缓存普通文件到task运行节点
16.请简述hadoop怎样实现二级排序?
对map端输出的key进行排序,实现的compareTo方法,其中compareTo方法中排序的条件有2个。
17.参考下面的MR系统场景:
5块 :64KB,64MB,1MB,64MB,63MB
18.Hadoop中RecordReader的作用是什么?
可以读取不同类型文件。
1.以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类
2.系统默认的RecordReader是LineRecordReader
3.LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value
4.应用场景:自定义读取每一条记录的方式;自定义读入key的类型,如希望读取的key是文件的路径或名字而不是在文件中的偏移量。
(答案不准,读文件)
19.(京东面试题)给你1G的数据文件。分别有id,name,mark,source四个字段,按照mark分组,id排序,手写一个MapReduce?其中有几个Mapper?
1024m/128m=8块,所以有8个Mapper
代码仅供参考不一定正确
Bean
package com.atguigu.test.exam;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 分别有id,name,mark,source四个字段,按照mark分组,id排序,手写一个MapReduce?
*
* @author Vanas
* @create 2020-04-23 9:51 下午
*/
public class OrderBean implements WritableComparable<OrderBean> {
private Integer id;
private String name;
private String mark;
private String source;
public OrderBean() {
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMark() {
return mark;
}
public void setMark(String mark) {
this.mark = mark;
}
public String getSource() {
return source;
}
public void setSource(String source) {
this.source = source;
}
public void write(DataOutput out) throws IOException {
out.writeInt(id);
out.writeUTF(name);
out.writeUTF(mark);
out.writeUTF(source);
}
public void readFields(DataInput in) throws IOException {
id = in.readInt();
name = in.readUTF();
mark = in.readUTF();
source = in.readUTF();
}
public int compareTo(OrderBean o) {
return this.id - o.id;
}
@Override
public String toString() {
return id + "\t'" + name + "\t" + mark + "\t" + source ;
}
}
Mapper
package com.atguigu.test.exam;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author Vanas
* @create 2020-04-23 9:58 下午
*/
public class OrderMapper extends Mapper<LongWritable, Text, Text, OrderBean> {
Text outK = new Text();
OrderBean outV = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(" ");
outK.set(split[3]);
outV.setId(Integer.parseInt(split[0]));
outV.setName(split[1]);
outV.setMark(split[2]);
outV.setSource(split[3]);
context.write(outK, outV);
}
}
Reducer
package com.atguigu.test.exam;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author Vanas
* @create 2020-04-23 10:11 下午
*/
public class OrderReducer extends Reducer<Text, OrderBean, OrderBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<OrderBean> values, Context context) throws IOException, InterruptedException {
for (OrderBean value : values) {
context.write(value, NullWritable.get());
}
}
}
Driver
package com.atguigu.test.exam;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author Vanas
* @create 2020-04-23 10:15 下午
*/
public class OrderDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(OrderDriver.class);
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(OrderBean.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(""));
FileOutputFormat.setOutputPath(job, new Path(""));
job.waitForCompletion(true);
}
}
yarn
1.简述Hadoop1与Hadoop2的架构异同?
加入了yarn解决了资源调度的问题,把MapReduce拆解开来,解偶
加入了对zookeeper的支持实现比较可靠的高可用
2.为什么会产生yarn,他解决了什么问题,有什么优势?
Yarn最主要功能是解决运行的用户程序与yarn框架完全解偶
yarn上可以运行各种类型的分布式运算程序(mr只是其中一种)比如spark等
3(必会)MR作业提交过程?
(1)作业提交
1.Client调用job.waitForCompletion方法,向整个集群提交MR作业
2.Client向RM申请一个作业id
3.RM给Client返回该job资源的提交路径和作业id
4.Client提交jar包、切片信息和配置文件到指定的资源提交路径
5.Client提交完资源后,向RM申请运行MrAppMaster
(2)作业初始化
6.当MR收到Client的请求后,将该job添加到容量调度器中
7.某一个空闲的NM领取到该job
8.该NM创建Container,并产生MrAppMaster
9.下载Client提交的资源到本地
(3)任务分配
10.MrAppMaster向RM申请运行多个MapTask任务资源
11.RM将运行MapTask任务分配给另外两个NodeManager,另外两个NodeManager分别领取任务并创建容器
(4)任务运行
12.MrAppMaster向两个接收到任务的NM发送城区启动脚本,两个NM分别启动MapTask,MapTask对数据分区排序
13.MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask
14.ReduceTask向MapTask获取相应的分区数据
15.程序运行完毕后,MR会向RM申请注销自己
4.HDFS的数据压缩算法?以及每种算法的应用场景?
Gzip压缩
优点:压缩率比较高,而且压缩/解压速度也比较快;Hadoop本身支持,在应用中处理Gzip格式的文件就和直接处理文本一样;大部分Linux系统都自带Gzip命令,使用方便。
缺点:不支持Split。
应用场景:当每个文件压缩之后在130M以内的(1个块大小内),都可以考虑用Gzip压缩格式。例如说一天或者一个小时的日志压缩成一个Gzip文件。
Bzip压缩
优点:支持Split;具有很高的压缩率,比Gzip压缩率都高;Hadoop本身自带,使用方便。
缺点:压缩/解压速度慢。
应用场景:适合对速度要求不高,但需要较高的压缩率的时候;或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持Split,而且兼容之前的应用程序的情况。
LZO压缩
优点:压缩/解压速度也比较快,合理的压缩率;支持Split,是Hadoop中最流行的压缩格式;可以在Linux系统下安装lzop命令,使用方便。
缺点:压缩率比Gzip要低一些;Hadoop本身不支持,需要安装;在应用中对Lzo格式的文件需要做一些特殊处理(为了支持Split需要建索引,还需要指定InputFormat为Lzo格式)。
应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,Lzo优点越越明显。
sanppy压缩
优点:高速压缩速度和合理的压缩率。
缺点:不支持Split;压缩率比Gzip要低;Hadoop本身自带,使用方便。
应用场景:当MapReduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;或者作为一个MapReduce作业的输出和另外一个MapReduce作业的输入。
5.Hadoop的调度器总结?
FIFO :先进先出队列,只有一个任务执行
Capacity Scheduler(默认):多队列,每个队列内部先进先出,每个队列只有一个任务在执行,队列的并行度为队列的个数
Fair Scheduler:公平调度器:多队列:每个队列内部按照缺额大小分配资源启动任务,同一时间队列中有多少个任务执行,队列的并行度大于等于队列的个数
6.maprereduce推测执行算法及原理?
estimatedRunTime = (currentTimestamp - taskStartTime) / progress
推测运行时间(60s) = (当前时刻(6)+任务启动时刻(0)) / 任务运行比例(10%)
estimatedEndTime = estimatedRunTime + taskStartTime
推测执行完时刻 60 = 推测运行时间(60) + 任务启动时刻(0)
estimatedEndTime’ = currentTimestamp + averageRunTime
备份任务推测完时刻(16)= 当前时刻(6)+ 运行完任务的平均时间(10)
MR总是选择(estimatedEndTime - estimatedEndTime’)差值最大的任务,并为之启动备份任务
为了防止大量任务同时启动备份任务造成资源浪费,mr为每个作业设置了同时启动的备份任务数目的上限
以空间换时间
优化
1.mapreduce 跑的慢的原因:
1)计算机性能:cpu、内存、磁盘健康、网络
2)I/O操作优化:
1.数据倾斜
2.Map和Reduce数设置不合理
3.Map运行时间太长,导致Reduce等待过久
4.小文件过多
5.大量不可分块的超大文件
6.spill次数过多
7.merge次数过多等
2.mapreduce优化方法
数据输入
(1)合并小文件:在执行MR任务前将小文件进行合并,大量的小文件会产生大量的Map任务,增大Map任务装载次数,而任务的装载比较耗时,从而导致MR运行较慢。
(2)采用CombineTextInputFormat来作为输入,解决输入端大量小文件场景。
Map阶段
(1)减少溢写(Spill)次数:通过调整io.sort.mb及sort.spill.percent参数值,增大触发Spill的内存上限,减少Spill次数,从而减少磁盘IO。
(2)减少合并(Merge)次数:通过调整io.sort.factor参数,增大Merge的文件数目,减少Merge的次数,从而缩短MR处理时间。
(3)在Map之后,不影响业务逻辑前提下,先进行Combine处理,减少 I/O。
Reduce阶段
(1)合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致Map、Reduce任务间竞争资源,造成处理超时等错误。
(2)设置Map、Reduce共存:调整slowstart.completedmaps参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间。
(3)规避使用Reduce:因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。
(4)合理设置Reduce端的Buffer:默认情况下,数据达到一个阈值的时候,Buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获得所有的数据。也就是说,Buffer和Reduce是没有直接关联的,中间多次写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得Buffer中的一部分数据可以直接输送到Reduce,从而减少IO开销:mapreduce.reduce.input.buffer.percent,默认为0.0。当值大于0的时候,会保留指定比例的内存读Buffer中的数据直接拿给Reduce使用。这样一来,设置Buffer需要内存,读取数据需要内存,Reduce计算也要内存,所以要根据作业的运行情况进行调整。
I/O传输
1)采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZO压缩编码器。
2)使用SequenceFile二进制文件。
数据倾斜问题
1.数据倾斜现象
数据频率倾斜——某一个区域的数据量要远远大于其他区域。
数据大小倾斜——部分记录的大小远远大于平均值。
2.减少数据倾斜的方法
方法1:抽样和范围分区
可以通过对原始数据进行抽样得到的结果集来预设分区边界值。
方法2:自定义分区
基于输出键的背景知识进行自定义分区。例如,如果Map输出键的单词来源于一本书。且其中某几个专业词汇较多。那么就可以自定义分区将这这些专业词汇发送给固定的一部分Reduce实例。而将其他的都发送给剩余的Reduce实例。
方法3:Combine
使用Combine可以大量地减小数据倾斜。在可能的情况下,Combine的目的就是聚合并精简数据。
方法4:采用Map Join,尽量避免Reduce Join。
常用的调优参数
3.HDFS小文件优化方法?
1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。
2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。
3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。
4)开启uber模式,实现jvm重用
- Hadoop Archive
是一个高效的将小文件放入HDFS块中的文件存档工具,能够将多个小文件打包成一个HAR文件,从而达到减少NameNode的内存使用
- SequenceFile
SequenceFile是由一系列的二进制k/v组成,如果为key为文件名,value为文件内容,可将大批小文件合并成一个大文件
- CombineTextInputFormat
CombineTextInputFormat用于将多个小文件在切片过程中生成一个单独的切片或者少量的切片。
- 开启uber模式,实现jvm重用。默认情况下,每个Task任务都需要启动一个jvm来运行,如果Task任务计算的数据量很小,我们可以让同一个Job的多个Task运行在一个Jvm中,不必为每个Task都开启一个Jvm.
4.MapRedcue怎么解决数据均衡问题,如何确定分区号?
抽样和范围分区。通过对院士数据抽样得到的结果集来预设分区边界值
自定义分区:基于输出键 进行自定义分区,根据业务逻辑均衡分配key值
combine:可以大量减小数据倾斜
MapJoin :尽量避免Reduce Join
区号从0 开始根据设置分区的的数一次递增
5.Hadoop中job和Tasks之间的区别是什么?
一个job中包含多个Tasks