欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce 之小文件合并(HDFS小文件预处理)

程序员文章站 2022-07-14 19:40:27
...

MapReduce 之小文件合并

1.HDFS之小文件背景知识

  1. HDFS作为分布式文件系统,自身机制原因,每一个数据块(默认128MB一个数据块)都会在namenode节点的元数据中保存一份索引。
  2. 如果是小文件,例如1MB以下的小文件,由于HDFS本身机制,每个文件都会建立一个索引。这样的小文件过多,会导致namenode的索引文件过大。
  3. namenode的索引文件在启动时会加载到namenode的内存中,而内存是有限的。考虑到HDFS是分布式文件系统,里面存储的文件数量会非常大,所以这些小文件单独存放到HDFS中就不是很适合

2.HDFS小文件存储解决方式

  1. 文件之间合并,也是稍后会做代码演示的方式
  2. 文件打包压缩,和方法1差不多(hdfs本身支持多种压缩方式,有的压缩方式支持数据切块,有的不支持,有的解压缩速度很快,有的压缩率很高,根据需要进行设置处理即可)
  3. 小文件使用其他存储方式存储,不放在HDFS文件系统上

3.使用mapreduce进行小文件合并

  1. 考虑到效率,一般会在把小文件上传到HDFS之前,就在本地对小文件做合并,这样效率更高。
  2. 在整个大数据处理前期,针对原始数据做处理,包括数据清洗,筛选,合并等操作也是一个关键环节。数据清洗做的好,能很大降低后续代码数据处理的复杂程度和数据量。例如一些数据可以提前做一些分类跟聚合,这样工作量就不用全部放在mapreduce阶段,可以有效提升数据处理速度。
  3. mapreduce本身自带的combiner就是出于这种思路,在maptask处理的数据之后,提前做一些数据聚合。
  4. 数据准备,一部准备好的txt英文小说,拆分为10份
    MapReduce 之小文件合并(HDFS小文件预处理)
    MapReduce 之小文件合并(HDFS小文件预处理)
package com.doit.hadoop.combin_small_files;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author hulc
 * @slogan: just do it
 * @date 2020/8/22 20:18
 */

public class SmallFilesCombine2 {
    public static void main(String[] args) {

        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "local");

        try {
            Job instance = Job.getInstance(conf, SmallFilesCombine2.class.getSimpleName());

            // 设置mapper类
            instance.setMapperClass(SmallFilesMapper2.class);

            // 设置map输出的key和value
            instance.setMapOutputKeyClass(Text.class);
            instance.setMapOutputValueClass(NullWritable.class);

            // 设置输入和输出数据源路径
            FileInputFormat.setInputPaths(instance, new Path("E:\\DOITLearning\\8.Hadoop\\mrdata\\smallFiles"));
            FileOutputFormat.setOutputPath(instance, new Path("E:\\DOITLearning\\8.Hadoop\\mrdata\\smallFiles\\result1"));

            // 设置reduce task数量,合并为一个文件
            instance.setNumReduceTasks(1);

            // 启动程序
            boolean b = instance.waitForCompletion(true);

            if(b) {
                System.out.println("success");
            }else {
                System.out.println("failed");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class SmallFilesMapper2 extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 从各个文件中逐行读取数据,最后输出到一个文件中,就完成了对小文件的简单拼接处理.注意这里是没有对文件之间的拼接顺序做处理的
        // 如果对文件处理顺序有要求,则需另外增加代码

        context.write(value, NullWritable.get());
    }
}

  1. 聚合后处理结果
    MapReduce 之小文件合并(HDFS小文件预处理)
    内容是一致的,不过注意,聚合后内容是按照一行一行聚合,顺序并没有遵循之前的文件名以及源文件中的顺序。