大数据面试题(三)----MapReduce面试题
MapReduce面试题
-
谈谈Hadoop 序列化和反序列化及自定义bean 对象实现序列化?
1) 序列化和反序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)
和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存
中的对象。
Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外
的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,
hadoop 自己开发了一套序列化机制(Writable),精简、高效。
2) 自定义bean 对象要想序列化传输步骤及注意事项:
(1) 必须实现Writable 接口
(2) 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
(3) 重写序列化方法
(4) 重写反序列化方法
(5) 注意反序列化的顺序和序列化的顺序完全一致
(6) 要想把结果显示在文件中,需要重写toString(),且用”\t”分开,方便后续用
(7) 如果需要将自定义的bean 放在key 中传输,则还需要实现comparable 接口,因为mapreduce 框中的shuffle 过程一定会对key 进行排序 -
在Hadoop 中定义的InputFormat 中,默认是哪一个?(A)
A.TextInputFormat
B.KeyValueInputFormat
C.SequenceFileInputFormat -
两个类TextInputFormat 和KeyValueInputFormat 的区别是什么?
1) 相同点:
TextInputformat 和KeyValueTextInputFormat 都继承了FileInputFormat 类,都是每一行作为一个记
录;
2) 区别:
TextInputformat 将每一行在文件中的起始偏移量作为key,每一行的内容作为value。默认以\n 或回
车键作为一行记录。
KeyValueTextInputFormat 适合处理输入数据的每一行是两列,并用tab 分离的形式。
public static class MyMapper extends Mapper<Text, Text, Text, LongWritable>
{
final Text k2 = new Text();
final LongWritable v2 = new LongWritable();
protected void map(Text key, Text value,
Mapper<Text, Text, Text, LongWritable>.Context context) throws
InterruptedException, IOException {
// final String line = value.toString();
// final String[] splited = line.split("o");
// for (String word : splited) {
// k2.set(word); k2.set(key);
v2.set(1); context.write(k2, v2);
// }
}
}
- FileInputFormat 切片机制(☆☆☆☆☆)
1)job 提交流程源码详解
waitForCompletion() submit();
// 1 建立连接
connect();
// 1)创建提交job 的代理
new Cluster(getConfiguration());
// (1)判断是本地yarn 还是远程
initialize(jobTrackAddr, conf);
// 2 提交job submitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的Stag 路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取jobid ,并创建job 路径
JobID jobId = submitClient.getNewJobID();
// 3)拷贝jar 包到集群
copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job);
// 5)向Stag 路径写xml 配置文件
writeConf(conf, submitJobFile); conf.writeXml(out);
// 6)提交job,返回提交状态
Status = submitClient.submitJob(jobId,submitJobDir.toString(),job.getCredentials());
5. 在一个运行的Hadoop 任务中,什么是InputSplit?(☆☆☆☆☆)
FileInputFormat 源码解析(input.getSplits(job))
(1) 找到你数据存储的目录。
(2) 开始遍历处理(规划切片)目录下的每一个文件
(3) 遍历第一个文件ss.txt
a) 获取文件大小fs.sizeOf(ss.txt);
b) 计算切片大小
computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
c) 默认情况下,切片大小=blocksize
d) 开始切,形成第1 个切片:ss.txt—0:128M 第2 个切片ss.txt—128:256M 第3 个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1 倍, 不大于1.1 倍就划分一块切片)
e) 将切片信息写到一个切片规划文件中
f) 整个切片的核心过程在getSplit()方法中完成。
g) 数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit 只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。
h) 注意:block 是HDFS 上物理上存储的存储的数据,切片是对数据逻辑上的划分。
(4) 提交切片规划文件到yarn 上,yarn 上的MrAppMaster 就可以根据切片规划文件计算开启maptask 个数。
-
自定义InputFormat 流程
(1) 自定义一个类继承FileInputFormat
(2) 改写RecordReader,实现一次读取一个完整文件封装为KV -
如何决定一个job 的map 和reduce 的数量?
1) map 数量
splitSize=max{minSize,min{maxSize,blockSize}}
map 数量由处理的数据分成的block 数量决定default_num = total_size / split_size;
2) reduce 数量
reduce 的数量job.setNumReduceTasks(x);x 为reduce 的数量。不设置的话默认为1。 -
Maptask 的个数由什么决定?
一个job 的map 阶段MapTask 并行度(个数),由客户端提交job 时的切片个数决定。 -
MapTask 工作机制(☆☆☆☆☆)
等待。。。 -
ReduceTask 工作机制(☆☆☆☆☆)
等待。。。 -
请描述mapReduce 有几种排序及排序发生的阶段(☆☆☆☆☆)
1) 排序的分类:
(1) 部分排序:
MapReduce 根据输入记录的键对数据集排序。保证输出的每个文件内部排序
(2) 全排序:
如何用Hadoop 产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低, 因为一台机器必须处理所有输出文件, 从而完全丧失了MapReduce 所提供的并行架构。
替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为待分析文件创建3 个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。
(3) 辅助排序:(GroupingComparator 分组)
Mapreduce 框架在记录到达reducer 之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map 任务且这些map 任务在不同轮次中完成时间各不相同。一般来说,大多数MapReduce 程序会避免让reduce 函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。
(4) 二次排序:
在自定义排序过程中,如果compareTo 中的判断条件为两个即为二次排序。
2) 自定义排序WritableComparable
bean 对象实现WritableComparable 接口重写compareTo 方法,就可以实现排序
3) 排序发生的阶段:
(1) 一个是在map side 发生在spill 后partition 前。
(2) 一个是在reduce side 发生在copy 后reduce 前。 -
请描述mapReduce 中shuffle 阶段的工作流程,如何优化shuffle 阶段(☆☆☆☆☆)
分区,排序,溢写,拷贝到对应reduce 机器上,增加combiner,压缩溢写的文件。 -
请描述mapReduce 中combiner 的作用是什么,一般使用情景,哪些情况不需要,及和reduce 的区别?
1) Combiner 的意义就是对每一个maptask 的输出进行局部汇总,以减小网络传输量。
2) Combiner 能够应用的前提是不能影响最终的业务逻辑,而且,Combiner 的输出kv 应该跟reducer 的输入kv 类型要对应起来。
3) Combiner 和reducer 的区别在于运行的位置。
Combiner 是在每一个maptask 所在的节点运行;
Reducer 是接收全局所有Mapper 的输出结果。 -
Mapreduce 的工作原理,请举例子说明mapreduce 是怎么运行的?(☆☆☆☆☆)
-
如果没有定义partitioner,那数据在被送达reducer 前是如何被分区的?
如果没有自定义的partitioning,则默认的partition 算法,即根据每一条数据的key 的hashcode值摸运算(%)reduce 的数量,得到的数字就是“分区号“。 -
MapReduce 出现单点负载多大,怎么负载平衡? (☆☆☆☆☆)
可以用Partitioner -
Hadoop 的缓存机制(Distributedcache)(☆☆☆☆☆)
分布式缓存一个最重要的应用就是在进行join 操作的时候,如果一个表很大,另一个表很小,我们就可以将这个小表进行广播处理,即每个计算节点上都存一份,然后进行map 端的连接操作,经过我的实验验证,这种情况下处理效率大大高于一般的reduce 端join,广播处理就运用到了分布式缓存的技术。
DistributedCache 将拷贝缓存的文件到Slave 节点在任何Job 在节点上执行之前,文件在每个Job 中只会被拷贝一次,缓存的归档文件会被在Slave 节点中解压缩。将本地文件复制到HDFS 中去,接着Client 会通过addCacheFile()和addCacheArchive()方法告诉DistributedCache 在HDFS中的位置。当文件存放到文地时, JobClient 同样获得
DistributedCache 来创建符号链接,其形式为文件的URI 加fragment 标识。当用户需要获得缓存中所有有效文件的列表时,JobConf 的方法getLocalCacheFiles() 和getLocalArchives() 都返回一个指向本地文件路径对象数组。 -
如何使用mapReduce 实现两个表的join?(☆☆☆☆☆)
1) reduce side join : 在map 阶段,map 函数同时读取两个文件File1 和File2,为了区分两种来源的key/value 数据对,对每条数据打一个标签(tag),比如:tag=0 表示来自文件File1,tag=2 表示来自文件File2。
2) map side join : Map side join 是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个maptask 内存中存在一份(比如存放到hash table 中),然后只扫描大表: 对于大表中的每一条记录key/value,在hash table 中查找是否有相同的key 的记录,如果有,则连接后输出即可。 -
有可能使Hadoop 任务输出到多个目录中么?如果可以,怎么做?
1) 可以输出到多个目录中,采用自定义OutputFormat。
2) 实现步骤:
(1) 自定义outputformat,
(2) 改写recordwriter,具体改写输出数据的方法write() -
什么样的计算不能用mr 来提速,举5 个例子。
1) 数据量很小。
2) 繁杂的小文件。
3) 索引是更好的存取机制的时候。
4) 事务处理。
5) 只有一台机器的时候。 -
ETL 是哪三个单词的缩写
Extraction-Transformation-Loading 的缩写,中文名称为数据提取、转换和加载。 -
MapReduce 实操
给你一个1G 的数据文件。分别有id,name,mark,source 四个字段,按照mark 分组,id 排序,手写一个MapReduce?其中有几个Mapper?
1) MapReduce 实现
(1) 在map 端对id 进行排序
(2) 在reduce 端对mark 进行分组
2) 几个mapper
(1)1024m/128m=8 块