欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop入门——MapReduce对于海量小文件的多种解决方案

程序员文章站 2022-07-14 19:50:43
...

一.概述

小文件是指文件size小于HDFS上block大小的文件。这样的文件会给hadoop的扩展性和性能带来严重问题。首先,在HDFS中,任何block,文件或者目录在内存中均以对象的形式存储,每个对象约占150byte,如果有1千万个小文件,每个文件占用一个block,则NameNode大约需要2G空间。如果存储一亿个文件,则NameNode需要20G空间。这样NameNode内存容量严重制约了集群的扩展。其次,访问大量小文件速度远远小于访问几个大文件。HDFS最初是为流式访问大文件开发的,如果访问大量小文件,需要不断的从一个DataNode跳到另外一个DataNode,严重影响性能。最后,处理大量小文件速度远远小于处理同等大小的大文件的速度。每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上。

 

二.HDFS读写流程

1.读文件流程

A.client端发送读文件请求给NameNode,如果文件不存在,返回错误信息,否则,将该文件对应的block机器所在DataNode位置发送给client。

B.client收到文件位置信息后,与不同DataNode建立socket连接并行获取数据。

2.写文件流程

A.client端发送写文件请求,NameNode检查文件是否存在,如果已经存在,直接返回错误信息,否则,发送给client一些可用节点。

B.client将文件分块,并行存储到不同DataNode节点上,发送完成以后,client同时发送信息给NameNode和DataNode。

C.NameNode收到client的信息后,发送信息给DataNode。

D.DataNode同时收到NameNode和DataNode的确认信息后,提交写操作。

 

三.解决方案

1.编写应用程序实现

public class AppForSmallFile {
 
	//定义文件读取的路径
	private static final String OUTPATH = "hdfs://liaozhongmin:9000";
		
	public static void main(String[] args) {
		
		//定义FSDataOutputStream对象
		FSDataOutputStream fsDataoutputStream = null;
		//定义输入流读文件
		InputStreamReader inputStreamReader = null;
		try {
			//创建合并后文件存储的的路径
			Path path = new Path(OUTPATH + "/combinedFile");
			
			//创建FSDataOutputStream对象
			fsDataoutputStream =  FileSystem.get(path.toUri(), new Configuration()).create(path);
			
			//创建要合并的小文件路径
			File sourceDir = new File("C:\\Windows\\System32\\drivers\\etc");
			
			//遍历小文件
			for (File fileName : sourceDir.listFiles()){
				
				//创建输入流
				//fileInputStream = new FileInputStream(fileName.getAbsolutePath());
				//只有这样才可以制定字符编码(没办法,Window是默认GBK的,Hadoop是默认UTF-8的,所以读的时候就会乱码)
				inputStreamReader = new InputStreamReader(new FileInputStream(fileName), "gbk");
				//一行一行的读取
				List<String> readLines = IOUtils.readLines(inputStreamReader);
				
				//然后再写出去
				for (String line : readLines){
					//写入一行
					fsDataoutputStream.write(line.getBytes());
					//写入一个换行符
					fsDataoutputStream.write("\n".getBytes());
				}
				
			}
			
			System.out.println("合并成功");
		} catch (Exception e) {
			e.printStackTrace();
		} finally{
			try {
				inputStreamReader.close();
				fsDataoutputStream.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
			
		}
		
	}
}

注:这种方案是使用java文件相关操作,将众多的小文件写到一个文件中。

 

2.使用archive工具

创建文件 hadoop archive -archiveName xxx.har -p  /src  /dest
查看内部结构 hadoop fs -lsr /dest/xxx.har
查看内容 hadoop fs -lsr har:///dest/xxx.har

 

3.使用SequenceFile或者MapFile(以SequenceFile为例)

提供两种将小文件打成SequenceFile的方法:
方法一:

public class WriteSequenceMapReduce {
	// 定义输入路径
		private static final String INPUT_PATH = "hdfs://master:9000/files";
		// 定义输出路径
		private static final String OUT_PATH = "hdfs://master:9000/seq/";
		//定义文件系统
		private static FileSystem fileSystem = null;
		
