MapReduce案例-统计手机号总流量
程序员文章站
2022-06-29 21:55:57
...
map方法和reduce方法都是循环调用的
map方法---每行数据调用一次
reduce方法---每个KV调用一次
只执行一次的代码写在setup和cleanup中
统计每个手机号的总流量
数据格式:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
需求:统计每个手机号的总流量
数据为多个手机号访问不同网站消耗的流量
每个手机号对应多个网站
第二位: 手机号
倒数第二位: 下行流量
倒数第三位: 上行流量
public class Flow {
/**
* Map阶段计算每行数据---手机号对应网站总流量---以Key Value的形式输出
* key 手机号
* value 手机号对应网站总流量
*
* Reduce阶段会获取每个手机号对应的多个站点总流量 手机号<总流量1,总流量2...>
* 最终输出key 手机号 value 总流量
*
* 泛型类型:
* LongWritable 每行数据的偏移量
* Text 行数据
* Text 手机号
* LongWritable 总流量
*/
static class FLowMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
Text k = new Text();
LongWritable v = new LongWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
//将Text类型转为String类型 , 进行数据切分
//行数据: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
String line = value.toString();
//按照空格进行切分
String[] split = line.split("\\s+");
//1号索引是手机号
String phone = split[1];
//上行流量
Long upFlow = Long.parseLong(split[split.length - 3]);
//下行流量
Long downFlow = Long.parseLong(split[split.length - 2]);
//给k和v赋值,输出
k.set(phone);
v.set(upFlow + downFlow);
context.write(k, v);
} catch (Exception e) {
//将错误行打印出来
System.out.println(value.toString());
}
}
}
static class FlowReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
LongWritable v = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
//一个手机号一个迭代器---手机号 <总流量1 , 总流量2 , 总流量3>
Long totalFlow = 0L;
for (LongWritable value : values) {
totalFlow += value.get();
}
//k是手机号---v是总流量
v.set(totalFlow);
context.write(key, v);
}
}
public static void main(String[] args) throws Exception {
//获取配置对象
Configuration conf = new Configuration();
//获取Job对象
Job job = Job.getInstance(conf, "flow");
//设置实现Mapper的类和实现Reducer的类
job.setMapperClass(FLowMapper.class);
job.setReducerClass(FlowReducer.class);
//设置Map阶段的输出的KV数据类型
//如果Map阶段和Reducer阶段KV数据类型一致---可以省略一个不写
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//设置Reducer阶段也是最终输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置文件输入输出
FileInputFormat.setInputPaths(job, new Path("D://mrdata/flow/input/flow.log"));
FileOutputFormat.setOutputPath(job, new Path("D://mrdata/flow/input/out"));
提交任务,等待任务执行完毕
job.waitForCompletion(true);
}
}