
Using MapReduce to Compute Each User's Total Traffic


(Input file screenshot omitted.) Given a log file like the one pictured, we need to compute each user's total upload traffic, total download traffic, and overall total traffic.
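Judging from how the mapper below parses each line, the records are tab-separated, with the phone number in the second field and the upload and download byte counts in the third- and second-to-last fields. A made-up record in that shape (every value here is hypothetical, not from the original dataset) might look like:

1363157985066	13726230503	(unused fields)	2481	24681	200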

Step 1: Create a Flow class that implements Writable:

package com.zut.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Flow implements Writable {
	// Writable lets Hadoop serialize this type between the map and reduce stages.
	private String tel;   // phone number
	private long up;      // upload bytes
	private long dw;      // download bytes
	private long total;   // up + dw

	// Hadoop creates Writable instances reflectively, so the no-arg
	// constructor is required for deserialization.
	public Flow() {
	}
	
	// Note: this constructor leaves tel null, which makes write() throw,
	// so prefer the four-argument constructor when the object is serialized.
	public Flow(long up, long dw, long total) {
		this.up = up;
		this.dw = dw;
		this.total = total;
	}
	public Flow(String tel, long up, long dw, long total) {
		this.tel = tel;
		this.up = up;
		this.dw = dw;
		this.total = total;
	}

	public String getTel() {
		return tel;
	}

	public void setTel(String tel) {
		this.tel = tel;
	}

	public long getUp() {
		return up;
	}

	public void setUp(long up) {
		this.up = up;
	}

	public long getDw() {
		return dw;
	}

	public void setDw(long dw) {
		this.dw = dw;
	}

	public long getTotal() {
		return total;
	}

	public void setTotal(long total) {
		this.total = total;
	}
	
	// TextOutputFormat renders the value with toString(); the phone number
	// is written separately as the key, so it is omitted here.
	@Override
	public String toString() {
		return up + "\t" + dw + "\t" + total;
	}

	// Deserialization: fields must be read in exactly the order that
	// write() emits them.
	@Override
	public void readFields(DataInput in) throws IOException {
		tel = in.readUTF();
		up = in.readLong();
		dw = in.readLong();
		total = in.readLong();
	}

	// Serialization: writeUTF throws a NullPointerException when tel is
	// null, so tel must be set before the object is written.
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(tel);
		out.writeLong(up);
		out.writeLong(dw);
		out.writeLong(total);
	}
}
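
As a quick sanity check outside the cluster, the Writable contract can be verified with a round trip through Java's data streams. This test class is a sketch of my own, not part of the original post, and the phone number and byte counts are made up:

package com.zut.flow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Hypothetical local test: serialize a Flow and read it back, verifying
// that write() and readFields() agree on field order.
public class FlowRoundTripTest {
	public static void main(String[] args) throws Exception {
		Flow original = new Flow("13726230503", 2481, 24681, 2481 + 24681);

		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		original.write(new DataOutputStream(bytes));

		Flow copy = new Flow();
		copy.readFields(new DataInputStream(
				new ByteArrayInputStream(bytes.toByteArray())));

		// Expected output: 13726230503  2481  24681  27162
		System.out.println(copy.getTel() + "\t" + copy);
	}
}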

Step 2: Create the Mapper

package com.zut.flow;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMap extends Mapper<LongWritable, Text, Text, Flow> {

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Each line is tab-separated: the phone number is the second field,
		// and the upload/download byte counts are the third- and
		// second-to-last fields, so trailing columns can vary per record.
		String[] splits = value.toString().split("\t");
		int length = splits.length;
		String tel = splits[1];
		long up = Long.parseLong(splits[length - 3]);
		long dw = Long.parseLong(splits[length - 2]);
		long total = up + dw;
		// Key on the phone number so all records for one user meet at the reducer.
		context.write(new Text(tel), new Flow(tel, up, dw, total));
	}
}
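
To make the parsing concrete: a hypothetical record whose second field is 13726230503 and whose last three fields are 2481, 24681, and 200 would cause the mapper to emit the key 13726230503 with a Flow value of 2481 (up), 24681 (dw), and 27162 (total); the final field is left unused.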

Step 3: Create the Reducer

package com.zut.flow;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReduce extends Reducer<Text, Flow, Text, Flow> {

	@Override
	protected void reduce(Text key, Iterable<Flow> values, Context context)
			throws IOException, InterruptedException {
		// All records for one phone number arrive together; sum the upload
		// and download counts, then recompute the grand total.
		long up = 0;
		long dw = 0;
		for (Flow flow : values) {
			up += flow.getUp();
			dw += flow.getDw();
		}
		long total = up + dw;
		context.write(key, new Flow(key.toString(), up, dw, total));
	}
}
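
One optional refinement, not in the original post: because the reducer's input and output types are both (Text, Flow) and it recomputes total from the summed up and dw fields, the same class can safely serve as a combiner to pre-aggregate on the map side. If desired, add this line to the driver shown in the next step:

	job.setCombinerClass(FlowReduce.class);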

Step 4: Create the driver class

package com.zut.flow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowApp {
	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);
		// Point Hadoop at the jar containing the job classes; the driver
		// class itself is the conventional choice.
		job.setJarByClass(FlowApp.class);

		job.setMapperClass(FlowMap.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Flow.class);

		job.setReducerClass(FlowReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Flow.class);

		// args[0] is the HDFS input path; args[1] is the output path,
		// which must not already exist.
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Exit non-zero on failure so shell scripts can detect it.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Step 5: Export the JAR (keep clicking Next through the export wizard, and take particular care at the final step)


Step 6: Transfer the JAR to the virtual machine with FileZilla, and upload the file to be analyzed to HDFS
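
A minimal sketch of that upload, assuming the jar was exported as flow.jar and the log file is named flow.log (both names are assumptions):

hdfs dfs -mkdir -p /flow/input
hdfs dfs -put flow.log /flow/input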

Step 7: Run the job
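
The original shows this step only as a screenshot; the command would look roughly like the following, where the jar name and HDFS paths are assumptions matching the sketch above:

hadoop jar flow.jar com.zut.flow.FlowApp /flow/input /flow/output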


Step 8: View the results
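
The result screenshot is likewise omitted. The output can be read straight from HDFS (the path again matching the assumed run command above):

hdfs dfs -cat /flow/output/part-r-00000

Each output line has the form phone, upload total, download total, and grand total separated by tabs, produced by TextOutputFormat from the Text key and Flow.toString().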
