欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop数据压缩总结

程序员文章站 2022-07-15 22:24:06
...

觉得有帮助的,请多多支持博主,点赞关注哦~

Hadoop数据压缩

一、概述

压缩技术能够有效减少底层存储系统(HDFS)读写字节数,提高了网络带宽和磁盘空间的效率。

在 Hadoop 下,尤其是数据规模很大和工作负载密集的情况下,使用数据压缩显得非常重要。在这种情况下,I/O 操作和网络数据传输要花大量的时间。还有,Shuffle与 Merge 过程同样也面临着巨大的 I/O 压力。

鉴于磁盘 I/O 和网络带宽是 Hadoop 的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O 和网络传输非常有帮助。不过,尽管压缩与解压操作的 CPU 开销不高,其性能的提升和资源的节省并非没有代价。
如果磁盘 I/O 和网络带宽影响了 MapReduce 作业性能,在任意 MapReduce 阶段启用压缩都可以改善端到端处理时间并减少 I/O 和网络流量。

压缩 Mapreduce 的一种优化策略:通过压缩编码对 Mapper 或者 Reducer 的输出进行压缩,以减少磁盘 IO,提高 MR 程序运行速度(但相应增加了 cpu 运算负担)。

注意:压缩特性运用得当能提高性能,但运用不当也可能降低性能。

基本原则:
(1)运算密集型的 job,少用压缩
(2)IO 密集型的 job,多用压缩

二、MR 支持的压缩编码

1、压缩格式

压缩格式 hadoop 自带? 算法 文件扩展名 是否可切分 换成压缩格式后,原来的程序是否需要修改
DEFAULT 是,直接使用 DEFAULT .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFAULT .gz 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式
Snappy 否,需要安装 Snappy .snappy 和文本处理一样,不需要修改

2、编码/解码器

为了支持多种压缩/解压缩算法,Hadoop 引入了编码/解码器,如下表所示

压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

3、压缩性能的比较

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s
Snappy 250 MB/s 500 MB/s

三、压缩方式选择

Bzip2:Map输出结果
Lzo、Snappy:shuffle中间阶段结果
Gzip:reduce输出结果

1、Gzip 压缩

优点:

  • 压缩率比较高,而且压缩/解压速度也比较快;
  • hadoop 本身支持,在应用中处理gzip 格式的文件就和直接处理文本一样;大部分 linux 系统都自带 gzip 命令,使用方便。

缺点:

  • 不支持 split。

应用场景:

  • 当每个文件压缩之后在 130M 以内的(1 个块大小内),都可以考虑用 gzip压缩格式。

例如说一天或者一个小时的日志压缩成一个 gzip 文件,运行 mapreduce 程序的时候通过多个 gzip 文件达到并发。hive 程序,streaming 程序,和 java 写的 mapreduce 程序完全和文本处理一样,压缩之后原来的程序不需要做任何修改。

2、Bzip2 压缩

优点:

  • 支持 split;
  • 具有很高的压缩率,比 gzip 压缩率都高;
  • hadoop 本身支持,但不支持 native;
  • 在 linux 系统下自带 bzip2 命令,使用方便。

缺点:

  • 压缩/解压速度慢;
  • 不支持 native。

应用场景:

  • 适合对速度要求不高,但需要较高的压缩率的时候,可以作为 mapreduce 作业的输出格式;
  • 或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;
  • 或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持 split,而且兼容之前的应用程序(即应用程序不需要修改)的情况。

3、Lzo 压缩

优点:

  • 压缩/解压速度也比较快,合理的压缩率;
  • 支持 split,是 hadoop 中最流行的压缩格式;
  • 可以在 linux 系统下安装 lzop 命令,使用方便。

缺点:

  • 压缩率比 gzip 要低一些;
  • hadoop 本身不支持,需要安装;
  • 在应用中对 lzo 格式的文件需要做一些特殊处理(为了支持 split 需要建索引,还需要指定 inputformat 为 lzo 格式)。

应用场景:

  • 一个很大的文本文件,压缩之后还大于 200M 以上的可以考虑,而且单个文件越大,lzo 优点越越明显。

4、Snappy 压缩

优点:

  • 高速压缩速度和合理的压缩率。

缺点:

  • 不支持 split;
  • 压缩率比 gzip 要低;
  • hadoop 本身不支持,需要安装;

应用场景:

  • 当 Mapreduce 作业的 Map 输出的数据比较大的时候,作为 Map 到 Reduce的中间数据的压缩格式;或者作为一个 Mapreduce 作业的输出和另外一个 Mapreduce 作业的输入。

四、压缩位置选择

