
Hadoop --> MapReduce: counting and sorting mobile phone traffic


First, let's look at the MapReduce execution flow.
(figure: the MapReduce execution/shuffle flow, not reproduced here)

As the diagram shows, the data is sorted on the map side, and I verified this below. To define a custom sort order, we usually create an entity class that holds the fields we need and have it implement the WritableComparable interface.

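The screenshot from the original post is not reproduced here. As a minimal sketch of the pattern (the class name is made up; the real FlowBean with all four fields appears in the full listing below), a sortable key class looks roughly like this:

package com.liang;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Sketch of a key class that Hadoop can both serialize and sort.
class SortableBean implements WritableComparable<SortableBean> {
    private long sumflow; // the field we want to sort on

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(sumflow); // serialization (from Writable)
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        sumflow = in.readLong(); // deserialization, same field order as write()
    }

    @Override
    public int compareTo(SortableBean other) {
        return Long.compare(other.sumflow, this.sumflow); // descending by sumflow
    }
}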

WritableComparable extends both Writable and Comparable.

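The original post shows a screenshot of the interface source; with the class-level annotations omitted, the declaration in Hadoop is essentially:

package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}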

Writable is what lets the bean's fields be serialized and deserialized, so we must override write and readFields.

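The overridden methods themselves are in the full listing below. As a quick sanity check of my own (the class name and sample numbers are made up), a FlowBean can be round-tripped locally with Hadoop's DataOutputBuffer and DataInputBuffer; readFields must consume the longs in exactly the order write emitted them:

package com.liang;

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

// Hypothetical local check, placed in the same package as FlowBean.
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(1024L, 2048L, 13700000000L);

        // Serialize: write() emits phoneNB, upflow, downflow, sumflow as longs.
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // Deserialize the same bytes into a fresh bean.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        FlowBean copy = new FlowBean();
        copy.readFields(in);

        System.out.println(original); // both lines should show the same values
        System.out.println(copy);
    }
}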

Comparable provides the compareTo method, which is the key to the custom sort order, so we need to override it; the override appears in the full listing below.


From the debug print inside compareTo you can see that the map side reads the records in and then sorts them with our custom rule. (I spent a long time digging through the source for where compareTo is actually called and couldn't find the call site directly; as far as I can tell, the framework looks up a RawComparator for the map output key class, and the default WritableComparator delegates to compareTo while the spilled map output is sorted and merged.)

(screenshot: console output from the map phase showing the compareTo debug prints, not reproduced here)
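To see where the call comes from without stepping through MapTask: as far as I can tell, when no sort comparator is configured on the job, the framework falls back to WritableComparator.get() for the map output key class, and that comparator's object-level compare() simply delegates to compareTo(). A small standalone illustration (class name and sample numbers are mine):

package com.liang;

import org.apache.hadoop.io.WritableComparator;

public class SortComparatorDemo {
    public static void main(String[] args) {
        // The generic comparator Hadoop builds for a WritableComparable key class
        // that has not registered its own comparator.
        WritableComparator comparator = WritableComparator.get(FlowBean.class);

        FlowBean a = new FlowBean(100L, 200L, 13800000000L); // sumflow = 300
        FlowBean b = new FlowBean(400L, 500L, 13900000000L); // sumflow = 900

        // The object overload of compare() calls a.compareTo(b), so the
        // "bigger total first" rule decides the order: prints 1, i.e. a sorts after b.
        System.out.println(comparator.compare(a, b));
    }
}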

In fact, compareTo is called again on the reduce side, but by then it runs over data that is already sorted (during the merge and key grouping).

(screenshot: console output from the reduce phase, not reproduced here)
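One side effect worth knowing (my own observation, not from the original post): because compareTo() only looks at sumflow, two beans with the same total but different phone numbers compare as equal, so the default grouping on the reduce side treats them as a single key. A tiny demonstration with made-up numbers:

package com.liang;

public class EqualSumDemo {
    public static void main(String[] args) {
        // Two different phones with the same total flow (illustrative values).
        FlowBean a = new FlowBean(100L, 200L, 13800000000L); // sumflow = 300
        FlowBean b = new FlowBean(150L, 150L, 13900000000L); // sumflow = 300

        // Prints 0 ("equal"), so both records would reach the same reduce() call.
        System.out.println(a.compareTo(b));
    }
}

If distinct phones must never be merged, compareTo can fall back to comparing phoneNB when the totals are equal.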

Here is the full source code:

package com.liang;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.MessageFormat;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

class FlowBean implements WritableComparable<Object> {
	private long upflow;
	private long downflow;
	private long sumflow;
	private long phoneNB;

	public FlowBean() {

	}

	public long getUpflow() {
		return upflow;
	}

	/**
	 * @param upflow
	 */
	public void setUpflow(long upflow) {
		this.upflow = upflow;
	}

	public long getDownflow() {
		return downflow;
	}

	public void setPhoneNB(long phoneNB) {
		this.phoneNB = phoneNB;
	}

	public long getPhoneNB() {
		return phoneNB;
	}

	public void setDownflow(long downflow) {
		this.downflow = downflow;
	}

	public long getSumflow() {
		return sumflow;
	}

	public void setSumflow(long sumflow) {
		this.sumflow = sumflow;
	}

	public FlowBean(long upflow, long downflow, long phoneNB) {
		this.upflow = upflow;
		this.downflow = downflow;
		this.phoneNB = phoneNB;
		this.sumflow = upflow + downflow;
	}

	@Override
	public void write(DataOutput output) throws IOException {
		// Serialize the fields; readFields() must read them back in this order
		output.writeLong(this.phoneNB);
		output.writeLong(this.upflow);
		output.writeLong(this.downflow);
		output.writeLong(this.sumflow);
	}

	@Override
	public void readFields(DataInput input) throws IOException {
		// Deserialize the fields in exactly the same order write() serialized them
		this.phoneNB = input.readLong();
		this.upflow = input.readLong();
		this.downflow = input.readLong();
		this.sumflow = input.readLong();
	}

	@Override
	public String toString() {
		// One tab-separated output line: phone, upflow, downflow, sumflow
		return this.phoneNB + "\t" + this.upflow + "\t" + this.downflow + "\t" + this.sumflow;
	}

	@Override
	public int compareTo(Object o) {
		// Custom sort rule: descending by total flow (sumflow)
		FlowBean fb = (FlowBean) o;
		System.err.println(MessageFormat.format("liang->{0}>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>{1}", this.sumflow,
				fb.sumflow));
		if (this.sumflow == fb.sumflow) {
			return 0;
		} else {
			return this.sumflow > fb.sumflow ? -1 : 1;
		}
	}

}

// Mapper: parse each input line and emit a FlowBean as the key (the value is unused)
class MapWritable extends Mapper<LongWritable, Text, FlowBean, NullWritable> {
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, NullWritable>.Context context)
			throws IOException, InterruptedException {

		String line = value.toString();
		// Fields are separated by single spaces; the phone number is the second
		// field, and the up/down traffic are the 3rd- and 2nd-from-last fields.
		String[] fields = line.split(" ");

		System.err.print("map phase -------------------------------------->");

		long upflow = Long.parseLong(fields[fields.length - 3]);
		long downflow = Long.parseLong(fields[fields.length - 2]);

		FlowBean fb = new FlowBean(upflow, downflow, Long.parseLong(fields[1]));

		context.write(fb, NullWritable.get());
	}

}

// Reducer: keys arrive already sorted by the FlowBean comparator; just write them out
class ReduceWritable extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {
	@Override
	protected void reduce(FlowBean key, Iterable<NullWritable> values,
			Reducer<FlowBean, NullWritable, FlowBean, NullWritable>.Context context)
			throws IOException, InterruptedException {
		// One output line per original record that shares this key
		for (NullWritable n : values) {
			System.err.println("---------------->" + n);
			context.write(key, NullWritable.get());
		}
	}
}

public class MyDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// 1. Get the job instance
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);

		// Hardcoded HDFS input/output paths for testing; these override any command-line arguments
		args = new String[] { "hdfs://192.168.2.130:9000/data/phone_data.txt", "hdfs://192.168.2.130:9000/output" };

		Path path = new Path(args[1]);

		// If the output directory already exists on HDFS, delete it so the job can run again
		if (path.getFileSystem(conf).exists(path)) {

			FileStatus[] fss = path.getFileSystem(conf).listStatus(path);
			if (fss.length != 0) {
				for (FileStatus file : fss) {
					path.getFileSystem(conf).delete(file.getPath(), true);
				}
			}
			path.getFileSystem(conf).delete(path, true);
		}

		// 2. Set the jar by the driver class
		job.setJarByClass(MyDriver.class);

		// 3. Wire up the mapper and reducer
		job.setMapperClass(MapWritable.class);
		job.setReducerClass(ReduceWritable.class);

		// 4. Set the map output and final output key/value types
		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(FlowBean.class);
		job.setOutputValueClass(NullWritable.class);

		// Use a single reducer so all records end up in one globally sorted output file
		job.setNumReduceTasks(1);
		// 5. Set the input and output paths
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 6. Submit the job and wait for it to finish
		job.waitForCompletion(true);
	}
}

References:
https://www.cnblogs.com/edisonchou/p/4299085.html

https://blog.csdn.net/reasery/article/details/82875815?utm_medium=distribute.pc_relevant_bbs_down.none-task-blog-baidujs-1.nonecase&depth_1-utm_source=distribute.pc_relevant_bbs_down.none-task-blog-baidujs-1.nonecase

I borrowed from many other people's writeups online; this post is my own summary.