欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce案例-统计手机号总流量

程序员文章站 2022-06-29 21:55:57
...

map方法和reduce方法都是循环调用的
map方法---每行数据调用一次
reduce方法---每个KV调用一次
只执行一次的代码写在setup和cleanup中
MapReduce案例-统计手机号总流量

统计每个手机号的总流量

数据格式:
1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
需求:统计每个手机号的总流量
     数据为多个手机号访问不同网站消耗的流量
     每个手机号对应多个网站
第二位: 手机号
倒数第二位: 下行流量
倒数第三位: 上行流量


 

public class Flow {
    /**
     * Map阶段计算每行数据---手机号对应网站总流量---以Key Value的形式输出
     * key 手机号
     * value 手机号对应网站总流量
     * 
     * Reduce阶段会获取每个手机号对应的多个站点总流量 手机号<总流量1,总流量2...>
     * 最终输出key 手机号 value 总流量
     * 
     * 泛型类型:
     * LongWritable             每行数据的偏移量
     * Text                     行数据
     * Text                     手机号
     * LongWritable             总流量
     */
    static class FLowMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text k = new Text();
        LongWritable v = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            try {
                //将Text类型转为String类型 , 进行数据切分
                //行数据:     1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
                String line = value.toString();
                //按照空格进行切分
                String[] split = line.split("\\s+");
                //1号索引是手机号
                String phone = split[1];
                //上行流量
                Long upFlow = Long.parseLong(split[split.length - 3]);
                //下行流量
                Long downFlow = Long.parseLong(split[split.length - 2]);
                //给k和v赋值,输出
                k.set(phone);
                v.set(upFlow + downFlow);
                context.write(k, v);
            } catch (Exception e) {
                //将错误行打印出来
                System.out.println(value.toString());
            }
        }
    }


    static class FlowReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        LongWritable v = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            //一个手机号一个迭代器---手机号 <总流量1 , 总流量2 , 总流量3>
            Long totalFlow = 0L;
            for (LongWritable value : values) {
                totalFlow += value.get();
            }
            //k是手机号---v是总流量
            v.set(totalFlow);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        //获取配置对象
        Configuration conf = new Configuration();
        //获取Job对象
        Job job = Job.getInstance(conf, "flow");
        //设置实现Mapper的类和实现Reducer的类
        job.setMapperClass(FLowMapper.class);
        job.setReducerClass(FlowReducer.class);
        //设置Map阶段的输出的KV数据类型
        //如果Map阶段和Reducer阶段KV数据类型一致---可以省略一个不写
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //设置Reducer阶段也是最终输出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //设置文件输入输出
        FileInputFormat.setInputPaths(job, new Path("D://mrdata/flow/input/flow.log"));
        FileOutputFormat.setOutputPath(job, new Path("D://mrdata/flow/input/out"));
        提交任务,等待任务执行完毕
        job.waitForCompletion(true);

    }
}

 

相关标签: # MapReduce