Using MapReduce to Compute Each User's Total Traffic
The input is a file like the one in the screenshot above (not reproduced here): each tab-separated line records, among other fields, a phone number and the upstream and downstream traffic it generated. For each user we need to compute the total upstream traffic, total downstream traffic, and overall total traffic.
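As a purely hypothetical illustration of the format the Mapper below expects (phone number in the second field, upstream and downstream bytes in the third-to-last and second-to-last fields), one line might look like:

1	13726230503	00-FD-07-A4-72-B8	120.196.100.82	www.example.com	2481	24681	200

Here 13726230503 would be the phone number, 2481 the upstream bytes, and 24681 the downstream bytes.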
Step 1: Create a class for the per-user traffic data, as follows:
package com.zut.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Custom Writable carrying the phone number plus upstream, downstream and total traffic.
public class Flow implements Writable {

    private String tel;
    private long up;
    private long dw;
    private long total;

    public Flow() {
        super();
    }

    public Flow(long up, long dw, long total) {
        super();
        this.up = up;
        this.dw = dw;
        this.total = total;
    }

    public Flow(String tel, long up, long dw, long total) {
        super();
        this.tel = tel;
        this.up = up;
        this.dw = dw;
        this.total = total;
    }

    public String getTel() {
        return tel;
    }

    public void setTel(String tel) {
        this.tel = tel;
    }

    public long getUp() {
        return up;
    }

    public void setUp(long up) {
        this.up = up;
    }

    public long getDw() {
        return dw;
    }

    public void setDw(long dw) {
        this.dw = dw;
    }

    public long getTotal() {
        return total;
    }

    public void setTotal(long total) {
        this.total = total;
    }

    @Override
    public String toString() {
        return up + "\t" + dw + "\t" + total;
    }

    // readFields() must deserialize the fields in exactly the order write() serialized them.
    @Override
    public void readFields(DataInput in) throws IOException {
        tel = in.readUTF();
        up = in.readLong();
        dw = in.readLong();
        total = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(tel);
        out.writeLong(up);
        out.writeLong(dw);
        out.writeLong(total);
    }
}
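The write/readFields pair only works if both methods use the same field order. As a minimal local sanity check (not part of the MapReduce job; the class name FlowRoundTripTest and the values are made up for illustration), the round trip can be verified like this:

package com.zut.flow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowRoundTripTest {
    public static void main(String[] args) throws Exception {
        Flow original = new Flow("13726230503", 2481L, 24681L, 2481L + 24681L);

        // Serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields() into a fresh instance
        Flow copy = new Flow();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // expected: 2481	24681	27162
    }
}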
Step 2: Create the Mapper
package com.zut.flow;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMap extends Mapper<LongWritable, Text, Text, Flow> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Flow>.Context context)
            throws IOException, InterruptedException {
        // Each input line is tab-separated: the phone number is the second field,
        // upstream and downstream traffic are the third-to-last and second-to-last fields.
        String[] splits = value.toString().split("\t");
        int length = splits.length;
        String tel = splits[1];
        long up = Long.parseLong(splits[length - 3]);
        long dw = Long.parseLong(splits[length - 2]);
        long total = up + dw;
        context.write(new Text(tel), new Flow(tel, up, dw, total));
    }
}
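Creating a new Text and Flow on every call works, but Hadoop mappers commonly reuse their output objects and skip malformed lines. A hedged variant of the same map logic (the class name FlowMapReuse and the length check are assumptions, not part of the original code) might look like:

package com.zut.flow;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapReuse extends Mapper<LongWritable, Text, Text, Flow> {

    // Reused output objects; context.write() serializes them immediately, so reuse is safe.
    private final Text outKey = new Text();
    private final Flow outValue = new Flow();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] splits = value.toString().split("\t");
        if (splits.length < 4) {
            return; // skip malformed lines
        }
        String tel = splits[1];
        long up = Long.parseLong(splits[splits.length - 3]);
        long dw = Long.parseLong(splits[splits.length - 2]);

        outKey.set(tel);
        outValue.setTel(tel);
        outValue.setUp(up);
        outValue.setDw(dw);
        outValue.setTotal(up + dw);
        context.write(outKey, outValue);
    }
}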
Step 3: Create the Reducer
package com.zut.flow;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReduce extends Reducer<Text, Flow, Text, Flow> {

    @Override
    protected void reduce(Text key, Iterable<Flow> value, Reducer<Text, Flow, Text, Flow>.Context context)
            throws IOException, InterruptedException {
        // Sum the upstream and downstream traffic of all records for one phone number.
        long up = 0;
        long dw = 0;
        for (Flow flow : value) {
            up += flow.getUp();
            dw += flow.getDw();
        }
        long total = up + dw;
        context.write(key, new Flow(key.toString(), up, dw, total));
    }
}
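Because FlowReduce consumes and produces the same <Text, Flow> pairs and summing traffic is associative, the same class can optionally be registered as a combiner to reduce shuffle volume. This is an optional optimization, not part of the original setup; it would be a single extra line in the driver below:

job.setCombinerClass(FlowReduce.class);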
Step 4: Create the driver (main) class
package com.zut.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowApp {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(Flow.class);

        job.setMapperClass(FlowMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flow.class);

        job.setReducerClass(FlowReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flow.class);

        // Input and output paths are taken from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Exit with a non-zero code if the job fails.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Step 5: Export the project as a jar (keep clicking Next through the export wizard, and pay close attention to the final step).
Step 6: Copy the jar to the virtual machine with FileZilla, and upload the file to be analyzed to HDFS.
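For reference, with hypothetical file and directory names, the upload might look like:

hdfs dfs -mkdir -p /flow/input
hdfs dfs -put flow.dat /flow/input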
Step 7: Run the job.
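For example, with the hypothetical jar name and paths from above (omit the class name if it was already set as the jar's Main-Class during export):

hadoop jar flow.jar com.zut.flow.FlowApp /flow/input /flow/output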
Step 8: View the results.
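The reducer output can then be printed with, again using the hypothetical output path:

hdfs dfs -cat /flow/output/part-r-00000

Each output line contains the phone number followed by the upstream, downstream, and total traffic, as produced by Flow.toString().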