SequenceFile文件原理及使用
文章目录
介绍
SequenceFile
是hadoop里用来存储序列化的键值对即二进制
的一种文件格式。SequenceFile
文件也可以作为MapReduce作业的输入和输出,hive和spark
也支持这种格式。
它有如下几个优点:
- 以二进制的形式存储数据,所以可以在HDFS里存储图像或者更加复杂的结构作为KV对。
-
SequenceFile
支持压缩
和分片
。当你压缩为一个SequenceFile
时,并不是将整个文件压缩成一个单独的单元,而是压缩文件里的record
或者block of records(块)
。因此SequenceFile
是能够支持分片
的,尽管使用的压缩方式如Snappy, Lz4 or Gzip
不支持分片。 -
SequenceFile
也可以用于存储多个小文件。由于Hadoop本身就是用来处理大型文件的,所以用一个SequenceFile
来存储很多小文件就可以提高处理效率,也能节省Namenode内存,因为Namenode只需一个SequenceFile
的metadata,而不是为每个小文件创建单独的metadata。 - 由于数据是以
SequenceFile
形式存储,所以中间输出文件即map输出也会用SequenceFile
来存储。
Sync points(同步点)
在SequenceFile
文件里每隔100 bytes会记录一个sync-marker
,基于这些sync points
,SequenceFile
就是可以切片的并用作MapReduce的输入。
SequenceFile的压缩形式
对SequenceFile
有三种压缩选择
-
NONE
---- 即key和value都不压缩。 -
RECORD
---- 只有value会被压缩。 -
BLOCK
---- key和value都会被压缩。key和value都会以block
的形式单独收集和压缩。block
的大小是可以配置的,在core-site,xml
里定义io.seqfile.compress.blocksize
,默认是1000000 bytes。
压缩的好处:减少磁盘占用、减少IO、减少shuffle的带宽。
SequenceFile的文件格式
尽管有三种压缩选择,但是header
的格式都是一样的。
SequenceFile文件的header格式
字段 | 说明 |
---|---|
version | 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6) |
KeyClassName | key class |
ValueClassName | value class |
Compression | A boolean which specifies if compression is turned on for keys/values in this file |
BlockCompression | CompressionCodec class which is used for compression of keys and/or values (if compression is enabled) |
Metadata | SequenceFile.Metadata for this file |
Sync | A sync marker to denote end of the header |
无压缩的SequenceFile文件格式
sync-marker
---- 每100 bytes一个sync-marker
Record压缩的SequenceFile文件格式
sync-marker
---- 每100 bytes一个sync-marker
存储格式如下
Block压缩的SequenceFile文件格式
- Header
- Record Block
- Uncompressed number of records in the block
- Compressed key-lengths block-size
- Compressed key-lengths block
- Compressed keys block-size
- Compressed keys block
- Compressed value-lengths block-size
- Compressed value-lengths block
- Compressed values block-size
- Compressed values block
- A sync-marker
every block
SequenceFile的相关类
SequenceFile
提供了SequenceFile.Writer, SequenceFile.Reader 和 SequenceFile.Sorter
用来写、读及排序数据。
由于有三种压缩选择,故SequenceFile Writer
有以下三类:
-
Writer
: Uncompressed records. -
RecordCompressWriter
: Record-compressed files, only compress values. -
BlockCompressWriter
: Block-compressed files, both keys & values are compressed.
通常用静态方法SequenceFile.createWriter
去创建Writer
,创建时指定Writer.Option
来确定是否要压缩及选择哪种方式压缩。
Java API读写SequenceFile
将HDFS某个目录下所有的小文件合并成一个SequenceFile文件(写)
以小文件的文件路径作为key,小文件的内容作为value。然后将其写入到SequenceFile
文件。
public static void writeHDFSFileTOHDFSSequenceFile() throws IOException {
String inputDir = "/user/root/input";
Path path = new Path(inputDir);
//最后生成的sequenceFile文件
Path outFile = new Path("/user/root/sequecenfile/out/seq_file");
//声明一个byte[]用于后面存放小文件内容
byte[] buffer;
//获取inputDir目录下的所有文件
FileStatus[] fileStatusArr = fs.listStatus(path);
//构造writer, 并使用try获取资源, 最后自动关闭资源
try(SequenceFile.Writer writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(outFile),//设置文件名
SequenceFile.Writer.keyClass(Text.class),//设置keyclass
SequenceFile.Writer.valueClass(Text.class),//设置valueclass
SequenceFile.Writer.appendIfExists(false),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()) //设置block+gzip的压缩方式
)){
//循环外定义key和value,避免重复定义,因为序列化时只是会序列化对应的内容
Text key = new Text();
Text value = new Text();
for (FileStatus fileStatus : fileStatusArr){
System.out.println("the file name is "+fileStatus.getPath());
//利用FileSystem打开文件
FSDataInputStream fsDataIn = fs.open(fileStatus.getPath());
//根据文件大小来定义byte[]的长度
buffer = new byte[((int) fileStatus.getLen())];
//将文件内容读入到buffer这个byte[]里
fsDataIn.read(buffer);
key.set(fileStatus.getPath().toString());
value.set(buffer);
//通过append方法写入到SequenceFile
writer.append(key, value);
}
}
}
将本地某个目录下所有的小文件合并成一个SequenceFile文件(写)
public static void writeLocalFileToHDFSSequenceFile() throws IOException {
String inputDir = "/root/test";
java.nio.file.Path localDir = Paths.get(inputDir);
Path outFile = new Path("/user/root/sequecenfile/out/seq_file");
//构造writer, 并使用try获取资源, 最后自动关闭资源
try(SequenceFile.Writer writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(outFile),//设置文件名
SequenceFile.Writer.keyClass(Text.class),//设置keyclass
SequenceFile.Writer.valueClass(Text.class),//设置valueclass
SequenceFile.Writer.appendIfExists(false),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()) //设置block+gzip的压缩方式
)){
Text key = new Text();
Text value = new Text();
// Collection<File> fileList = FileUtils.listFiles(new File(inputDir),null,false);
try(Stream<java.nio.file.Path> stream = Files.list(localDir)){
//遍历所有以.txt结尾的小文件写入到SequenceFile
stream.filter(x->x.toString().endsWith(".txt")).forEach(localPath->{
try {
System.out.println(localPath.toString());
byte[] bytes = Files.readAllBytes(localPath);
key.set(localPath.toString());
value.set(bytes);
writer.append(key, value);
} catch (IOException e) {
e.printStackTrace();
System.err.println("read "+localPath.toString()+" error");
}
});
}
}
}
读HDFS上SequenceFile文件的内容(读)
通过SequenceFile.Reader
的构造方法并指定参数创建Reader
来读取SequenceFile
文件
public static void readSequenceFile() throws IOException {
Path seqPath = new Path("/user/root/sequecenfile/out/seq_file");
try(SequenceFile.Reader reader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(seqPath), //设置SequenceFile文件路径
SequenceFile.Reader.bufferSize(1024*8)) //设置bufferSize
){
Text key = new Text();
Text value = new Text();
while (reader.next(key, value)){
System.out.println("------------key="+key+"----------");
System.out.println(value);
System.out.println("-------------------------------");
}
}
}
将其打包成具体的jar文件后执行hadoop jar sequenceFile.jar com.utstar.patrick.hadoop.data.format.SequenceFileDemo
其中sequenceFile.jar
就是打包的jar名,com.utstar.patrick.hadoop.data.format.SequenceFileDemo
就是需要运行的main类。
部分结果如下图所示
使用SequenceFile作为MapReduce的输入和输出
使用SequenceFile作为MapReduce的输入
将SequenceFile
文件作为输入很简单,只需要在driver
代码里设置job的inputformat
即可。
job.setInputFormatClass(SequenceFileInputFormat.class);
我们以hadoop提供的example为例,我通过-Dproperty=value
的方式设置了inputformat
来读取上面生成的SequenceFile
文件
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.5.jar wordcount -Dmapreduce.job.inputformat.class=org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat /user/root/sequecenfile/out/seq_file /out_99
能正常的进行单词统计功能,如下图所示
使用SequenceFile作为MapReduce的输出
在driver
代码里设置如下参数即可,需要强调的是要通过SequenceFileOutputFormat.setOutputCompressionType
方式来说明SequenceFile选取哪种压缩方式。
//设置outputformat
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path("/user/root/input"));
FileOutputFormat.setOutputPath(job, new Path(outpath));
//设置压缩
FileOutputFormat.setCompressOutput(job, true);
//设置用哪种算法进行压缩
FileOutputFormat.setOutputCompressorClass(job, Lz4Codec.class);
//必须通过SequenceFileOutputFormat.setOutputCompressionType来指定SequenceFile文件的压缩类型
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
顺带提下小文件的处理方式
hadoop自带有3种小文件解决方式
-
Hadoop Archive
,具体可参考我写的这篇博客Hadoop Archive -
CombineFileInputFormat
,具体可参考官方给出的例子MultiFileWordCount.java SequenceFile
Hadoop Archive
虽然解决了小文件占用namenode内存的问题,但是提交任务时还是一个小文件一个task;CombineFileInputFormat
是一个抽象类,需要继承该类实现对应的抽象方法public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException
,比较麻烦;
所以解决小文件更好的方式还是 SequenceFile
提到小文件顺便提下小job,对于小型job可以开启jvm重用让每个jvm跑几个task,这样可以减少进程关闭和创建的开销,能有效提高小job的运行效率。mapreduce.job.jvm.numtasks
的默认值是1
mapreduce.job.jvm.numtasks=10
但是对于大型job,该参数就不合适了,因为在长期运行的job里会有内存碎片,最好是重新创建jvm,而且创建jvm的时间和整个job的运行时间相比就很微不足道了。说白了,这都是一个trade-off
的过程。
参考网址
sequence-file-hadoop
how-to-read-and-write-sequencefile-in-hadoop
what-is-sequencefileinputformat-in-hadoop-mapreduce
reuse-jvm-in-hadoop-mapreduce-jobs