		public static void main(String[] args) {
 
			try {
				// 创建配置信息
				Configuration conf = new Configuration();
 
				// 创建文件系统
				fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
				// 如果输出目录存在,我们就删除
				if (fileSystem.exists(new Path(OUT_PATH))) {
					fileSystem.delete(new Path(OUT_PATH), true);
				}
 
				// 创建任务
				Job job = new Job(conf, WriteSequenceMapReduce.class.getName());
 
				// 1.1 设置输入目录和设置输入数据格式化的类
				FileInputFormat.setInputPaths(job, INPUT_PATH);
				// 1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
				job.setMapperClass(WriteSequenceMapper.class);
				// 2.3 指定输出的路径和设置输出的格式化类
				FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
				// 提交作业 退出
				System.exit(job.waitForCompletion(true) ? 0 : 1);
 
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
 
	
	public static class WriteSequenceMapper extends Mapper<LongWritable, Text, Text, BytesWritable> {
		// 定义SequenceFile.Reader对象用于读文件
		private static SequenceFile.Writer writer = null;
		// 定义配置信息
		private static Configuration conf = null;
		// 定义最终输出的key和value
		private Text outkey = new Text();
		private BytesWritable outValue = new BytesWritable();
		//定义要合并的文件(存放在数组中)
		private FileStatus[] files = null;
		//定义输入流和一个字节数组
		private InputStream inputStream = null;
		private byte[] buffer = null;
		
		@Override
		protected void setup(Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
			try {
				// 创建配置信息
				conf = new Configuration();
				// 创建Path对象
				Path path = new Path(INPUT_PATH);
				// 创建SequenceFile.Writer对象,并指定压缩格式
				writer = SequenceFile.createWriter(fileSystem,conf, new Path(OUT_PATH+"/total.seq"), Text.class, BytesWritable.class, CompressionType.BLOCK, new BZip2Codec());
				//writer = SequenceFile.createWriter(fileSystem,conf, new Path(OUT_PATH+"/total.seq"), Text.class, BytesWritable.class);
				//获取要合并的文件数组
				files = fileSystem.listStatus(path);
				
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
 
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
			
 
			//遍历文件数组
			for (int i=0; i<files.length; i++){
				//将文件名作为输出的key
				outkey.set(files[i].getPath().toString());
				
				//创建输入流
				inputStream = fileSystem.open(files[i].getPath());
				//创建字节数组
				buffer = new byte[(int) files[i].getLen()];
				//通过工具类将文件读到字节数组中
				IOUtils.readFully(inputStream, buffer, 0, buffer.length);
				//将字节数组中的内容及单个文件的内容作为value输出
				outValue.set(new BytesWritable(buffer));
				
				//关闭输入流
				IOUtils.closeStream(inputStream);
				
				//将结果写到Sequencefile中
				writer.append(outkey, outValue);
				
			}
			
			//关闭流
			IOUtils.closeStream(writer);
			
			//System.exit(0);
		}
 
	}
}

方法二:自定义InputFormat和RecordReader实现

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{
 
	@Override
	public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		//创建自定义的RecordReader
		WholeFileRecordReader reader = new WholeFileRecordReader();
		
		reader.initialize(split, context);
		
		return reader;
	}
	
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		
		return false;
	}
 
}
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable>{
 
	private FileSplit fileSplit;
	private Configuration conf;
	private BytesWritable value = new BytesWritable();
	private boolean processed = false;
	
	
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException{
		this.fileSplit = (FileSplit) split;
		this.conf = context.getConfiguration();
		
	}
	
	/**
	 * process表示记录是否已经被处理过了
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (!processed){
			byte[] contents = new byte[(int) fileSplit.getLength()];
			//获取路径
			Path file = fileSplit.getPath();
			//创建文件系统
			FileSystem fileSystem = file.getFileSystem(conf);
			FSDataInputStream in = null;
			try {
				//打开文件
				in = fileSystem.open(file);
				//将file文件中的内容放入contents数组中。使用了IOUtils工具类的readFully()方法,将in流中的内容读到contents字节数组中
				IOUtils.readFully(in, contents, 0, contents.length);
				//BytesWritable是一个可用做key或value的字节序列,而ByteWritable是单个字节
				//将value的内容设置为contents的值
				value.set(contents, 0, contents.length);
				
			} catch (Exception e) {
				e.printStackTrace();
			} finally{
				IOUtils.closeStream(in);
			}
			
			processed = true;
			return true;
		}
		return false;
	}
 
	@Override
	public NullWritable getCurrentKey() throws IOException, InterruptedException {
		return NullWritable.get();
	}
 
	@Override
	public BytesWritable getCurrentValue() throws IOException, InterruptedException {
		return value;
	}
 
	@Override
	public float getProgress() throws IOException, InterruptedException {
		
		return processed ? 1.0f : 0.0f;
	}
 
	@Override
	public void close() throws IOException {
 
		//do nothing
	}
 
}
public class SmallFilesToSequenceFileConverter {
	
	
	// 定义输入路径
	private static final String INPUT_PATH = "hdfs://master:9000/files/*";
	// 定义输出路径
	private static final String OUT_PATH = "hdfs://<span style="font-family: Arial, Helvetica, sans-serif;">master</span>:9000/seq/total.seq";
	
	public static void main(String[] args) {
		try {
			// 创建配置信息
			Configuration conf = new Configuration();
 
			// 创建文件系统
			FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
			// 如果输出目录存在,我们就删除
			if (fileSystem.exists(new Path(OUT_PATH))) {
				fileSystem.delete(new Path(OUT_PATH), true);
			}
 
			// 创建任务
			Job job = new Job(conf, SmallFilesToSequenceFileConverter.class.getName());
 
			//1.1	设置输入目录和设置输入数据格式化的类
			FileInputFormat.addInputPaths(job, INPUT_PATH);
			job.setInputFormatClass(WholeFileInputFormat.class);
 
			//1.2	设置自定义Mapper类和设置map函数输出数据的key和value的类型
			job.setMapperClass(SequenceFileMapper.class);
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(BytesWritable.class);
 
			//1.3	设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
			job.setPartitionerClass(HashPartitioner.class);
			//千万不要有这句话,否则单个小文件的内容会输出到单独的一个Sequencefile文件中(简直内伤)
			//job.setNumReduceTasks(0);
 
			FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
			job.setOutputFormatClass(SequenceFileOutputFormat.class);
			
			// 此处的设置是最终输出的key/value,一定要注意!
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(BytesWritable.class);
			
			// 提交作业 退出
			System.exit(job.waitForCompletion(true) ? 0 : 1);
		
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
 
	public static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
		// 定义文件的名称作为key
		private Text fileNameKey = null;
 
		/**
		 * task调用之前,初始化fileNameKey
		 */
		@Override
		protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
			// 获取分片
			InputSplit split = context.getInputSplit();
			// 获取输入目录
			Path path = ((FileSplit) split).getPath();
			// 设置fileNameKey
			fileNameKey = new Text(path.toString());
		}
 
