Big Data with Hadoop: a MapReduce TopN Case
程序员文章站
2022-03-08 07:54:07
1. Requirements
Process the input data and output the records of the 10 users with the highest total traffic usage.
(1) Input data
13470253144 180 180 360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548
(2) Expected output
13509468723 7335 110349 117684
13975057813 11058 48243 59301
13568436656 3597 25635 29232
13736230513 2481 24681 27162
18390173782 9531 2412 11943
13630577991 6960 690 7650
15043685818 3659 3538 7197
13992314666 3008 3720 6728
15910133277 3156 2936 6092
13560439638 918 4938 5856
2. Analysis
This case can be implemented either on the map side or on the reduce side; here we take the map-side approach as an example. On the map side we create a TreeMap, and for each input record we put the populated FlowBean object in as the key and the phone number as the value. After every insertion we check whether the map holds more than ten entries; if it does, we remove the element with the smallest total flow, so the TreeMap never grows beyond 10 entries. Finally, we write the surviving entries out in the cleanup phase.
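Before diving into the MapReduce code, the core trick can be sketched with plain Java collections. This standalone sketch (class and method names are illustrative, not part of the job) uses the TreeMap's natural ascending key order and evicts `firstKey()`, which is equivalent to the descending order plus `lastKey()` eviction used below:

```java
import java.util.TreeMap;

public class TopNSketch {
    // Keep the n largest totals. With natural ascending ordering,
    // firstKey() is always the smallest key, so that is what we evict.
    static TreeMap<Integer, String> topN(int[][] flows, String[] phones, int n) {
        TreeMap<Integer, String> top = new TreeMap<Integer, String>();
        for (int i = 0; i < flows.length; i++) {
            int total = flows[i][0] + flows[i][1]; // up + down
            top.put(total, phones[i]);
            if (top.size() > n) {
                top.remove(top.firstKey()); // drop the smallest total
            }
        }
        return top;
    }

    public static void main(String[] args) {
        int[][] flows = { {5, 5}, {1, 1}, {10, 10}, {3, 3} };
        String[] phones = { "a", "b", "c", "d" };
        // Keeps the two largest totals: 20 -> "c" and 10 -> "a"
        System.out.println(topN(flows, phones, 2));
    }
}
```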
3. Implementation
The FlowBean class:
package com.mrreview.topn;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private int upFlow;
    private int downFlow;
    private int sumFlow;

    // A no-arg constructor is required for Hadoop deserialization.
    public FlowBean() {
    }

    public FlowBean(int upFlow, int downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Deserialization: fields must be read in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readInt();
        this.downFlow = in.readInt();
        this.sumFlow = in.readInt();
    }

    // Serialization.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(sumFlow);
    }

    // Descending order of total flow (overflow-safe, unlike negating a subtraction).
    @Override
    public int compareTo(FlowBean o) {
        return Integer.compare(o.sumFlow, this.sumFlow);
    }

    public int getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(int upFlow) {
        this.upFlow = upFlow;
    }

    public int getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(int downFlow) {
        this.downFlow = downFlow;
    }

    public int getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(int sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
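The Writable contract is that `write` and `readFields` serialize the fields in exactly the same order. That symmetry can be checked without a Hadoop cluster using plain Java streams; the helper names below are hypothetical and simply mirror FlowBean's field order:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    // Mirrors FlowBean.write(): upFlow, downFlow, sumFlow as ints, in that order.
    static byte[] write(int up, int down, int sum) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(up);
        out.writeInt(down);
        out.writeInt(sum);
        return bos.toByteArray();
    }

    // Mirrors FlowBean.readFields(): fields come back in write order.
    static int[] read(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return new int[] { in.readInt(), in.readInt(), in.readInt() };
    }
}
```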
The Mapper class:
package com.mrreview.topn;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // A TreeMap keeps its keys sorted; with FlowBean's descending compareTo,
    // the last key is always the record with the smallest total flow.
    private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();
    private FlowBean kBean;
    private Text v;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Fresh objects on every call: the TreeMap holds references to them.
        kBean = new FlowBean();
        v = new Text();

        // 1. Read one line
        String line = value.toString();

        // 2. Split into fields
        String[] fields = line.split("\t");

        // 3. Populate the bean
        String phoneNum = fields[0];
        int upFlow = Integer.parseInt(fields[1]);
        int downFlow = Integer.parseInt(fields[2]);
        int sumFlow = Integer.parseInt(fields[3]);
        kBean.setUpFlow(upFlow);
        kBean.setDownFlow(downFlow);
        kBean.setSumFlow(sumFlow);
        v.set(phoneNum);

        // 4. Add the record to the TreeMap
        flowMap.put(kBean, v);

        // 5. Cap the TreeMap at 10 entries: once it exceeds 10,
        //    remove the entry with the smallest total flow (the last key).
        if (flowMap.size() > 10) {
            flowMap.remove(flowMap.lastKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // 6. Emit the surviving top-10 records, phone number as the output key.
        Iterator<FlowBean> bean = flowMap.keySet().iterator();
        while (bean.hasNext()) {
            FlowBean k = bean.next();
            context.write(flowMap.get(k), k);
        }
    }
}
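One caveat with using a TreeMap here: it stores a single value per key, so when `compareTo` returns 0 (two phones with the same total flow, such as the three users with total 240 in the sample input), the later record silently replaces the earlier one. This small demonstration, with a comparator mirroring FlowBean's descending order, shows the collision:

```java
import java.util.Comparator;
import java.util.TreeMap;

public class TreeMapCollision {
    public static void main(String[] args) {
        // Comparator mirrors FlowBean.compareTo: descending by total flow.
        TreeMap<Integer, String> map = new TreeMap<Integer, String>(
                new Comparator<Integer>() {
                    public int compare(Integer a, Integer b) {
                        return Integer.compare(b, a);
                    }
                });
        map.put(240, "13729199489");
        map.put(240, "13768778790"); // same key: overwrites the previous phone

        System.out.println(map.size());   // 1, not 2
        System.out.println(map.get(240)); // 13768778790
    }
}
```

If ties must be preserved, the usual fix is to break them in `compareTo` with a secondary field (e.g. the phone number), so that no two distinct records ever compare as equal.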
The Driver class:
package com.mrreview.topn;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopNDriver {
    public static void main(String[] args) throws Exception {
        args = new String[] {"D:\\hadoop-2.7.1\\winMR\\TopN\\input", "D:\\hadoop-2.7.1\\winMR\\TopN\\output"};

        Job job = Job.getInstance();
        job.setJarByClass(TopNDriver.class);
        job.setMapperClass(TopNMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Map-only job: set the number of reduce tasks to 0 (no reducer class is registered).
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
4. Run result: