大数据Hadoop之MR GroupingComparator辅助排序案例实操
程序员文章站
2022-04-28 16:29:58
...
1.需求
有如下订单数据,
现在需要求出每一个订单中最贵(成交金额最高)的商品。
(1)输入数据(每行依次为:订单id、商品id、成交金额,字段之间以制表符分隔)
0000001 Pdt_01 222.8
0000002 Pdt_05 722.4
0000001 Pdt_02 33.8
0000003 Pdt_06 232.8
0000003 Pdt_02 33.8
0000002 Pdt_03 522.8
0000002 Pdt_04 122.4
(2)期望输出数据(每行为:订单id、该订单中最高的成交金额)
1 222.8
2 722.4
3 232.8
2.需求分析
(1)以“订单id + 成交金额”的组合作为key:Map阶段输出的所有订单数据先按订单id升序排序,id相同时再按金额降序排序,然后发送到Reduce端。
(2)在Reduce端利用GroupingComparator将订单id相同的kv聚合为一组;由于组内已按金额降序排列,组内第一条记录即为该订单中最贵的商品,如图所示。
3.代码实现
OrderBean(封装订单id和价格,比较时还运用到了二次排序):
package com.mapreduce.groupcomparator;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite map-output key: an order id together with the price of one product in that order.
 *
 * <p>Ordering (secondary sort): order id ascending, then price descending within the same order.
 * As a result, the first key of every order group is that order's most expensive product.
 */
public class OrderBean implements WritableComparable<OrderBean> {

    /** Order id. */
    private int orderId;

    /** Product price. */
    private double price;

    /** Empty constructor; fields are filled in afterwards via the setters or {@link #readFields}. */
    public OrderBean() {
    }

    /**
     * Creates a key for one product of an order.
     *
     * @param orderId the order id
     * @param price the product price
     */
    public OrderBean(int orderId, double price) {
        this.orderId = orderId;
        this.price = price;
    }

    /**
     * Deserializes this key. Fields are read in the same order {@link #write} emits them.
     *
     * @param in source of the serialized fields
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        orderId = in.readInt();
        price = in.readDouble();
    }

    /**
     * Serializes this key as an int order id followed by a double price.
     *
     * @param out destination for the serialized fields
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(orderId);
        out.writeDouble(price);
    }

    /**
     * Secondary sort: order id ascending, then price descending.
     *
     * <p>Returns 0 only when both order id and price are equal. This keeps the method reflexive and
     * antisymmetric, as required by {@link Comparable}, and consistent with {@link #equals}.
     *
     * @param o the key to compare against
     * @return negative, zero or positive as this key sorts before, with, or after {@code o}
     */
    @Override
    public int compareTo(OrderBean o) {
        int byOrder = Integer.compare(this.orderId, o.getOrderId());
        if (byOrder != 0) {
            return byOrder;
        }
        // Arguments reversed on purpose: a higher price sorts first within the same order.
        return Double.compare(o.getPrice(), this.price);
    }

    /**
     * Two keys are equal when both order id and price match. This agrees with {@link #compareTo}.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof OrderBean)) {
            return false;
        }
        OrderBean other = (OrderBean) obj;
        return orderId == other.orderId && Double.compare(price, other.price) == 0;
    }

    /**
     * Hashes on the order id only. Equal keys still hash equally.
     *
     * <p>Hadoop's default hash partitioner routes records by the key's hash code. With this hash,
     * every record of one order reaches the same reducer even when several reducers are configured.
     */
    @Override
    public int hashCode() {
        return orderId;
    }

    /** Returns the order id and the price separated by a tab. */
    @Override
    public String toString() {
        return orderId + "\t" + price;
    }

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}
Mapper:
package com.mapreduce.groupcomparator;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Turns each input line into an {@link OrderBean} key with an empty value.
 *
 * <p>Each line is expected to contain three whitespace-separated fields: order id, product id and
 * price. The product id is not used.
 *
 * <p>Handling of unusual lines:
 * <ul>
 *   <li>Blank lines are skipped.</li>
 *   <li>Lines with fewer than three fields are skipped and counted under
 *       {@code OrderMapper/MALFORMED_LINES}.</li>
 * </ul>
 */
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    /** Output key reused across map() calls instead of allocating one per record. */
    OrderBean k = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read the line; trimming lets leading or trailing whitespace and empty lines be detected.
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return;
        }

        // 2. Split on any run of whitespace, so both tab- and space-separated input is accepted.
        String[] fields = line.split("\\s+");
        if (fields.length < 3) {
            context.getCounter("OrderMapper", "MALFORMED_LINES").increment(1);
            return;
        }

        // 3. Fill the key. Integer parsing drops leading zeros, e.g. "0000001" -> 1.
        k.setOrderId(Integer.parseInt(fields[0]));
        k.setPrice(Double.parseDouble(fields[2]));

        // 4. Emit.
        context.write(k, NullWritable.get());
    }
}
Reducer:
package com.mapreduce.groupcomparator;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Emits the most expensive product of each order.
 *
 * <p>The driver installs a grouping comparator that groups keys by order id only. The
 * {@link OrderBean} sort order puts the highest price first within an order. So the key handed to
 * reduce() for a group is already that order's maximum.
 */
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    /**
     * Writes only the first key of the order group.
     *
     * @param k the group's first key (highest price for this order id)
     * @param values one NullWritable per record in the group; intentionally not iterated
     * @param context job context used for output
     */
    @Override
    protected void reduce(OrderBean k, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Do not loop over values: Hadoop updates k as the iterator advances, so writing k inside
        // a loop would emit every product of the order rather than only the most expensive one.
        context.write(k, NullWritable.get());
    }
}
OrderSortGroupingComparator(分组规则):
package com.mapreduce.groupcomparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class OrderSortGroupingComparator extends WritableComparator {
protected OrderSortGroupingComparator() {
super(OrderBean.class, true);
}
public int compare(WritableComparable a, WritableComparable b) {
OrderBean aBean = (OrderBean) a;
OrderBean bBean = (OrderBean) b;
int result;
if(aBean.getOrderId() > bBean.getOrderId()) {
result = 1;
} else if(aBean.getOrderId() < bBean.getOrderId()) {
result = -1;
} else {
result = 0;
}
return result;
}
}
Driver(注意第8步):
package com.mapreduce.groupcomparator;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Configures and submits the "most expensive product per order" job.
 *
 * <p>Usage: {@code OrderDriver <input path> <output path>}. When fewer than two arguments are
 * supplied, the local development paths in {@link #DEFAULT_ARGS} are used instead.
 */
public class OrderDriver {

    /** Fallback input and output paths used when no command-line paths are given. */
    private static final String[] DEFAULT_ARGS = {
        "D:\\hadoop-2.7.1\\winMR\\OrderGroupComparator\\input",
        "D:\\hadoop-2.7.1\\winMR\\OrderGroupComparator\\output2"
    };

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, Exception {
        // Use the caller's paths when provided; only fall back to the hard-coded ones otherwise.
        if (args == null || args.length < 2) {
            args = DEFAULT_ARGS;
        }

        // 1. Create the job.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Locate the jar via this class.
        job.setJarByClass(OrderDriver.class);

        // 3. Wire mapper and reducer.
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        // 4. Map output key/value types.
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5. Final output key/value types.
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 8. Reduce-side grouping: keys with the same order id go to one reduce() call.
        job.setGroupingComparatorClass(OrderSortGroupingComparator.class);

        // 6. Input and output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit, wait, and report the job status through the process exit code.
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
4. 运行结果:
下一篇: jquery之off()方法