		@Override
		protected void map(NullWritable key, BytesWritable value, Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
				InterruptedException {
			// 将fileNameKey作为输出的key(文件名),value作为输出的value(单个小文件的内容)
			System.out.println(fileNameKey.toString());
			context.write(fileNameKey, value);
		}
	}
}

注:方法二的这三个类可以实现将小文件写到一个SequenceFile中。

读取SequenceFile文件:

public class ReadSequenceMapReduce {
	// 定义输入路径
	private static final String INPUT_PATH = "hdfs://master:9000/seq/total.seq";
	// 定义输出路径
	private static final String OUT_PATH = "hdfs://<span style="font-family: Arial, Helvetica, sans-serif;">master</span>:9000/seq/out";
	//定义文件系统
	private static FileSystem fileSystem = null;
	
	public static void main(String[] args) {
 
		try {
			// 创建配置信息
			Configuration conf = new Configuration();
 
			// 创建文件系统
			fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
			// 如果输出目录存在,我们就删除
			if (fileSystem.exists(new Path(OUT_PATH))) {
				fileSystem.delete(new Path(OUT_PATH), true);
			}
 
			// 创建任务
			Job job = new Job(conf, ReadSequenceMapReduce.class.getName());
 
			// 1.1 设置输入目录和设置输入数据格式化的类
			FileInputFormat.setInputPaths(job, INPUT_PATH);
			// 这个很重要,指定使用SequenceFileInputFormat类来处理我们的输入文件
			job.setInputFormatClass(SequenceFileInputFormat.class);
 
			// 1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
			job.setMapperClass(ReadSequenceMapper.class);
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);
 
			// 1.3 设置分区和reduce数量
			job.setPartitionerClass(HashPartitioner.class);
			job.setNumReduceTasks(0);
 
			// 最终输出的类型
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
 
			// 2.3 指定输出的路径和设置输出的格式化类
			FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
			job.setOutputFormatClass(TextOutputFormat.class);
 
			// 提交作业 退出
			System.exit(job.waitForCompletion(true) ? 0 : 1);
 
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
 
	public static class ReadSequenceMapper extends Mapper<Text, BytesWritable, Text, Text> {
 
		//定义SequenceFile.Reader对象用于读文件
		private static SequenceFile.Reader reader = null;
		//定义配置信息
		private static Configuration conf = null;
		//定义最终输出的value
		private Text outValue = new Text();
		
		/**
		 * 在setUp()函数中初始化相关对象
		 */
		@Override
		protected void setup(Mapper<Text, BytesWritable, Text, Text>.Context context) throws IOException, InterruptedException {
			try {
				// 创建配置信息
				conf = new Configuration();
				// 创建文件系统
				//FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf);
				// 创建Path对象
				Path path = new Path(INPUT_PATH);
				// 创建SequenceFile.Reader对象
				reader = new SequenceFile.Reader(fileSystem, path, conf);
				
			} catch (Exception e) {
				e.printStackTrace();
			}
 
		}
 
		@Override
		protected void map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, Text>.Context context) throws IOException, InterruptedException {
			
			if (!"".equals(key.toString())  && !"".equals(value.get())){
				
				//设置输出的value
				outValue.set(new String(value.getBytes(), 0, value.getLength()));
				//把结果写出去
				context.write(key, outValue);
			}				
		}
	}
}

 

4.使用CombineFileInputFormat

具体查看:MapReduce基于CombineFileInputFormat处理海量小文件

 

 

原文转自:https://blog.csdn.net/lzm1340458776/article/details/43410731