Hadoop学习(十五)---hadoop 的数据压缩
1.Hadoop的三个阶段
- HDFS阶段分布式文件系统
- Mapreducer阶段分布式编程框架
- yarn阶段 mr阶段的运行资源调度框架
2.Hadoop的数据压缩技术
在mr阶段要进行大量的数据运输,压缩技术大大减轻了数据传输的压力。
压缩技术提高了网络带宽和磁盘空间的效率,节省资源,也是mr的优化策略,通过对压缩编码对mapper或者reducer数据传输进行数据的压缩,以减少磁盘io流。
3.压缩的基本原则
运算密集型任务少用压缩
io密集型的任务,多用压缩
4.mr支持的压缩编码
压缩格式 是否hadoop自带 文件扩展名 是否可以切分
DEFAULT 是 .deflate 否
Gzip 是 .gz 否
bzip2 是 .bz2 是
lzo 是 .lzo 是
snappy 是 .snappy 否
5.编码 解码器
DEFAULT|org.apache.hadoop.io.compress.DefaultCodeC
Gzip|org.apache.hadoop.io.compress.GzipCodeC
bzip2|org.apache.hadoop.io.compress.BZip2CodeC
LZO|com.hadoop.compression.lzo.LzoCodeC
Snappy|org.apache.hadoop.io.compress.SnappyCodeC
6.压缩性能
压缩算法 | 原始文件大小 | 压缩文件大小| 压缩速度| 解压速度
gzip | 8.3GB |1.8GB|17,5MB/s|58MB/s
bzip2| 8.3GB |1.1GB|2.4MB/s |9.5MB/s
LZO | 8.3gb |2.9GB|49.3MB/s|74.6MB/s
7.使用方式
map端输出压缩
//开启map端的输出压缩
conf.setBoolean("mapreduce.map.output.compress", true);
//设置压缩方式
//conf.setClass("mapreduce.map.output.compress.codec", DefaultCodec.c
lass, CompressionCodec.class);
conf.setClass("mapreduce.map.output.compress.codec",
BZip2Codec.class, CompressionCodec.class);
reduce端输出压缩
//开启reduce端的输出压缩
FileOutputFormat.setCompressOutput(job, true);
//设置压缩方式
//FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
//FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
package src.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//conf.setBoolean("mapreduce.map.output.compress", true);
//conf.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class);
//conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
//2.获取jar包
job.setJarByClass(WordCountDriver.class);
//3.获取自定义的mapper与reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4.设置map输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.设置reduce输出的数据类型(最终的数据类型)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//用combiner来实现对数据的优化传输,在对map阶段结束完,向reducer传进的数据进行小量的reducer,再汇总输出。
job.setCombinerClass(WordCountReducer.class);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
//6.设置输入存在的路径与处理后的结果路径
FileInputFormat.setInputPaths(job, new Path("f://wc//in"));
FileOutputFormat.setOutputPath(job, new Path("f://wc//out"));
//7.提交任务
boolean rs = job.waitForCompletion(true);
System.out.println(rs?0:1);
}
}
上一篇: 两个数组的交集
下一篇: flush 方法小用和pid()方法