欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员计算机编程知识。
您现在的位置是: 首页

大数据Hadoop之MR自定义排序 全排序案例实操

程序员文章站 2022-04-28 16:38:09
...

前言: MapReduce默认会按照key自身的比较规则对key进行排序(例如Text类型按字典序排列),但是在一些情况下我们需要按照某种特定的方式进行排序,所以要自定义排序。

1.需求

根据案例FlowCountBean产生的结果再次对总流量进行排序。

FlowCountBean的案例:https://blog.csdn.net/qq_43437122/article/details/106173182

(1)输入数据

原始数据 ----------------------第一次处理后的数据

(2)期望输出数据,就是总流量较大的放到前面

13509468723	7335	110349	117684
13736230513	2481	24681	27162
13956435636	132		1512	1644
13846544121	264		0		264

2.需求分析

大数据Hadoop之MR自定义排序 全排序案例实操

3. 代码实现

(1)FlowBean对象在需求1的基础上增加了比较功能

package com.mapreduce.fcwritablecomparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Traffic record (upstream, downstream and total flow) that can be used as a
 * MapReduce key.
 *
 * <p>The bean is serialized as three consecutive ints in the order
 * up, down, sum. Its natural ordering is by total flow, largest first.
 *
 * <p>Note: {@link #compareTo} looks at the total flow only, while
 * {@link #equals} compares all three fields, so the ordering is not
 * consistent with equals. {@link #hashCode} is derived from the total flow
 * alone so that any two beans that compare as equal also share a hash code.
 */
public class FCBeanWritableComparable implements WritableComparable<FCBeanWritableComparable>{

	private int upFlow; 
	private int downFlow;
	private int sumFlow;
	
	/**
	 * Creates an empty bean with all counters at zero; the values are filled
	 * in afterwards by {@link #readFields} or the setters.
	 */
	public FCBeanWritableComparable() {

	}
	
	/**
	 * Creates a bean from the two directional counters and derives the total.
	 *
	 * @param upFlow   upstream flow
	 * @param downFlow downstream flow
	 */
	public FCBeanWritableComparable(int upFlow, int downFlow) {
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}
	
	/**
	 * Restores the bean from its serialized form.
	 * The read order must mirror the write order in {@link #write}.
	 *
	 * @param in source to read the three ints from
	 * @throws IOException if reading fails
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.upFlow = in.readInt();
		this.downFlow = in.readInt();
		this.sumFlow = in.readInt();
	}

	/**
	 * Serializes the bean as three ints: up, down, sum.
	 *
	 * @param out sink to write the three ints to
	 * @throws IOException if writing fails
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(upFlow);
		out.writeInt(downFlow);
		out.writeInt(sumFlow);
	}

	/**
	 * Orders beans by total flow in descending order.
	 *
	 * @param o the bean to compare against
	 * @return a negative value if this bean has the larger total, a positive
	 *         value if it has the smaller total, and zero if the totals match
	 */
	@Override
	public int compareTo(FCBeanWritableComparable o) {
		// Arguments are swapped on purpose: a larger total must sort first.
		return Integer.compare(o.getSumFlow(), sumFlow);
	}

	/**
	 * Two beans are equal when all three counters match.
	 *
	 * @param obj the object to compare against
	 * @return {@code true} if {@code obj} is a bean with identical counters
	 */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof FCBeanWritableComparable)) {
			return false;
		}
		FCBeanWritableComparable other = (FCBeanWritableComparable) obj;
		return upFlow == other.upFlow
				&& downFlow == other.downFlow
				&& sumFlow == other.sumFlow;
	}

	/**
	 * Value-based hash derived from the total flow only.
	 *
	 * <p>Hadoop's documentation for WritableComparable notes that hashCode()
	 * is frequently used to partition keys and must return the same result
	 * across JVM instances; the identity hash inherited from Object does not.
	 * Using only the total keeps beans that compare as equal on the same hash.
	 *
	 * @return the total flow
	 */
	@Override
	public int hashCode() {
		return sumFlow;
	}

	/**
	 * Renders the bean as {@code up<TAB>down<TAB>sum}.
	 *
	 * @return the tab-separated text form
	 */
	@Override
	public String toString() {
		return upFlow + "\t" + downFlow + "\t" + sumFlow;
	}

	/** @return the upstream flow */
	public int getUpFlow() {
		return upFlow;
	}

	/**
	 * Sets the upstream flow. The total is NOT recomputed; callers must
	 * update it through {@link #setSumFlow} themselves.
	 *
	 * @param upFlow upstream flow
	 */
	public void setUpFlow(int upFlow) {
		this.upFlow = upFlow;
	}

	/** @return the downstream flow */
	public int getDownFlow() {
		return downFlow;
	}

	/**
	 * Sets the downstream flow. The total is NOT recomputed; callers must
	 * update it through {@link #setSumFlow} themselves.
	 *
	 * @param downFlow downstream flow
	 */
	public void setDownFlow(int downFlow) {
		this.downFlow = downFlow;
	}

	/** @return the total flow */
	public int getSumFlow() {
		return sumFlow;
	}

	/**
	 * Sets the total flow directly.
	 *
	 * @param sumFlow total flow
	 */
	public void setSumFlow(int sumFlow) {
		this.sumFlow = sumFlow;
	}
}