压缩可以在 MapReduce 作用的任意阶段启用。

阶段 说明
Map输入端采用压缩 在有大量数据并计划重复处理的情况下,应考虑对输入进行压缩。
Hadoop自动检查文件扩展名,根据扩展名自动选择编码方式对文件进行压缩和解压。(bzip2支持分片)
Map输出端采用压缩 当map任务输出的中间数据量很大时。
能提升Shuffle性能。
可选择LZO或者Snappy.
Reduce输入端采用压缩 能减少要存储的数据量。
降低所需的磁盘空间。(gzip不支持分片)

五、压缩位置选择

要在 Hadoop 中启用压缩,可以配置如下参数:

参数 默认值 阶段 建议
io.compression.codecs
(在 core-site.xml 中配置)
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.BZip2Codec
输入压缩 Hadoop 使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress
(在 mapred-site.xml中配置)
false mapper 输出 这个参数设为true 启用压缩
mapreduce.map.output.compress.codec
(在mapred-site.xml 中配置
org.apache.hadoop.io.compress.DefaultCodec mapper 输出 使用LZO 或snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress
(在mapred-site.xml 中配置)
false reducer 输出 这个参数设为true 启用压缩
mapreduce.output.fileoutputformat.compress.codec
(在mapred-site.xml 中配置)
org.apache.hadoop.io.compress.DefaultCodec reducer 输出 使用标准工具或者编解码器,如gzip 和bzip2
mapreduce.output.fileoutputformat.compress.type
(在mapred-site.xml 中配置)
RECORD reducer 输出 SequenceFile输出使用的压缩类型:NONE和BLOCK
# 案例
  $HADOOP_HOME/etc/hadoop/core-site.xml

  <property>
     <name>io.compression.codecs</name>
     <value>org.apache.hadoop.io.compress.GzipCodec</value>
  </property>

六、压缩实操案例

1、数据流的压缩和解压缩

CompressionCodec 有两个方法可以用于轻松地压缩或解压缩数据。要想对正在被写入一个输出流的数据进行压缩,我们可以使用 createOutputStream(OutputStreamout)方法创建一个 CompressionOutputStream,将其以压缩格式写入底层的流。相反,要想对从输入流读取而来的数据进行解压缩,则调用 createInputStream(InputStreamin)函数,从而获得一个CompressionInputStream,从而从底层的流读取未压缩的数据。

测试一下如下压缩方式:
DEFLATE --> org.apache.hadoop.io.compress.DefaultCodec
gzip --> org.apache.hadoop.io.compress.GzipCodec
bzip2 --> org.apache.hadoop.io.compress.BZip2Codec

package com.biubiubiu.dataYasuo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;


public class TestCompress {
    public static void main(String[] args) throws Exception {
       	compress("d_in/customer2.txt","d_in/customer2gz","org.apache.hadoop.io.compress.GzipCodec");
//		decompress("d_in/customer2gz.gz","d_in/customer2gznew.txt");
    }

    // 压缩
    /*
       filename:源文件
       outpath:压缩后的文件
       method:压缩方式(写完整路径 )org.apache.hadoop.io.compress.GzipCodec
     */
    private static void compress(String filename, String outPath,String method) throws Exception {
        // 1 获取输入流
        FileInputStream fis = new FileInputStream(new File(filename));
           //基于反射得到编码对象
        Class codecClass = Class.forName(method);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());

        // 2 获取输出流
        FileOutputStream fos = new FileOutputStream(new File(outPath +codec.getDefaultExtension()));
        CompressionOutputStream cos = codec.createOutputStream(fos);

        // 3 流的对拷
        IOUtils.copyBytes(fis, cos, 1024*1024*5, false);

        // 4 关闭资源
        fis.close();
        cos.close();
        fos.close();
    }

    // 解压缩
    private static void decompress(String filename,String outPath) throws FileNotFoundException, IOException {

        // 1 校验是否能解压缩
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        CompressionCodec codec = factory.getCodec(new Path(filename));
        if (codec == null) {
            System.out.println("cannot find codec for file " + filename);
            return;
        }

        // 2 获取输入流
        CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));

        // 3 获取输出流
        FileOutputStream fos = new FileOutputStream(new File(outPath ));

        // 4 流的对拷
        IOUtils.copyBytes(cis, fos, 1024*1024*5, false);

        // 5 关闭资源
        cis.close();
        fos.close();
    }

}

2、Map 输出端采用压缩

即使你的 MapReduce 的输入输出文件都是未压缩的文件,你仍然可以对 map 任务的中间结果输出做压缩,
因为它要写在硬盘并且通过网络传输到 reduce 节点,对其压缩可以提高很多性能,这些工作只要设置两个属性

