MapReduce 之小文件合并(HDFS小文件预处理)
程序员文章站
2022-07-14 19:40:27
...
MapReduce 之小文件合并
1.HDFS之小文件背景知识
- HDFS作为分布式文件系统,自身机制原因,每一个数据块(默认128MB一个数据块)都会在namenode节点的元数据中保存一份索引。
- 如果是小文件,例如1MB以下的小文件,由于HDFS本身机制,每个文件都会建立一个索引。这样的小文件过多,会导致namenode的索引文件过大。
- namenode的索引文件在启动时会加载到namenode的内存中,而内存是有限的。考虑到HDFS是分布式文件系统,里面存储的文件数量会非常大,所以这些小文件单独存放到HDFS中就不是很适合
2.HDFS小文件存储解决方式
- 文件之间合并,也是稍后会做代码演示的方式
- 文件打包压缩,和方法1差不多(hdfs本身支持多种压缩方式,有的压缩方式支持数据切块,有的不支持,有的解压缩速度很快,有的压缩率很高,根据需要进行设置处理即可)
- 小文件使用其他存储方式存储,不放在HDFS文件系统上
3.使用mapreduce进行小文件合并
- 考虑到效率,一般会在把小文件上传到HDFS之前,就在本地对小文件做合并,这样效率更高。
- 在整个大数据处理前期,针对原始数据做处理,包括数据清洗,筛选,合并等操作也是一个关键环节。数据清洗做的好,能很大降低后续代码数据处理的复杂程度和数据量。例如一些数据可以提前做一些分类跟聚合,这样工作量就不用全部放在mapreduce阶段,可以有效提升数据处理速度。
- mapreduce本身自带的combiner就是出于这种思路,在maptask处理的数据之后,提前做一些数据聚合。
- 数据准备,一部准备好的txt英文小说,拆分为10份
package com.doit.hadoop.combin_small_files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author hulc
* @slogan: just do it
* @date 2020/8/22 20:18
*/
public class SmallFilesCombine2 {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local");
try {
Job instance = Job.getInstance(conf, SmallFilesCombine2.class.getSimpleName());
// 设置mapper类
instance.setMapperClass(SmallFilesMapper2.class);
// 设置map输出的key和value
instance.setMapOutputKeyClass(Text.class);
instance.setMapOutputValueClass(NullWritable.class);
// 设置输入和输出数据源路径
FileInputFormat.setInputPaths(instance, new Path("E:\\DOITLearning\\8.Hadoop\\mrdata\\smallFiles"));
FileOutputFormat.setOutputPath(instance, new Path("E:\\DOITLearning\\8.Hadoop\\mrdata\\smallFiles\\result1"));
// 设置reduce task数量,合并为一个文件
instance.setNumReduceTasks(1);
// 启动程序
boolean b = instance.waitForCompletion(true);
if(b) {
System.out.println("success");
}else {
System.out.println("failed");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class SmallFilesMapper2 extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 从各个文件中逐行读取数据,最后输出到一个文件中,就完成了对小文件的简单拼接处理.注意这里是没有对文件之间的拼接顺序做处理的
// 如果对文件处理顺序有要求,则需另外增加代码
context.write(value, NullWritable.get());
}
}
- 聚合后处理结果
内容是一致的,不过注意,聚合后内容是按照一行一行聚合,顺序并没有遵循之前的文件名以及源文件中的顺序。
上一篇: MapReduce编程场景之小文件合并
下一篇: js获取字符串的实际长度