大数据Hadoop之MR自定义排序 全排序案例实操
程序员文章站
2022-04-28 16:38:09
...
前言:MapReduce 在 shuffle 阶段默认按照 key 类型自身的比较规则(compareTo)对 key 进行升序排序,例如 Text 类型的 key 按字典序排列。但在一些情况下我们需要按照其他方式排序(例如本例按总流量降序),这时就要自定义排序。
1.需求
根据案例FlowCountBean产生的结果再次对总流量进行排序。
FlowCountBean的案例:https://blog.csdn.net/qq_43437122/article/details/106173182
(1)输入数据
原始数据 → 第一次处理(FlowCountBean 案例)后的数据。本案例以第一次处理后的数据作为输入,每行格式为:手机号\t上行流量\t下行流量\t总流量。
(2)期望输出数据:按总流量降序排列,总流量较大的记录排在前面
13509468723 7335 110349 117684
13736230513 2481 24681 27162
13956435636 132 1512 1644
13846544121 264 0 264
2.需求分析
3. 代码实现
(1)FlowBean 对象(此处为 FCBeanWritableComparable)在需求1的基础上实现了 WritableComparable 接口,增加了比较功能
package com.mapreduce.fcwritablecomparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Flow-statistics record used as the map output key, so that the shuffle sorts records by total
 * flow with the largest total first.
 *
 * <p>Serialized as three consecutive ints in the order upFlow, downFlow, sumFlow.
 *
 * <p>{@link #compareTo}, {@link #equals} and {@link #hashCode} all look only at {@code sumFlow}.
 * Records with the same total flow therefore compare as equal, are grouped into one reduce call,
 * and hash to the same partition. {@code hashCode} is overridden because Hadoop's hash-based
 * partitioning calls it, and the default identity hash is not stable across JVMs.
 */
public class FCBeanWritableComparable implements WritableComparable<FCBeanWritableComparable> {

    private int upFlow;
    private int downFlow;
    private int sumFlow;

    /** Empty instance, to be populated via {@link #readFields} or the setters. */
    public FCBeanWritableComparable() {
    }

    /**
     * Creates a record and derives the total flow.
     *
     * @param upFlow upstream flow
     * @param downFlow downstream flow
     */
    public FCBeanWritableComparable(int upFlow, int downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /** Reads the three fields in the same order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readInt();
        this.downFlow = in.readInt();
        this.sumFlow = in.readInt();
    }

    /** Writes upFlow, downFlow and sumFlow as ints; must stay in sync with {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(sumFlow);
    }

    /**
     * Orders records by total flow, descending.
     *
     * @return a negative value if this record has the larger total flow, a positive value if it
     *     has the smaller one, 0 if the totals are equal
     */
    @Override
    public int compareTo(FCBeanWritableComparable o) {
        // Arguments are swapped relative to natural order to sort largest-first.
        return Integer.compare(o.sumFlow, sumFlow);
    }

    /** Equality consistent with {@link #compareTo}: two records are equal iff their totals match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FCBeanWritableComparable)) {
            return false;
        }
        return sumFlow == ((FCBeanWritableComparable) obj).sumFlow;
    }

    /** Hash derived from {@code sumFlow} only, so it agrees with {@link #equals} and is JVM-stable. */
    @Override
    public int hashCode() {
        return sumFlow;
    }

    /** Tab-separated "upFlow\tdownFlow\tsumFlow", used as the value column of the job output. */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public int getUpFlow() {
        return upFlow;
    }

    /** Sets the upstream flow; does not recompute {@code sumFlow}. */
    public void setUpFlow(int upFlow) {
        this.upFlow = upFlow;
    }

    public int getDownFlow() {
        return downFlow;
    }

    /** Sets the downstream flow; does not recompute {@code sumFlow}. */
    public void setDownFlow(int downFlow) {
        this.downFlow = downFlow;
    }

    public int getSumFlow() {
        return sumFlow;
    }

    /** Sets the total flow directly; callers are responsible for keeping it equal to up + down. */
    public void setSumFlow(int sumFlow) {
        this.sumFlow = sumFlow;
    }
}
(2)Mapper
package com.mapreduce.fcwritablecomparable;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Re-reads the output of the first flow-count job and emits the flow figures as the key, so that
 * the shuffle sorts records by total flow; the phone number is carried as the value.
 *
 * <p>Expected input line layout: {@code phone \t upFlow \t downFlow ...}; columns after the third
 * are ignored and the total is recomputed from up + down.
 */
public class FCMapper extends Mapper<LongWritable, Text, FCBeanWritableComparable, Text> {

    // Output key/value objects created once and reused for every input record.
    FCBeanWritableComparable fc = new FCBeanWritableComparable();
    Text p = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        final String[] columns = value.toString().split("\t");

        final int up = Integer.parseInt(columns[1]);
        final int down = Integer.parseInt(columns[2]);

        fc.setUpFlow(up);
        fc.setDownFlow(down);
        fc.setSumFlow(up + down);
        p.set(columns[0]);

        // Key = flow record (drives the sort), value = phone number.
        context.write(fc, p);
    }
}
(3)Reducer
package com.mapreduce.fcwritablecomparable;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Swaps key and value back so each output line reads {@code phone \t up \t down \t sum}, keeping
 * the descending total-flow order produced by the shuffle.
 */
public class FCReducer extends Reducer<FCBeanWritableComparable, Text, Text, FCBeanWritableComparable> {

    @Override
    protected void reduce(FCBeanWritableComparable k, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Several phone numbers can share the same total flow and therefore arrive in one group;
        // write every one of them instead of only the first.
        for (final Text phone : values) {
            context.write(phone, k);
        }
    }
}
(4)Driver
package com.mapreduce.fcwritablecomparable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Configures and submits the job that sorts flow records by total flow, descending.
 *
 * <p>Usage: {@code FCDriver <input path> <output path>}. When run with no arguments, the local
 * development paths below are used. The process exit code is 0 on success and 1 if the job fails.
 */
public class FCDriver {

    /** Input directory used when no command-line arguments are supplied. */
    private static final String DEFAULT_INPUT =
            "D:\\hadoop-2.7.1\\winMR\\FCBeanWritableComparable\\input";

    /** Output directory used when no command-line arguments are supplied. */
    private static final String DEFAULT_OUTPUT =
            "D:\\hadoop-2.7.1\\winMR\\FCBeanWritableComparable\\output1";

    public static void main(String[] args) throws Exception {
        // Honor paths given on the command line (e.g. via `hadoop jar`); fall back to the local
        // development paths only when none are given.
        final String inputPath;
        final String outputPath;
        if (args.length == 0) {
            inputPath = DEFAULT_INPUT;
            outputPath = DEFAULT_OUTPUT;
        } else if (args.length < 2) {
            System.err.println("Usage: FCDriver <input path> <output path>");
            System.exit(2);
            return;
        } else {
            inputPath = args[0];
            outputPath = args[1];
        }

        // 1. Create the job instance.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Locate the jar containing the job classes.
        job.setJarByClass(FCDriver.class);

        // 3. Wire up mapper and reducer.
        job.setMapperClass(FCMapper.class);
        job.setReducerClass(FCReducer.class);

        // 4. Map output key/value types (the key drives the custom sort).
        job.setMapOutputKeyClass(FCBeanWritableComparable.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Final (reducer) output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FCBeanWritableComparable.class);

        // 6. Input and output paths.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 7. Submit, wait, and report success/failure through the exit code.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
4. 实验结果
上一篇: python基础教学入门