hadoop 源码支持的压缩格式有:BZip2Codec 、DefaultCodec
//1)创建Configuration对象,指明namespace的路径
Configuration conf = new Configuration();
conf.set("dfs.defaultFS","hdfs://192.168.159.151:9000");
//开启map端输出数据压缩
conf.setBoolean("mapreduce.map.output.compress",true);
//设置map端输出数据压缩格式
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

3、Reduce 输出端采用压缩

//开启reduce端输出数据压缩
FileOutputFormat.setCompressOutput(job,true);
//设置reduce端输出数据压缩格式
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
//FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);
//FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

基于 workcount 案例处理

package com.biubiubiu.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount {
    /**
     * 数据压缩
     *
     * 第1部分:自定义Mapper(静态内部类)
     *  hello,aa,b,c
     * d,e,f
     * aa,bb,cc
     * dd,ee,ff
     * Mapper<四个参数>:
     *  1)前两个是输入参数:行号,当前行的内容
     *     Object,Text
     *  2)后两个是输出参数
     *     hello   1
     *
     *  结论:都是(key,value)
     */
    static class CustomMapper extends Mapper<Object, Text,Text, IntWritable>{

        /**
         * aa,bb,cc  输出结果:aa 1
         *                   bb 1
         *                   cc 1
         * 参数说明:
         *   参数1:Object key:行号(很少用)
         *   参数2:Text value:行的内容(必须用)
         *   参数3:Context context:(上下文环境)
         */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("map函数的输入值:"+key.toString()+"  "+value.toString());
            //1)得到当前行的内容
            String line=value.toString();
            //2)对当前行内容进行清理
            String [] words=line.split(",");
            //3)向context输出结果
            for(String word :words){
                context.write(new Text(word),new IntWritable(1));
                System.out.println("map函数的输出结果:"+word+" "+1);

            }

        }
    }

    /**第2部分:自定义Reducer(静态内部类)
     *  4个参数:
     *    1)前两个是输入参数(来源于map的输出)(因此reduce的输入参数=map的输出参数):
     *    2)后两个参数reduce的输出结果:
     *       hello 4
     *       a  5
    */
    static class CustomReducer extends Reducer<Text, IntWritable,Text,IntWritable>{

        /**reduce函数执行的次数,取决于key的个数
         * @param key   :hello
         * @param values :(1,1,1,1,1,1)
         * @param context
         * 输出结果(hello,6)
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //1)初始化变量
             int count =0;
             //2)迭代累加
            for(IntWritable value:values){
                count+=1;
            }
            //3)输出关于当前key的结果
            context.write(new Text(key.toString()),new IntWritable(count));
            System.out.println("*************reduce的输出结果:"+key.toString()+" "+count);
        }
    }

    //第3部分,编写Driver(main方法)
    public static void main(String[] args) {
       try{
           //1)创建Configuration对象,指明namespace的路径
           Configuration conf = new Configuration();
           conf.set("dfs.defaultFS","hdfs://192.168.159.151:9000");




           //开启map端输出数据压缩
           conf.setBoolean("mapreduce.map.output.compress",true);
           //设置map端输出数据压缩格式
           conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);





           //2)创建Job
           Job job =Job.getInstance(conf,"mywordcount");
           job.setJarByClass(WordCount.class);

           //3)自定义Mapper进行输出参数(key,value)的配置
           job.setMapperClass(CustomMapper.class);
           job.setMapOutputKeyClass(Text.class);
           job.setMapOutputValueClass(IntWritable.class);

           //4)自定义Reducer进行参数的配置
           job.setReducerClass(CustomReducer.class);
           job.setOutputKeyClass(Text.class);
           job.setOutputValueClass(IntWritable.class);

           //5)配置处理的文件的路径(input)以及处理结果存放的路径(output)
           FileInputFormat.addInputPath(job,new Path("data/word.txt"));
           FileOutputFormat.setOutputPath(job,new Path("data/dataYasuo/default"));




           //开启reduce端输出数据压缩
           FileOutputFormat.setCompressOutput(job,true);
           //设置reduce端输出数据压缩格式
           FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
//           FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);
//           FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);




           //6)让程序执行
           boolean result=job.waitForCompletion(true);
           if(result){
               System.out.println("执行正确!!!");
           }else{
               System.out.println("执行失败.....");
           }
       }catch(Exception ex){
           System.out.println("执行出错:"+ex.getMessage());
           ex.printStackTrace();
       }

    }
}

觉得有帮助的,请多多支持博主,点赞关注哦~