hadoop-->mapreduce: computing and sorting mobile phone traffic
First, recall the MapReduce execution flow: mappers read the input splits, map output is partitioned and sorted by key as it is spilled to disk, and the sorted runs are merged before being handed to the reducers.
As the flow shows, the data is sorted in the map phase, and I verified this below. When we want a custom sort order, we usually create an entity class that holds the fields we need and implements the WritableComparable interface.
WritableComparable extends both Writable and Comparable.
Writable lets the bean's fields be serialized and deserialized, so we must override write and readFields.
Comparable declares a compareTo method; this is the key to our custom sort order, so we override it as well.
From the debug output you can see that map reads the data into the program and the framework then sorts it with our custom rule. (I dug through the source for quite a while looking for where compareTo is actually called; it turns out the framework wraps the key type's compareTo in a WritableComparator, which is invoked while the map output is sorted during the spill.)
compareTo is then called again in the reduce phase, during the merge and grouping of the already-sorted data.
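To make the call site concrete, here is a simplified sketch of what org.apache.hadoop.io.WritableComparator does. This is not the verbatim framework source (buffer handling and construction are trimmed), but it shows the mechanism: the sort compares two serialized keys by deserializing them and delegating to the key's compareTo.
import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.WritableComparable;

@SuppressWarnings({ "rawtypes", "unchecked" })
class SimplifiedComparator {
    private final DataInputBuffer buffer = new DataInputBuffer();
    private final WritableComparable key1;
    private final WritableComparable key2;

    SimplifiedComparator(WritableComparable key1, WritableComparable key2) {
        // key1/key2 are reusable instances of the job's key class, e.g. FlowBean
        this.key1 = key1;
        this.key2 = key2;
    }

    // The spill sort hands in two serialized keys from the sort buffer.
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            buffer.reset(b1, s1, l1);
            key1.readFields(buffer); // deserialize the first key
            buffer.reset(b2, s2, l2);
            key2.readFields(buffer); // deserialize the second key
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return compare(key1, key2);
    }

    public int compare(WritableComparable a, WritableComparable b) {
        return a.compareTo(b); // this is where FlowBean.compareTo finally runs
    }
}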
Here is the full source code:
package com.liang;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.MessageFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
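// FlowBean is the map output key; because it is the key, the framework sorts
// all records with its compareTo during the map-side spill and reduce-side merge.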
class FlowBean implements WritableComparable<FlowBean> {
private long upflow;
private long downflow;
private long sumflow;
private long phoneNB;
public FlowBean() {
}
public long getUpflow() {
return upflow;
}
public void setUpflow(long upflow) {
this.upflow = upflow;
}
public long getDownflow() {
return downflow;
}
public void setPhoneNB(long phoneNB) {
this.phoneNB = phoneNB;
}
public long getPhoneNB() {
return phoneNB;
}
public void setDownflow(long downflow) {
this.downflow = downflow;
}
public long getSumflow() {
return sumflow;
}
public void setSumflow(long sumflow) {
this.sumflow = sumflow;
}
public FlowBean(long upflow, long downflow, long phoneNB) {
this.upflow = upflow;
this.downflow = downflow;
this.phoneNB = phoneNB;
this.sumflow = upflow + downflow;
}
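// write and readFields must serialize the fields in exactly the same order.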
@Override
public void write(DataOutput output) throws IOException {
output.writeLong(this.phoneNB);
output.writeLong(this.upflow);
output.writeLong(this.downflow);
output.writeLong(this.sumflow);
}
@Override
public void readFields(DataInput Input) throws IOException {
this.phoneNB = Input.readLong();
this.upflow = Input.readLong();
this.downflow = Input.readLong();
this.sumflow = Input.readLong();
}
@Override
public String toString() {
return this.phoneNB + "\t" + this.upflow + "\t" + this.downflow + "\t" + this.sumflow;
}
@Override
public int compareTo(FlowBean fb) {
// Debug output that shows exactly when the framework invokes compareTo
System.err.println(MessageFormat.format("liang->{0}>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>{1}", this.sumflow,
fb.sumflow));
// Sort by total flow, descending
if (this.sumflow == fb.sumflow) {
return 0;
} else {
return this.sumflow > fb.sumflow ? -1 : 1;
}
}
}
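// Mapper: parses one log line, builds a FlowBean and emits it as the output
// key (the value is NullWritable), so the framework sorts records by compareTo.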
class MapWritable extends Mapper<LongWritable, Text, FlowBean, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, NullWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
// Fields are assumed space-separated: the phone number is the second field,
// and upflow/downflow are the third- and second-to-last fields.
String[] fields = line.split(" ");
System.err.print("map phase -------------------------------------->");
long upflow = Long.parseLong(fields[fields.length - 3]);
long downflow = Long.parseLong(fields[fields.length - 2]);
FlowBean fb = new FlowBean(upflow, downflow, Long.parseLong(fields[1]));
context.write(fb, NullWritable.get());
}
}
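// Reducer: keys arrive already sorted by total flow. Records whose sumflow
// compares equal fall into one reduce group, so we iterate the values and
// write the key once per record; Hadoop refills the reused key object on each
// iteration, so every phone number is still emitted.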
class ReduceWritable extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {
@Override
protected void reduce(FlowBean key, Iterable<NullWritable> values,
Reducer<FlowBean, NullWritable, FlowBean, NullWritable>.Context context)
throws IOException, InterruptedException {
for (NullWritable n : values) {
System.err.println("---------------->" + n);
context.write(key, NullWritable.get());
}
}
}
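// Driver: configures the job and submits it to the cluster.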
public class MyDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Get the job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// Hard-coded HDFS paths for testing; these override any command-line arguments
args = new String[] { "hdfs://192.168.2.130:9000/data/phone_data.txt", "hdfs://192.168.2.130:9000/output" };
Path path = new Path(args[1]);
// Delete the output directory if it exists (Hadoop refuses to overwrite it)
if (path.getFileSystem(conf).exists(path)) {
// One recursive delete removes the directory and all of its contents
path.getFileSystem(conf).delete(path, true);
}
// 2. Set the jar by the driver class
job.setJarByClass(MyDriver.class);
// 3. Wire up the mapper and reducer
job.setMapperClass(MapWritable.class);
job.setReducerClass(ReduceWritable.class);
// 4. Set the map output and final output types
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(FlowBean.class);
job.setOutputValueClass(NullWritable.class);
// A single reduce task makes the final output globally sorted
job.setNumReduceTasks(1);
// 5. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6. Submit the job and wait for completion
job.waitForCompletion(true);
}
}
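For a quick sanity check, here is a hypothetical input line in the format the mapper expects (space-separated; the phone number is the second field, and upflow/downflow are the third- and second-to-last fields; adjust the split if your dataset is tab-separated):
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
This should produce an output record of the form phone, upflow, downflow, sumflow:
13726230503	2481	24681	27162
Packaged into a jar (the jar name below is just an example), the job can then be run with:
hadoop jar flowsort.jar com.liang.MyDriver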
References:
https://www.cnblogs.com/edisonchou/p/4299085.html
I borrowed from many write-ups found online and summarized my own take here.