欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SequenceFile文件原理及使用

程序员文章站 2024-03-15 15:41:35
...

介绍

SequenceFile是hadoop里用来存储序列化的键值对即二进制的一种文件格式。SequenceFile文件也可以作为MapReduce作业的输入和输出,hive和spark也支持这种格式。
它有如下几个优点:

  1. 以二进制的形式存储数据,所以可以在HDFS里存储图像或者更加复杂的结构作为KV对。
  2. SequenceFile支持压缩分片。当你压缩为一个SequenceFile时,并不是将整个文件压缩成一个单独的单元,而是压缩文件里的record或者block of records(块)。因此SequenceFile是能够支持分片的,尽管使用的压缩方式如Snappy, Lz4 or Gzip不支持分片。
  3. SequenceFile也可以用于存储多个小文件。由于Hadoop本身就是用来处理大型文件的,所以用一个SequenceFile来存储很多小文件就可以提高处理效率,也能节省Namenode内存,因为Namenode只需一个SequenceFile的metadata,而不是为每个小文件创建单独的metadata。
  4. 由于数据是以SequenceFile形式存储,所以中间输出文件即map输出也会用SequenceFile来存储。

Sync points(同步点)

SequenceFile文件里每隔100 bytes会记录一个sync-marker,基于这些sync pointsSequenceFile就是可以切片的并用作MapReduce的输入。

SequenceFile的压缩形式

SequenceFile有三种压缩选择

  1. NONE ---- 即key和value都不压缩。
  2. RECORD ---- 只有value会被压缩。
  3. BLOCK ---- key和value都会被压缩。key和value都会以block的形式单独收集和压缩。block的大小是可以配置的,在core-site,xml里定义io.seqfile.compress.blocksize,默认是1000000 bytes。

压缩的好处:减少磁盘占用、减少IO、减少shuffle的带宽。

SequenceFile的文件格式

尽管有三种压缩选择,但是header的格式都是一样的。

SequenceFile文件的header格式

SequenceFile文件原理及使用

字段 说明
version 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)
KeyClassName key class
ValueClassName value class
Compression A boolean which specifies if compression is turned on for keys/values in this file
BlockCompression CompressionCodec class which is used for compression of keys and/or values (if compression is enabled)
Metadata SequenceFile.Metadata for this file
Sync A sync marker to denote end of the header

无压缩的SequenceFile文件格式

SequenceFile文件原理及使用
sync-marker ---- 每100 bytes一个sync-marker

Record压缩的SequenceFile文件格式

SequenceFile文件原理及使用
sync-marker ---- 每100 bytes一个sync-marker

存储格式如下
SequenceFile文件原理及使用

Block压缩的SequenceFile文件格式

  • Header
  • Record Block
    • Uncompressed number of records in the block
    • Compressed key-lengths block-size
    • Compressed key-lengths block
    • Compressed keys block-size
    • Compressed keys block
    • Compressed value-lengths block-size
    • Compressed value-lengths block
    • Compressed values block-size
    • Compressed values block
  • A sync-marker every block
    SequenceFile文件原理及使用

SequenceFile的相关类

SequenceFile提供了SequenceFile.Writer, SequenceFile.Reader 和 SequenceFile.Sorter用来写、读及排序数据。
由于有三种压缩选择,故SequenceFile Writer有以下三类:

  • Writer : Uncompressed records.
  • RecordCompressWriter : Record-compressed files, only compress values.
  • BlockCompressWriter : Block-compressed files, both keys & values are compressed.

通常用静态方法SequenceFile.createWriter去创建Writer,创建时指定Writer.Option来确定是否要压缩及选择哪种方式压缩。

Java API读写SequenceFile

将HDFS某个目录下所有的小文件合并成一个SequenceFile文件(写)

以小文件的文件路径作为key,小文件的内容作为value。然后将其写入到SequenceFile文件。

public static void writeHDFSFileTOHDFSSequenceFile() throws IOException {
    String inputDir = "/user/root/input";
    Path path = new Path(inputDir);
    //最后生成的sequenceFile文件
    Path outFile = new Path("/user/root/sequecenfile/out/seq_file");
    //声明一个byte[]用于后面存放小文件内容
    byte[] buffer;
    //获取inputDir目录下的所有文件
    FileStatus[] fileStatusArr = fs.listStatus(path);
    //构造writer, 并使用try获取资源, 最后自动关闭资源
    try(SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(outFile),//设置文件名
            SequenceFile.Writer.keyClass(Text.class),//设置keyclass
            SequenceFile.Writer.valueClass(Text.class),//设置valueclass
            SequenceFile.Writer.appendIfExists(false),
            SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()) //设置block+gzip的压缩方式
    )){
        //循环外定义key和value,避免重复定义,因为序列化时只是会序列化对应的内容
        Text key = new Text();
        Text value = new Text();
        for (FileStatus fileStatus : fileStatusArr){
            System.out.println("the file name is "+fileStatus.getPath());
            //利用FileSystem打开文件
            FSDataInputStream fsDataIn = fs.open(fileStatus.getPath());
            //根据文件大小来定义byte[]的长度
            buffer = new byte[((int) fileStatus.getLen())];
            //将文件内容读入到buffer这个byte[]里
            fsDataIn.read(buffer);
            key.set(fileStatus.getPath().toString());
            value.set(buffer);
            //通过append方法写入到SequenceFile
            writer.append(key, value);
        }

    }

}