(2)Mapper

package com.mapreduce.fcwritablecomparable;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that parses each tab-separated input line and emits the traffic bean
 * as the map-output key with the phone number as the value.
 */
public class FCMapper extends Mapper<LongWritable, Text,
	FCBeanWritableComparable, Text>{
	
	// Output key and value holders, reused for every map() call.
	FCBeanWritableComparable fc = new FCBeanWritableComparable();
	Text p = new Text();

	/**
	 * Converts one input line into a (traffic bean, phone number) pair.
	 *
	 * @param key     input key supplied by the framework (not used here)
	 * @param value   one line of text; columns are separated by tabs
	 * @param context sink for the emitted pair
	 * @throws IOException          if writing the pair fails
	 * @throws InterruptedException if writing the pair is interrupted
	 */
	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		// Columns used: [0] phone number, [1] upstream flow, [2] downstream flow.
		final String[] columns = value.toString().split("\t");
		final String phoneNumber = columns[0];
		final int up = Integer.parseInt(columns[1]);
		final int down = Integer.parseInt(columns[2]);

		// Populate the reusable holders; the total is recomputed from the
		// two directional values rather than read from the line.
		p.set(phoneNumber);
		fc.setUpFlow(up);
		fc.setDownFlow(down);
		fc.setSumFlow(up + down);

		// Emit bean as key, phone number as value.
		context.write(fc, p);
	}
}

(3)Reducer

package com.mapreduce.fcwritablecomparable;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer that swaps key and value back: every phone number grouped under a
 * traffic bean is written out as (phone number, traffic bean).
 */
public class FCReducer extends Reducer<FCBeanWritableComparable, Text, Text, FCBeanWritableComparable>{
	/**
	 * Writes one output record per grouped value.
	 *
	 * @param k       the traffic bean the values were grouped under
	 * @param values  the phone numbers grouped under {@code k}
	 * @param context sink for the output records
	 * @throws IOException          if writing a record fails
	 * @throws InterruptedException if writing a record is interrupted
	 */
	@Override
	protected void reduce(FCBeanWritableComparable k, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// Several phone numbers can share the same total flow and therefore
		// the same key; writing every value keeps all of them in the output.
		for (final Text phone : values) {
			context.write(phone, k);
		}
	}
}

(4)Driver

package com.mapreduce.fcwritablecomparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver that configures and submits the sort-by-total-flow job.
 *
 * <p>Usage: {@code FCDriver <inputDir> <outputDir>}. When fewer than two
 * arguments are supplied, the built-in local default paths are used.
 */
public class FCDriver {
	/** Input directory used when no paths are passed on the command line. */
	private static final String DEFAULT_INPUT =
			"D:\\hadoop-2.7.1\\winMR\\FCBeanWritableComparable\\input";
	/** Output directory used when no paths are passed on the command line. */
	private static final String DEFAULT_OUTPUT =
			"D:\\hadoop-2.7.1\\winMR\\FCBeanWritableComparable\\output1";

	/**
	 * Builds the job, runs it to completion and exits with the job status.
	 *
	 * @param args optional {@code [inputDir, outputDir]}
	 * @throws Exception if job setup or execution fails
	 */
	public static void main(String[] args) throws Exception {
		// Honor the command-line paths instead of always overwriting them;
		// fall back to the local defaults only when they are missing.
		final boolean pathsGiven = args != null && args.length >= 2;
		final String inputDir = pathsGiven ? args[0] : DEFAULT_INPUT;
		final String outputDir = pathsGiven ? args[1] : DEFAULT_OUTPUT;

		// 1. Create the job instance
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		// 2. Set the jar by locating this driver class
		job.setJarByClass(FCDriver.class);
		
		// 3. Wire up the mapper and reducer
		job.setMapperClass(FCMapper.class);
		job.setReducerClass(FCReducer.class);
		
		// 4. Set the map-output key/value types
		job.setMapOutputKeyClass(FCBeanWritableComparable.class);
		job.setMapOutputValueClass(Text.class);
		
		// 5. Set the final output key/value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FCBeanWritableComparable.class);
		
		// 6. Set the input and output paths
		FileInputFormat.setInputPaths(job, new Path(inputDir));
		FileOutputFormat.setOutputPath(job, new Path(outputDir));
		
		// 7. Submit the job, wait for it, and report failure via the exit code
		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}
}

4. 实验结果

大数据Hadoop之MR自定义排序 全排序案例实操