大数据私房菜--Hadoop架构(三)--MapReduce离线计算框架
文章内容输出来源:拉勾教育大数据高薪训练营
1.HADOOP框架
-
大数据技术解决的是什么问题?
大数据技术解决的主要是海量数据的存储和计算。 -
Apache Hadoop的重要组成
Hadoop=HDFS(分布式文件系统)+MapReduce(分布式计算框架)+Yarn(资源协调框架)+Common模块
- Hadoop HDFS:(Hadoop Distribute File System )一个高可靠、高吞吐量的分布式文件系统
- Hadoop MapReduce:一个分布式的离线并行计算框架
- Hadoop YARN:作业调度与集群资源管理的框架
2. Mapreduce总结
分布式计算
HDFS解决了数据存储问题
运算 = 并发计算 + 汇总结果
并行:并行的进行相同的任务
并发:一个并发任务输入进来,多个机器并发进行一个任务的子任务
3. Mapreduce知识框架
4. MapReduce分布式计算框架
4.1 MapReduce组成部分
MapReduce程序的代码由三个部分组成:
- Mapper类:继承org.apache.hadoop.mapreduce.Mapper重写map方法
- Reduce类:继承org.apache.hadoop.mapreduce.Reducer重写reduce方法
- 运行作业的驱动类(Driver)
4.2 hadoop序列化
序列化主要是我们通过网络通信传输数据时或者把对象持久化到文件,需要把对象序列化成二进制的结
构。
为什么Hadoop要选择建立自己的序列化格式而不使用java自带serializable?
- 序列化在分布式程序中非常重要,在Hadoop中,集群中多个节点的进程间的通信是通过RPC(远
程过程调用:Remote Procedure Call)实现;RPC将消息序列化成二进制流发送到远程节点,远
程节点再将接收到的二进制数据反序列化为原始的消息,因此RPC往往追求如下特点:- 紧凑:数据更紧凑,能充分利用网络带宽资源
- 快速:序列化和反序列化的性能开销更低
- Hadoop使用的是自己的序列化格式Writable,它比java的序列化serialization更紧凑速度更快。一
个对象使用Serializable序列化后,会携带很多额外信息比如校验信息,Header,继承体系等。
Java基本类型与Hadoop常用序列化类型
Java基本类型 | Hadoop Writable类型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
String | Text |
map | MapWritable |
array | ArrayWritable |
4.3 MapReduce编程规范
4.3.1 mapper
- 用户自定义一个Mapper类继承Hadoop的Mapper类
- Mapper的输入数据是KV对的形式(类型可以自定义)
- Map阶段的业务逻辑定义在map()方法中
- Mapper的输出数据是KV对的形式(类型可以自定义)
注:map()方法是对输入的一个KV对调用一次!!
4.3.2 reducer类
- 用户自定义Reducer类要继承Hadoop的Reducer类
- Reducer的输入数据类型对应Mapper的输出数据类型(KV对)
- Reducer的业务逻辑写在reduce()方法中
- Reduce()方法是对相同K的一组KV对调用执行一次
4.3.3 Driver类
创建提交YARN集群运行的Job对象,其中封装了MapReduce程序运行所需要的相关参数入输入数据路
径,输出数据路径等,也相当于是一个YARN集群的客户端,主要作用就是提交我们MapReduce程序运
行。
4.3.4 Wrodcount编写
Map阶段:
- map()方法中把传入的数据转为String类型
- 根据空格切分出单词
- 输出<单词,1>
Reduce阶段:
- 汇总各个key(单词)的个数,遍历value数据进行累加
- 输出key的总数
Driver
- 获取配置文件对象,获取job对象实例
- 指定程序jar的本地路径
- 指定Mapper/Reducer类
- 指定Mapper输出的kv数据类型
- 指定最终输出的kv数据类型
- 指定job处理的原始数据路径
- 指定job输出结果路径
- 提交作业
4.3.4.1 Mapper编写
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(final LongWritable key, final Text value, final Context context) throws IOException, InterruptedException {
final String line = value.toString();
final String[] words = line.split(" ");
for (final String word : words) {
k.set(word);
context.write(k, v);
}
}
}
4.3.4.2 Reducer编写
public class WordcountReducer extends Reducer <Text, IntWritable, Text, IntWritable> {
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(final Text key, final Iterable<IntWritable> values, final Context context) throws IOException, InterruptedException {
sum = 0;
for (final IntWritable value : values) {
sum += value.get();
}
v.set(sum);
context.write(key, v);
}
}
4.3.4.3 Driver编写
public class WordcountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
/*
1. 获取配置文件对象,获取job对象实例
2. 指定程序jar的本地路径
3. 指定Mapper/Reducer类
4. 指定Mapper输出的kv数据类型
5. 指定最终输出的kv数据类型
6. 指定job处理的原始数据路径
7. 指定job输出结果路径
8. 提交作业
*/
// 1. 获取配置文件对象,获取job对象实例
final Configuration conf = new Configuration();
final Job job = Job.getInstance(conf, "WordcountDriver");
// 2. 指定程序jar的本地路径
job.setJarByClass(WordcountDriver.class);
// 3. 指定Mapper/Reducer类
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
// 4. 指定Mapper输出的kv数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5. 指定最终输出的kv数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.1 初始化inputpath和outpath,并判断output目录是否存在,存在将其删除
Path inputPath = new Path("E:/input/wc/");
Path outputPath = new Path("E:/output/wc/");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// 6. 指定job输入数据路径
FileInputFormat.setInputPaths(job, inputPath); //指定读取数据的原始路径
// 7. 指定job输出结果路径
FileOutputFormat.setOutputPath(job, outputPath); //指定结果数据输出路径
// 8. 提交作业
final boolean flag = job.waitForCompletion(true);
//jvm退出:正常退出0,非0值则是错误退出
System.exit(flag ? 0 : 1);
}
}
4.3.4.4 运行作业
输出结果:
4.4 序列化Writable接口
基本序列化类型往往不能满足所有需求,比如在Hadoop框架内部传递一个自定义bean对象,那么该对
象就需要实现Writable序列化接口
4.4.1 自定义序列化Writable步骤
- 继承writable接口
public class PartitionBean implements Writable{
}
- 定义属性
定义bean的属性
// 定义属性
private String id; //日志id
private String deviceId; //设备id
private String appkey; //appkey厂商id
private String ip; //ip地址
private Long selfDuration; //自有内容播放时长
private Long thirdPartDuration; //第三方内容时长
private String status; //状态码
- 生成 Getter and Setter方法
generate --> Getter and Setter
- 重写序列化方法
@Override
public void write(DataOutput out) throws IOException {
....
}
- 编写反序列方法
@Override
public void readFields(DataInput in) throws IOException {
....
}
注:反序列化的字段顺序和序列化字段的顺序必须完全一致
- 空参构造
反序列化时,需要反射调用空参构造函数,所以必须有空参构造
’generate -> constructor > 不选字段
public PartitionBean() {
}
- 有参构造
’generate -> constructor > 不选字段
public PartitionBean(String id, String deviceId, String appkey, String ip, Long selfDuration, Long thirdPartDuration, String status) {
this.id = id;
this.deviceId = deviceId;
this.appkey = appkey;
this.ip = ip;
this.selfDuration = selfDuration;
this.thirdPartDuration = thirdPartDuration;
this.status = status;
}
- 重写toString方法
注:如果自定义Bean对象需要放在Mapper输出KV中的K,则该对象还需实现Comparable接口,因为因
为MapReduce框中的Shuffle过程要求对key必须能排序!!
4.4.2 Writable接口案例
1.需求
统计每台智能音箱设备内容播放时长
原始日志格式
001 001577c3 kar_890809 120.196.100.99 1116 954 200
日志id 设备id appkey(合作硬件厂商) 网络ip 自有内容时长(秒) 第三方内容时长(秒) 网络状态码
输出格式
001577c3 11160 9540 20700
设备id 自有内容时长(秒) 第三方内容时长(秒) 总时长
2.需求分析
源数据筛选三个已有字段和计算总时长:
map阶段:<LongWritable, Text> >> <Text, Speakbean>
在map阶段以设备id作为k,将自有内容时长(秒)和第三方内容时长(秒)封装为一个speakbean对象,并将speakbean对象作为map阶段的输出v
reduce阶段:<Text, Speakbean> >> <Text, Speakbean>
在reduce阶段我们从map阶段和shuffle阶段之后获取的是数据格式大致如下:
<设备id, <speakbean1, speakbean2, speakbean3…>>
在reduce阶段我们获取到相同的设备id的不同日志的多条数据,这样我们可以实现将所有的时长进行累加,并封装到speak对象当中作为reduce的v,最终输出为一个设备id的总体情况。
3.具体实现
3.1自定义bean格式,创建SpeakBean对象
public class SpeakBean implements Writable {
// 1.定义属性
private Long selfDuration; //自有内容播放时长
private Long thirdPartDuration; //第三方内容时长
private Long sumDuration; //第三方内容时长
// 空参构造
public SpeakBean() {
}
// 有参构造
public SpeakBean(Long selfDuration, Long thirdPartDuration) {
this.selfDuration = selfDuration;
this.thirdPartDuration = thirdPartDuration;
this.sumDuration = this.selfDuration + this.thirdPartDuration;
}
// 2.生成Getter 和 Setter方法
public Long getSelfDuration() {
return selfDuration;
}
public void setSelfDuration(final Long selfDuration) {
this.selfDuration = selfDuration;
}
public Long getThirdPartDuration() {
return thirdPartDuration;
}
public void setThirdPartDuration(final Long thirdPartDuration) {
this.thirdPartDuration = thirdPartDuration;
}
public Long getSumDuration() {
return sumDuration;
}
public void setSumDuration(final Long sumDuration) {
this.sumDuration = sumDuration;
}
// 重写序列化方法
@Override
public void write(final DataOutput out) throws IOException {
out.writeLong(selfDuration);
out.writeLong(thirdPartDuration);
out.writeLong(sumDuration);
}
// 重写反序列方法
@Override
public void readFields(final DataInput in) throws IOException {
this.selfDuration = in.readLong();
this.thirdPartDuration = in.readLong();
this.sumDuration = in.readLong();
}
public void set(long selfDuration, long thirdPartDuration) {
this.selfDuration = selfDuration;
this.thirdPartDuration = thirdPartDuration;
this.sumDuration=this.selfDuration+this.thirdPartDuration;
}
// 重写toString方法
@Override
public String toString() {
return
selfDuration + "\t" +
thirdPartDuration + "\t" +
sumDuration
;
}
}
3.2 编写Mapper类
public class SpeakMapper extends Mapper<LongWritable, Text, Text, SpeakBean> {
SpeakBean v = new SpeakBean();
Text k = new Text();
@Override
protected void map(final LongWritable key, final Text value, final Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割字段
String[] fields = line.split("\t");
// 3 封装对象
String deviceId = fields[1];
long selfDuration = Long.parseLong(fields[fields.length - 3]);
long thirdPartDuration = Long.parseLong(fields[fields.length - 2]);
k.set(deviceId);
v.set(selfDuration, thirdPartDuration);
// 4 写mapper输出的<K,V>
context.write(k, v);
}
}
3.3 编写reducer类
public class SpeakReducer extends Reducer<Text, SpeakBean, Text, SpeakBean> {
@Override
protected void reduce(final Text key, final Iterable<SpeakBean> values, final Context context) throws IOException, InterruptedException {
long self_Duration = 0;
long thirdPart_Duration = 0;
// 1 遍历所用bean,将其中的自有,第三方时长分别累加
for (SpeakBean sb : values) {
self_Duration += sb.getSelfDuration();
thirdPart_Duration += sb.getThirdPartDuration();
}
// 2 封装对象
SpeakBean resultBean = new SpeakBean(self_Duration, thirdPart_Duration);
// 3 写reducer输出的<K,V>
context.write(key, resultBean);
}
}
3.4 编写driver类
public class SpeakDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
/*
1. 获取配置文件对象,获取job对象实例
2. 指定程序jar的本地路径
3. 指定Mapper/Reducer类
4. 指定Mapper输出的kv数据类型
5. 指定最终输出的kv数据类型
6. 指定job处理的原始数据路径
7. 指定job输出结果路径
8. 提交作业
*/
// 1. 获取配置文件对象,获取job对象实例
final Configuration conf = new Configuration();
final Job job = Job.getInstance(conf, "SpeakDriver");
// 2. 指定程序jar的本地路径
job.setJarByClass(SpeakDriver.class);
// 3. 指定Mapper/Reducer类
job.setMapperClass(SpeakMapper.class);
job.setReducerClass(SpeakReducer.class);
// 4. 指定Mapper输出的kv数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(SpeakBean.class);
// 5. 指定最终输出的kv数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(SpeakBean.class);
// 6.1 初始化inputpath和outpath,并判断output目录是否存在,存在将其删除
Path inputPath = new Path("E:/input/speak/");
Path outputPath = new Path("E:/output/speak/");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// 6. 指定job输入数据路径
FileInputFormat.setInputPaths(job, inputPath); //指定读取数据的原始路径
// 7. 指定job输出结果路径
FileOutputFormat.setOutputPath(job, outputPath); //指定结果数据输出路径
// 8. 提交作业
final boolean flag = job.waitForCompletion(true);
//jvm退出:正常退出0,非0值则是错误退出
System.exit(flag ? 0 : 1);
}
}
运行结果:
4.5 MapReduce原理分析
4.5.1 MapTask
MapTask运行流程
详细步骤:
-
首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文
件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关
系默认是一对一。 -
将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n
作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行
文本内容。
如果一个split切片最后没有读到\n分隔符,默认会去下一个分片找\n
- 读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。
RecordReader读取一行这里调用一次。 - map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先
对其进行分区处理,默认使用HashPartitioner。
注:MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对
输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的
取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到
job上。
- 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结
果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之
前,key与value值都会被序列化成字节数组。
- 环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信
息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概
念。 - 缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以
需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘
写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写
map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例
spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill
percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map
task的输出结果还可以往剩下的20MB内存中写,互不影响。
- 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为!
- 如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的
value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整
个模型中会多次使用。
- 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果
map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当
整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入
磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。
4.5.2 ReduceTask
ReduceTask运行流程
Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。
-
copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据。
如果设置有Combiner,也是会启用的。
-
待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行
合并排序,把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。 -
对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个
或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。
ReduceTask并行度
ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定
不同,ReduceTask数量的决定是可以直接手动设置:
// 默认值是1,手动设置为4
job.setNumReduceTasks(4);
注意:
- ReduceTask=0,表示没有Reduce阶段,输出文件数和MapTask数量保持一致;
- ReduceTask数量不设置默认就是一个,输出文件数量为1个;
- 如果数据分布不均匀,可能在Reduce阶段产生倾斜;
- ReduceTask数目的影响:
- ReduceTask >= Partition:生成输出文件数目=ReduceTask数目
- ReduceTask < Partition:报错
- ReduceTask=1:只有一个输出文件;ReduceTask=0:相当于没有Reduce
4.5.3 Shuffle机制
map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫
shuffle。即MapTask的map方法之后到Reduce方法之前的数据处理称为shuffle
4.5.4 Partitioner分区器
重写Partition分区器具体步骤:
- 自定义类继承Partitioner,重写getPartition()方法
- 在Driver驱动中,指定使用自定义Partitioner
- 在Driver驱动中,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask数量。
需求:按照不同的appkey把记录输出到不同的分区中
原始日志格式
001 001577c3 kar_890809 120.196.100.99 1116 954 200
日志id 设备id appkey(合作硬件厂商) 网络ip 自有内容时长(秒) 第三方内容时长(秒) 网络状态码
输出结果
根据appkey把不同厂商的日志数据分别输出到不同的文件中
Mapper
- 读取一行文本,按照制表符切分
- 解析出appkey字段,其余数据封装为PartitionBean对象(实现序列化Writable接口)
- 设计map()输出的kv,key–>appkey(依靠该字段完成分区),PartitionBean对象作为Value输出
Partition
自定义分区器,实现按照appkey字段的前缀来区分所属分区
Reduce
- reduce()正常输出即可,无需进行聚合操作
Driver
- 在原先设置job属性的同时增加设置使用自定义分区器
- 注意设置ReduceTask的数量(与分区数量保持一致)
Mapper
public class PartitionMapper extends Mapper<LongWritable, Text, Text, PartitionBean> {
// 重写Mapper方法
// generate -> Override methods
final PartitionBean bean = new PartitionBean();
final Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// super.map(key, value, context);
final String[] fields = value.toString().split("\t");
String appkey = fields[2];
bean.setId(fields[0]);
bean.setDeviceId(fields[1]);
bean.setAppkey(fields[2]);
bean.setIp(fields[3]);
bean.setSelfDuration(Long.parseLong(fields[4]));
bean.setThirdPartDuration(Long.parseLong(fields[5]));
bean.setStatus(fields[6]);
k.set(appkey);
context.write(k, bean);
}
}
PartitionBean
public class PartitionBean implements Writable {
//空参构造
// generate -> constructor
public PartitionBean() {
}
public PartitionBean(String id, String deviceId, String appkey, String ip, Long selfDuration, Long thirdPartDuration, String status) {
this.id = id;
this.deviceId = deviceId;
this.appkey = appkey;
this.ip = ip;
this.selfDuration = selfDuration;
this.thirdPartDuration = thirdPartDuration;
this.status = status;
}
// 序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id);
out.writeUTF(deviceId);
out.writeUTF(appkey);
out.writeUTF(ip);
out.writeLong(selfDuration);
out.writeLong(thirdPartDuration);
out.writeUTF(status);
}
// 反序列方法
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readUTF();
this.deviceId = in.readUTF();
this.appkey = in.readUTF();
this.ip = in.readUTF();
this.selfDuration = in.readLong();
this.thirdPartDuration = in.readLong();
this.status = in.readUTF();
}
// 定义属性
private String id; //日志id
private String deviceId; //设备id
private String appkey; //appkey厂商id
private String ip; //ip地址
private Long selfDuration; //自有内容播放时长
private Long thirdPartDuration; //第三方内容时长
private String status; //状态码
// generate --> Getter and Setter
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getAppkey() {
return appkey;
}
public void setAppkey(String appkey) {
this.appkey = appkey;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public Long getSelfDuration() {
return selfDuration;
}
public void setSelfDuration(Long selfDuration) {
this.selfDuration = selfDuration;
}
public Long getThirdPartDuration() {
return thirdPartDuration;
}
public void setThirdPartDuration(Long thirdPartDuration) {
this.thirdPartDuration = thirdPartDuration;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
@Override
public String toString() {
return "\t" + id +
"\t" + deviceId +
'\t' + appkey +
'\t' + ip +
'\t' + selfDuration +
"\t" + thirdPartDuration +
"\t" + status;
}
}
Reducer
public class PartitionReducer extends Reducer<Text, PartitionBean, Text, PartitionBean> {
@Override
protected void reduce(final Text key, final Iterable<PartitionBean> values, final Context context) throws IOException, InterruptedException {
for (final PartitionBean bean : values) {
context.write(key, bean);
}
}
}
CustomPartitioner
public class CustomPartitioner extends Partitioner<Text, PartitionBean>{
@Override
public int getPartition(final Text text, final PartitionBean partitionBean, final int i) {
int partition;
if(text.toString().equals("kar")){
partition = 0;
}else if(text.toString().equals("pandora")){
partition = 1;
} else {
partition = 2;
}
return partition;
}
}
Driver
public class PartirionDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1.获取配置文件
final Configuration conf = new Configuration();
// 2.获取Job
final Job job = Job.getInstance(conf);
// 3.设置任务相关参数
job.setJarByClass(PartirionDriver.class);
job.setMapperClass(PartitionMapper.class);
job.setReducerClass(PartitionReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PartitionBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(PartitionBean.class);
job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(3);
FileInputFormat.setInputPaths(job, new Path("E:/input/speak/speak.data"));
FileOutputFormat.setOutputPath(job, new Path("E:/output/partition/"));
// 7.提交任务
final boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}
输出结果
4.5.5 Combiner预聚合
- Combiner是MR程序中Mapper和Reducer之外的一种组件
- Combiner组件的父类就是Reducer
- Combiner和reducer的区别在于运行的位置
- Combiner是在每一个maptask所在的节点运行;
- Combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
- Combiner能够应用的前提是不能影响最终的业务逻辑,此外,Combiner的输出kv应该跟reducer
的输入kv类型要对应起来。
4.5.6 MapReduce中的排序
排序是MapReduce框架中最重要的操作之一。
- MapTask
- 它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲
区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上, - 溢写完毕后,它会对磁盘上所有文件进行归并排序。
- 它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲
- ReduceTask 当所有数据拷贝完毕后,ReduceTask统-对内存和磁盘上的所有数据进行一次归并排
序。
排序的分类
-
部分排序.
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
-
全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置- -个ReduceTask。但该方法在
处理大型文件时效率极低,因为- -台机器处理所有文件,完全丧失了MapReduce所提供的并行架
构。 -
辅助排序: ( GroupingComparator分组)
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部
字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。 -
二次排序.
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
4.5.6.1 key.compareTo排序
Bean对象如果作为Map输出的key时,需要实现WritableComparable接口并重写compareTo方法指定
排序规则。
在MapTask的溢写过程中会对Mapper的输出<K, v>进行Key.compareTo排序,我们通过WritableComparable接口重写Bean对象,重写compareTo方法指定排序规则。
全排序案例
基于统计的播放时长案例的输出结果对总时长进行排序
实现全局排序只能设置一个ReduceTask!!
播放时长案例输出结果:
00fdaf3 33180 33420 00fdaf3 66600
00wersa4 30689 35191 00wersa4 65880
0a0fe2 43085 44254 0a0fe2 87339
0ad0s7 31702 29183 0ad0s7 60885
0sfs01 31883 29101 0sfs01 60984
a00df6s 33239 36882 a00df6s 70121
adfd00fd5 30727 31491 adfd00fd5 62218
需求分析
key:把所有字段封装成为一个bean对象,并且指定bean对象作为key输出,如果作为key输出,需要实
现排序接口,指定自己的排序规则;
Mapper
- 读取结果文件,按照制表符进行切分
- 解析出相应字段封装为SpeakBean
- SpeakBean实现WritableComparable接口重写compareTo方法
- map()方法输出kv;key–>SpeakBean,value–>NullWritable.get()
Reducer
- 循环遍历输出
Mapper
public class SortMapper extends Mapper<LongWritable, Text, SpeakBean, NullWritable> {
final SpeakBean bean = new SpeakBean();
@Override
protected void map(final LongWritable key, final Text value, final Context context) throws IOException, InterruptedException {
// 1.读取一行文本转为字符串切分
final String[] fieds = value.toString().split("\t");
// 2.解析出各个字段封装成speakbean对象
bean.setDeviceId(fieds[0]);
bean.setSelfDuration(Long.parseLong(fieds[1]));
bean.setThirdPartDuration(Long.parseLong(fieds[2]));
bean.setSumDuration(Long.parseLong(fieds[3]));
// 3.输出speakbean作为key输出
context.write(bean, NullWritable.get());
}
}
SpeakBean
public class SpeakBean implements WritableComparable<SpeakBean> {
public SpeakBean() {
}
public SpeakBean(final String deviceId, final Long selfDuration, final Long thirdPartDuration, final Long sumDuration) {
this.deviceId = deviceId;
this.selfDuration = selfDuration;
this.thirdPartDuration = thirdPartDuration;
this.sumDuration = sumDuration;
}
// 定义属性
private String deviceId;//设备id
private Long selfDuration;//自有内容播放时长
private Long thirdPartDuration;//第三方内容时长
private Long sumDuration;//总时长
// 排序规则
@Override
public int compareTo(final SpeakBean o) {
// return int ->> 0, 1, -1
if(this.sumDuration > o.sumDuration){
return -1;
}else if (this.sumDuration < o.sumDuration){
return 1;
}else{
return 0;
}
}
// 序列化方法
@Override
public void write(final DataOutput out) throws IOException {
out.writeLong(selfDuration);
out.writeLong(thirdPartDuration);
out.writeUTF(deviceId);
out.writeLong(sumDuration);
}
// 反序列化
@Override
public void readFields(final DataInput in) throws IOException {
this.selfDuration = in.readLong();
this.thirdPartDuration = in.readLong();
this.deviceId = in.readUTF();
this.sumDuration = in.readLong();
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(final String deviceId) {
this.deviceId = deviceId;
}
public Long getSelfDuration() {
return selfDuration;
}
public void setSelfDuration(final Long selfDuration) {
this.selfDuration = selfDuration;
}
public Long getThirdPartDuration() {
return thirdPartDuration;
}
public void setThirdPartDuration(final Long thirdPartDuration) {
this.thirdPartDuration = thirdPartDuration;
}
public Long getSumDuration() {
return sumDuration;
}
public void setSumDuration(final Long sumDuration) {
this.sumDuration = sumDuration;
}
@Override
public String toString() {
return '\t' + deviceId +
'\t' + selfDuration +
'\t' + thirdPartDuration +
'\t' + sumDuration
;
}
}
Reducer
public class SortReducer extends Reducer <SpeakBean, NullWritable, SpeakBean, NullWritable> {
@Override
protected void reduce(final SpeakBean key, final Iterable<NullWritable> values, final Context context) throws IOException, InterruptedException {
for (final NullWritable value : values) {
context.write(key, value);
}
}
}
Driver
public class SortDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
/*
1. 获取配置文件对象,获取job对象实例
2. 指定程序jar的本地路径
3. 指定Mapper/Reducer类
4. 指定Mapper输出的kv数据类型
5. 指定最终输出的kv数据类型
6. 指定job处理的原始数据路径
7. 指定job输出结果路径
8. 提交作业
*/
// 1. 获取配置文件对象,获取job对象实例
final Configuration conf = new Configuration();
final Job job = Job.getInstance(conf, "SortDriver");
// 2. 指定程序jar的本地路径
job.setJarByClass(SortDriver.class);
// 3. 指定Mapper/Reducer类
job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
// 4. 指定Mapper输出的kv数据类型
job.setMapOutputKeyClass(SpeakBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 5. 指定最终输出的kv数据类型
job.setOutputKeyClass(SpeakBean.class);
job.setOutputValueClass(NullWritable.class);
Path inputPath = new Path("E:/output/speak/");
Path outputPath = new Path("E:/output/sort/");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileInputFormat.setInputPaths(job, inputPath); //指定读取数据的原始路径
// 7. 指定job输出结果路径
FileOutputFormat.setOutputPath(job, outputPath); //指定结果数据输出路径
// 8. 提交作业
final boolean flag = job.waitForCompletion(true);
//jvm退出:正常退出0,非0值则是错误退出
System.exit(flag ? 0 : 1);
}
}
输出结果:
4.5.6.2 GroupingComparator
GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为
一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻
辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑。
GroupingComparator案例
需求
原始数据
订单id | 商品id | 成交金额 |
---|---|---|
1 | Pdt_01 | 222.8 |
2 | Pdt_05 | 722.4 |
1 | Pdt_02 | 33.8 |
3 | Pdt_06 | 232.8 |
3 | Pdt_02 | 33.8 |
2 | Pdt_03 | 522.8 |
2 | Pdt_04 | 122.4 |
需要求出每一个订单中成交金额最大的一笔交易。
实现思路
Mapper
- 读取一行文本数据,切分出每个字段;
- 订单id和金额封装为一个Bean对象,Bean对象的排序规则指定为先按照订单Id排序,订单Id
相等再按照金额降序排; - map()方法输出kv;key–>bean对象,value–>NullWritable.get();
Shuffle
- 指定分区器,保证相同订单id的数据去往同个分区(自定义分区器)
指定GroupingComparator,分组规则指定只要订单Id相等则认为属于同一组;
Reduce
每个reduce()方法写出一组key的第一个
GroupMapper
public class GroupMapper extends Mapper<LongWritable, Text, GroupBean, NullWritable> {
@Override
protected void map(final LongWritable key, final Text value, final Context context) throws IOException, InterruptedException {
final String[] arr = value.toString().split("\t");
final GroupBean bean = new GroupBean(arr[0], Double.parseDouble(arr[2]));
context.write(bean, NullWritable.get());
}
}
GroupBean
public class GroupBean implements WritableComparable<GroupBean> {
private String orderID;
private Double price;
public GroupBean() {
}
public GroupBean(final String orderID, final Double price) {
this.orderID = orderID;
this.price = price;
}
@Override
public int compareTo(final GroupBean o) {
int i = this.orderID.compareTo(o.orderID);
if (i==0){
i = -this.price.compareTo(o.price);
}
return i;
}
@Override
public void write(final DataOutput out) throws IOException {
out.writeUTF(orderID);
out.writeDouble(price);
}
@Override
public void readFields(final DataInput in) throws IOException {
this.orderID = in.readUTF();
this.price = in.readDouble();
}
public String getOrderID() {
return orderID;
}
public void setOrderID(final String orderID) {
this.orderID = orderID;
}
public Double getPrice() {
return price;
}
public void setPrice(final Double price) {
this.price = price;
}
@Override
public String toString() {
return orderID + "\t" + price;
}
}
CustomGroupingComparator
public class CustomGroupingComparator extends WritableComparator {
//将我们自定义的OrderBean注册到我们自定义的CustomGroupIngCompactor当中来
// 表示我们的分组器在分组的时候,对OrderBean这一种类型的数据进行分组
// 传入作为key的bean的class类型,以及制定需要让框架做反射获取实例对象
public CustomGroupingComparator() {
super(GroupBean.class, true);
}
@Override
public int compare(final WritableComparable a, final WritableComparable b) {
GroupBean first = (GroupBean) a;
GroupBean second = (GroupBean) b;
final int i = first.getOrderID().compareTo(second.getOrderID());
if (i == 0){
System.out.println(first.getOrderID() + "&&" + second.getOrderID());
}
return i;
}
}
CustomPartitioner
public class CustomPartitioner extends Partitioner<GroupBean, NullWritable> {
@Override
public int getPartition(final GroupBean groupBean, final NullWritable nullWritable, final int i) {
return (groupBean.getOrderID().hashCode() & Integer.MAX_VALUE) % i;
}
}
GroupReducer
public class GroupReducer extends Reducer<GroupBean, NullWritable, GroupBean, NullWritable> {
@Override
protected void reduce(final GroupBean key, final Iterable<NullWritable> values, final Context context) throws IOException, InterruptedException {
System.out.println("Reducer Starting");
context.write(key, NullWritable.get());
}
}
GroupDriver
public class GroupDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. 获取配置文件对象,获取job对象实例
final Configuration conf = new Configuration();
final Job job = Job.getInstance(conf, "GroupDriver");
// 2. 指定程序jar的本地路径
job.setJarByClass(GroupDriver.class);
// 3. 指定Mapper/Reducer类
job.setMapperClass(GroupMapper.class);
job.setReducerClass(GroupReducer.class);
// 4. 指定Mapper输出的kv数据类型
job.setMapOutputKeyClass(GroupBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 5. 指定最终输出的kv数据类型
job.setOutputKeyClass(GroupBean.class);
job.setOutputValueClass(NullWritable.class);
// 6.1 初始化inputpath和outpath,并判断output目录是否存在,存在将其删除
Path inputPath = new Path("E:/input/group");
Path outputPath = new Path("E:/output/group");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// 6. 指定job输入数据路径
FileInputFormat.setInputPaths(job, inputPath); //指定读取数据的原始路径
// 7. 指定job输出结果路径
FileOutputFormat.setOutputPath(job, outputPath); //指定结果数据输出路径
// 8. 指定分区器,指定分组比较器,设置reducetask数量
job.setPartitionerClass(CustomPartitioner.class);
job.setGroupingComparatorClass(CustomGroupingComparator.class);
job.setNumReduceTasks(2);
// 8. 提交作业
final boolean flag = job.waitForCompletion(true);
//jvm退出:正常退出0,非0值则是错误退出
System.exit(flag ? 0 : 1);
}
}
输出结果:
4.5.7 MapReduce读取和输出数据
4.5.7.1 InputFormat
InputFormat是MapReduce框架用来读取数据的类。
InputFormat常见子类包括:
- TextInputFormat (普通文本文件,MR框架默认的读取实现类型)
- KeyValueTextInputFormat(读取一行文本数据按照指定分隔符,把数据封装为kv类型)
- NLineInputF ormat(读取数据按照行数进行划分分片)
- CombineTextInputFormat(合并小文件,避免启动过多MapTask任务)
- 自定义InputFormat
CombineTextInputFormat案例
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
CombineTextInputFormat切片原理
切片生成过程分为两部分:虚拟存储过程和切片过程
- 虚拟存储过程:把输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值进行比
较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于
两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时
将文件均分成2个虚拟存储块(防止出现太小切片)。
比如如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分出一个4M的
块。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的非常小的虚拟存储文
件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
1.txt–>2M;2M<4M;一个块;
2.txt–>7M;7M>4M,但是不大于两倍,均匀分成两块;两块:每块3.5M;
3.txt–>0.3M;0.3<4M ,0.3M<4M ,一个块
4.txt–>8.2M;大于最大值且大于两倍;一个4M的块,剩余4.2M分成两块,每块2.1M
所有块信息:
2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M 共7个虚拟存储块。
- 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个
切片。
如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
按照之前输入文件:有4个小文件大小分别为2M、7M、0.3M以及8.2M这四个小文件,
则虚拟存储之后形成7个文件块,大小分别为:
2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M最终会形成3个切片,大小分别为:
(2+3.5)M,(3.5+0.3+4)M,(2.1+2.1)M
自定义InputFormat
需求
将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的
key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为
key,文件内容为value。
结果
得到一个合并了多个小文件的SequenceFile文件
整体思路
- 定义一个类继承FileInputFormat
- 重写**isSplitable()指定为不可切分;重写createRecordReader()**方法,创建自己的
RecorderReader对象 - 改变默认读取数据方式,实现一次读取一个完整文件作为kv输出;
- Driver指定使用的InputFormat类型
CustomFileInputformat
public class CustomFileInputformat extends FileInputFormat<Text, BytesWritable> {
//
@Override
public RecordReader<Text, BytesWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException {
CustomRecordReader recordReader = new CustomRecordReader();
recordReader.initialize(split, context);
return recordReader;
}
@Override
protected boolean isSplitable(final JobContext context, final Path filename) {
return false;
}
}
CustomRecordReader
public class CustomRecordReader extends RecordReader<Text, BytesWritable> {
private Configuration conf;
// 切片
private FileSplit split;
// 是否读取到内容标识符
private boolean isProgress = true;
// 输出<K, v>
private BytesWritable value = new BytesWritable();
private Text k = new Text();
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
//获取到文件切片以及配置文件对象
this.split = (FileSplit) split;
conf = context.getConfiguration();
}
// 读取数据
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (isProgress) {
// 1 定义缓存区
byte[] contents = new byte[(int) split.getLength()];
FileSystem fs;
FSDataInputStream fis = null;
try {
// 2 获取文件系统
Path path = split.getPath(); // 获取切片的path信息
fs = path.getFileSystem(conf);// 获取文件系统对线
// 3 读取数据
fis = fs.open(path); //获取输入流
// 4 读取文件内容
IOUtils.readFully(fis, contents, 0, contents.length);
// 5 设置输出的value值
value.set(contents, 0, contents.length);
// 6 获取文件路径及名称
String name = split.getPath().toString();
System.out.println(name);
// 7 设置输出的key值
k.set(name);
} catch (Exception e) {
} finally {
IOUtils.closeStream(fis);
}
isProgress = false;
return true;
}
return false;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
//返回value
return k;
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
// 读取状态
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
// 关闭资源
@Override
public void close() throws IOException {
}
}
sequenceMapper
// Text:代表一个文件的path名称,BytesWritable:文件内容
public class sequenceMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
@Override
protected void map(final Text key, final BytesWritable value, final Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
sequenceReducer
public class sequenceReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
@Override
protected void reduce(final Text key, final Iterable<BytesWritable> values, final Context context) throws IOException, InterruptedException {
context.write(key, values.iterator().next());
}
}
sequenceDriver
public class sequenceDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
/*
1. 获取配置文件对象,获取job对象实例
2. 指定程序jar的本地路径
3. 指定Mapper/Reducer类
4. 指定Mapper输出的kv数据类型
5. 指定最终输出的kv数据类型
6. 指定job处理的原始数据路径
7. 指定job输出结果路径
8. 提交作业
*/
// 1. 获取配置文件对象,获取job对象实例
final Configuration conf = new Configuration();
final Job job = Job.getInstance(conf, "sequenceDriver");
// 2. 指定程序jar的本地路径
job.setJarByClass(sequenceDriver.class);
// 3. 指定Mapper/Reducer类
job.setMapperClass(sequenceMapper.class);
job.setReducerClass(sequenceReducer.class);
// 4. 指定Mapper输出的kv数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
// 5. 指定最终输出的kv数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CustomFileInputformat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 6.1 初始化inputpath和outpath,并判断output目录是否存在,存在将其删除
Path inputPath = new Path("E:/input/sequence/");
Path outputPath = new Path("E:/output/sequence/");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// 6. 指定job输入数据路径
FileInputFormat.setInputPaths(job, inputPath); //指定读取数据的原始路径
// 7. 指定job输出结果路径
FileOutputFormat.setOutputPath(job, outputPath); //指定结果数据输出路径
// 8. 提交作业
final boolean flag = job.waitForCompletion(true);
//jvm退出:正常退出0,非0值则是错误退出
System.exit(flag ? 0 : 1);
}
}
结果输出:
4.5.7.2 OutputFormat
OutputFormat:是MapReduce输出数据的基类,所有MapReduce的数据输出都实现了OutputFormat
抽象类。下面我们介绍几种常见的OutputFormat子类
- TextOutputFormat
默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,
因为TextOutputFormat调用toString()方 法把它们转换为字符串。 - SequenceFileOutputFormat
将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这是一种好的输出格式,
因为它的格式紧凑,很容易被压缩。
自定义OutputFormat
需求分析
要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类输出需求可以通过自定义
OutputFormat来实现。
实现步骤
- 自定义一个类继承FileOutputFormat。
- 改写RecordWriter,改写输出数据的方法write()。
需求
网络请求日志数据
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.lagou.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
输出结果
lagou.log
http://www.lagou.com
other.log
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
Mapper
package com.lagou.mr.output;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* # _*_ coding: utf-8 _*_
* # @Time : 2020/7/11 17:37
* # @Author : Depa
* # @Version:V 0.1
**/
public class OutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(final LongWritable key, final Text value, final Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
Reducer
package com.lagou.mr.output;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* # _*_ coding: utf-8 _*_
* # @Time : 2020/7/11 17:39
* # @Author : Depa
* # @Version:V 0.1
**/
public class OutputReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(final Text key, final Iterable<NullWritable> values, final Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
CustomOutFormat
package com.lagou.mr.output;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* # _*_ coding: utf-8 _*_
* # @Time : 2020/7/11 17:50
* # @Author : Depa
* # @Version:V 0.1
**/
public class CustomOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
final FileSystem fs = FileSystem.get(context.getConfiguration());
Path lagouLog= new Path("E:/output/outputformat/lagou.log");
Path otherLog = new Path("E:/output/outputformat/other.log");
final FSDataOutputStream lagouOut = fs.create(lagouLog);
final FSDataOutputStream otherOut = fs.create(otherLog);
return new CustomWriter(lagouOut, otherOut);
}
}
CustomWriter
package com.lagou.mr.output;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import java.awt.*;
import java.io.IOException;
/**
* # _*_ coding: utf-8 _*_
* # @Time : 2020/7/11 17:58
* # @Author : Depa
* # @Version:V 0.1
**/
public class CustomWriter extends RecordWriter<Text, NullWritable> {
private FSDataOutputStream lagouOut;
private FSDataOutputStream otherOut;
public CustomWriter(final FSDataOutputStream lagouOut, final FSDataOutputStream otherOut) {
this.lagouOut = lagouOut;
this.otherOut = otherOut;
}
@Override
public void write(final Text text, final NullWritable nullWritable) throws IOException, InterruptedException {
if (text.toString().contains("lagou")){
lagouOut.write(text.toString().getBytes());
lagouOut.write("\r\n".getBytes());
} else {
otherOut.write(text.toString().getBytes());
otherOut.write("\r\n".getBytes());
}
}
@Override
public void close(final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
IOUtils.closeStream(lagouOut);
IOUtils.closeStream(otherOut);
}
}
OutputDriver
package com.lagou.mr.output;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* # _*_ coding: utf-8 _*_
* # @Time : 2020/7/11 19:21
* # @Author : Depa
* # @Version:V 0.1
**/
public class OutputDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. 获取配置文件对象,获取job对象实例
final Configuration conf = new Configuration();
final Job job = Job.getInstance(conf, "OutputDriver");
// 2. 指定程序jar的本地路径
job.setJarByClass(OutputDriver.class);
// 3. 指定Mapper/Reducer类
job.setMapperClass(OutputMapper.class);
job.setReducerClass(OutputReducer.class);
// 4. 指定Mapper输出的kv数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 5. 指定最终输出的kv数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(CustomOutputFormat.class);
// 6.1 初始化inputpath和outpath,并判断output目录是否存在,存在将其删除
Path inputPath = new Path("E:/input/outputformat/");
Path outputPath = new Path("E:/output/outputformat/");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// 6. 指定job输入数据路径
FileInputFormat.setInputPaths(job, inputPath); //指定读取数据的原始路径
// 7. 指定job输出结果路径
FileOutputFormat.setOutputPath(job, outputPath); //指定结果数据输出路径
// 8. 提交作业
final boolean flag = job.waitForCompletion(true);
//jvm退出:正常退出0,非0值则是错误退出
System.exit(flag ? 0 : 1);
}
}
输出结果
4.5.8 shuffle阶段数据的压缩机制
4.5.8.1 hadoop当中支持的压缩算法
数据压缩有两大好处,节约磁盘空间,加速数据在网络和磁盘上的传输!!
Hadoop支持的压缩格式
压缩格式 | hadoop自带 | 算法 | 文件扩展名 | 是否可切分 | 换成压缩格式后,原程序是否需要修改 |
---|---|---|---|---|---|
DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 和文本处理一样,不需要修改 |
Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 和文本处理一样,不需要修改 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本处理一样,不需要修改 |
LZO | 否,需要安装 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
Snappy | 否,需要安装 | Snappy | .snappy | 否 | 和文本处理一样,不需要修改 |
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器
压缩格式 | 对应的编码/解码器 |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
性能对比
压缩算法 | 原始文件大小 | 压缩后的文件大小 | 压缩速度 | 解压缩速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO-bset | 8.3GB | 2GB | 4MB/s | 60.6MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
4.5.8.2 压缩位置
-
Map输入压缩
此处使用压缩文件作为Map的输入数据,无需显示指定编解码方式,Hadoop会自动检查文件扩展
名,如果压缩方式能够匹配,Hadoop就会选择合适的编解码方式对文件进行压缩和解压。 -
Map输出压缩
Shuffle是Hadoop MR过程中资源消耗最多的阶段,如果有数据量过大造成网络传输速度缓慢,可
以考虑使用压缩 -
Reduce输出压缩
输出的结果数据使用压缩能够减少存储的数据量,降低所需磁盘的空间,并且作为第二个MR的输
入时可以复用压缩。
4.5.8.3 压缩配置
-
在驱动代码中通过Configuration直接设置使用的压缩方式,可以开启Map输出和Reduce输出压缩
设置map阶段压缩 Configuration configuration = new Configuration(); configuration.set("mapreduce.map.output.compress","true"); configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.i o.compress.SnappyCodec"); 设置reduce阶段的压缩 configuration.set("mapreduce.output.fileoutputformat.compress","true"); configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD" ); configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.ap ache.hadoop.io.compress.SnappyCodec");
-
配置mapred-site.xml(修改后分发到集群其它节点,重启Hadoop集群),此种方式对运行在集群的
所有MR任务都会执行压缩。<property> <name>mapreduce.output.fileoutputformat.compress</name> <value>true</value> </property> <property> <name>mapreduce.output.fileoutputformat.compress.type</name> <value>RECORD</value> </property> <property> <name>mapreduce.output.fileoutputformat.compress.codec</name> <value>org.apache.hadoop.io.compress.SnappyCodec</value> </property>
4.5.8.4 压缩案例
需求
使用snappy压缩方式压缩WordCount案例的输出结果数据
具体实现
在驱动代码中添加压缩配置configuration.set("mapreduce.output.fileoutputformat.compress","true"); configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD"); configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache .hadoop.io.compress.SnappyCodec");
重新打成jar包,提交集群运行,验证输出结果是否已进行了snappy压缩!!
文章内容输出来源:拉勾教育大数据高薪训练营
上一篇: C#学习日记12