将本地某个目录下所有的小文件合并成一个SequenceFile文件(写)

public static void writeLocalFileToHDFSSequenceFile() throws IOException {
    String inputDir = "/root/test";
    java.nio.file.Path localDir = Paths.get(inputDir);
    Path outFile = new Path("/user/root/sequecenfile/out/seq_file");
    //构造writer, 并使用try获取资源, 最后自动关闭资源
    try(SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(outFile),//设置文件名
            SequenceFile.Writer.keyClass(Text.class),//设置keyclass
            SequenceFile.Writer.valueClass(Text.class),//设置valueclass
            SequenceFile.Writer.appendIfExists(false),
            SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()) //设置block+gzip的压缩方式
    )){
        Text key = new Text();
        Text value = new Text();

//            Collection<File> fileList = FileUtils.listFiles(new File(inputDir),null,false);
        try(Stream<java.nio.file.Path> stream = Files.list(localDir)){

            //遍历所有以.txt结尾的小文件写入到SequenceFile
            stream.filter(x->x.toString().endsWith(".txt")).forEach(localPath->{
                try {
                    System.out.println(localPath.toString());
                    byte[] bytes = Files.readAllBytes(localPath);
                    key.set(localPath.toString());
                    value.set(bytes);
                    writer.append(key, value);
                } catch (IOException e) {
                    e.printStackTrace();
                    System.err.println("read "+localPath.toString()+" error");
                }
            });
        }

    }

}

读HDFS上SequenceFile文件的内容(读)

通过SequenceFile.Reader的构造方法并指定参数创建Reader来读取SequenceFile文件

public static void readSequenceFile() throws IOException {
    Path seqPath = new Path("/user/root/sequecenfile/out/seq_file");
    try(SequenceFile.Reader reader = new SequenceFile.Reader(conf,
            SequenceFile.Reader.file(seqPath), //设置SequenceFile文件路径
            SequenceFile.Reader.bufferSize(1024*8)) //设置bufferSize
    ){
        Text key = new Text();
        Text value = new Text();
        while (reader.next(key, value)){
            System.out.println("------------key="+key+"----------");
            System.out.println(value);
            System.out.println("-------------------------------");
        }
    }
}

将其打包成具体的jar文件后执行hadoop jar sequenceFile.jar com.utstar.patrick.hadoop.data.format.SequenceFileDemo
其中sequenceFile.jar就是打包的jar名,com.utstar.patrick.hadoop.data.format.SequenceFileDemo就是需要运行的main类。
部分结果如下图所示
SequenceFile文件原理及使用

使用SequenceFile作为MapReduce的输入和输出

使用SequenceFile作为MapReduce的输入

SequenceFile文件作为输入很简单,只需要在driver代码里设置job的inputformat即可。

job.setInputFormatClass(SequenceFileInputFormat.class);

我们以hadoop提供的example为例,我通过-Dproperty=value的方式设置了inputformat来读取上面生成的SequenceFile文件

hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.5.jar wordcount -Dmapreduce.job.inputformat.class=org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat /user/root/sequecenfile/out/seq_file /out_99

能正常的进行单词统计功能,如下图所示
SequenceFile文件原理及使用

使用SequenceFile作为MapReduce的输出

driver代码里设置如下参数即可,需要强调的是要通过SequenceFileOutputFormat.setOutputCompressionType方式来说明SequenceFile选取哪种压缩方式。

//设置outputformat
job.setOutputFormatClass(SequenceFileOutputFormat.class);

FileInputFormat.addInputPath(job, new Path("/user/root/input"));
FileOutputFormat.setOutputPath(job, new Path(outpath));
//设置压缩
FileOutputFormat.setCompressOutput(job, true);
//设置用哪种算法进行压缩
FileOutputFormat.setOutputCompressorClass(job, Lz4Codec.class);
//必须通过SequenceFileOutputFormat.setOutputCompressionType来指定SequenceFile文件的压缩类型
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);

顺带提下小文件的处理方式

hadoop自带有3种小文件解决方式

Hadoop Archive虽然解决了小文件占用namenode内存的问题,但是提交任务时还是一个小文件一个task;
CombineFileInputFormat是一个抽象类,需要继承该类实现对应的抽象方法public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,比较麻烦;
所以解决小文件更好的方式还是 SequenceFile

提到小文件顺便提下小job,对于小型job可以开启jvm重用让每个jvm跑几个task,这样可以减少进程关闭和创建的开销,能有效提高小job的运行效率。
mapreduce.job.jvm.numtasks的默认值是1

mapreduce.job.jvm.numtasks=10

但是对于大型job,该参数就不合适了,因为在长期运行的job里会有内存碎片,最好是重新创建jvm,而且创建jvm的时间和整个job的运行时间相比就很微不足道了。说白了,这都是一个trade-off的过程。

参考网址

sequence-file-hadoop
how-to-read-and-write-sequencefile-in-hadoop
what-is-sequencefileinputformat-in-hadoop-mapreduce
reuse-jvm-in-hadoop-mapreduce-jobs