
Big Data: Live-Streaming Platform Statistics


We are given the following data from a live-streaming platform, in JSON format:

{"id":158008900435,"uid":120010010445,"nickname":"jack435","gold":445,"watchnumpv":4350,"watchnumuv":870,"hots":1350,"nofollower":435,"looktime":8700,"smlook":2175,"follower":1740,"gifter":870,"length":2620,"area":"A_US","rating":"B","exp":1305,"type":"video_rating"}

Each line represents one live stream: uid is the streamer's id, watchnumpv is the number of views, follower is the number of followers gained during this stream, and length is the duration of this stream.
For a given day, compute each streamer's total watchnumpv, follower, and length, and find the top ten streamers by total stream length.

Step 1: Clean the JSON data

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class DataCleanMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        String line = v1.toString();
        JSONObject jsonObject = JSON.parseObject(line);
        // uid identifies the streamer and becomes the output key
        String id = jsonObject.getString("uid");
        int gold = jsonObject.getIntValue("gold");
        int watchnumpv = jsonObject.getIntValue("watchnumpv");
        int follower = jsonObject.getIntValue("follower");
        int length = jsonObject.getIntValue("length");
        // keep only records whose metrics are all non-negative
        if (gold >= 0 && watchnumpv >= 0 && follower >= 0 && length >= 0) {
            Text k2 = new Text();
            k2.set(id);
            Text v2 = new Text();
            v2.set(gold + "\t" + watchnumpv + "\t" + follower + "\t" + length);
            context.write(k2, v2);
        }
    }
}
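Because this is the cleaning step, any line that is not valid JSON would currently make JSON.parseObject throw and fail the whole task. A minimal defensive variant of the parse call (a sketch, not part of the original code) would skip such records instead:

// hypothetical defensive parsing: skip records that are not valid JSON
JSONObject jsonObject;
try {
    jsonObject = JSON.parseObject(line);
} catch (com.alibaba.fastjson.JSONException e) {
    return; // drop the malformed record and move on to the next line
}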
public class DataCleanJob {
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.exit(100);
            }
            Configuration conf = new Configuration();
            Job wcjob = Job.getInstance(conf);
            wcjob.setJarByClass(DataCleanJob.class);
            wcjob.setMapperClass(DataCleanMap.class);
            // key type of the mapper output
            wcjob.setMapOutputKeyClass(Text.class);
            // value type of the mapper output
            wcjob.setMapOutputValueClass(Text.class);
            // map-only job: the cleaning step needs no reduce phase
            wcjob.setNumReduceTasks(0);
            // input path
            FileInputFormat.setInputPaths(wcjob, new Path(args[0]));
            // output path; it must not already exist, otherwise the job fails
            FileOutputFormat.setOutputPath(wcjob, new Path(args[1]));
            boolean res = wcjob.waitForCompletion(true);
            System.out.println(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
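For reference, after the cleaning job the sample record above becomes a single tab-separated line, with the uid as the key followed by gold, watchnumpv, follower and length (TextOutputFormat joins key and value with a tab):

120010010445	445	4350	1740	2620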

Step 2: Compute each streamer's total watchnumpv, follower, and length for the day

public class VideoInfoMap extends Mapper<LongWritable, Text, Text, VideoInfoWritable> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        String line = v1.toString();
        // the cleaned data is tab-separated: uid, gold, watchnumpv, follower, length
        String[] fields = line.split("\t");
        String id = fields[0];
        long gold = Long.parseLong(fields[1]);
        long watchnumpv = Long.parseLong(fields[2]);
        long follower = Long.parseLong(fields[3]);
        long length = Long.parseLong(fields[4]);
        Text k2 = new Text();
        k2.set(id);
        VideoInfoWritable v2 = new VideoInfoWritable();
        v2.set(gold, watchnumpv, follower, length);
        context.write(k2, v2);
    }
}
public class VideoInfoReduce extends Reducer<Text, VideoInfoWritable, Text, VideoInfoWritable> {
    @Override
    protected void reduce(Text k2, Iterable<VideoInfoWritable> v2s, Context context) throws IOException, InterruptedException {
        long goldsum = 0;
        long watchnumpvsum = 0;
        long followersum = 0;
        long lengthsum = 0;

        // add up the day's metrics for this streamer (key k2 is the uid)
        for (VideoInfoWritable v2 : v2s) {
            goldsum += v2.getGold();
            watchnumpvsum += v2.getWatchnumpv();
            followersum += v2.getFollower();
            lengthsum += v2.getLength();
        }
        Text k3 = k2;
        VideoInfoWritable v3 = new VideoInfoWritable();
        v3.set(goldsum, watchnumpvsum, followersum, lengthsum);
        context.write(k3, v3);
    }
}
public class VideoInfoWritable implements Writable {
    private long gold;
    private long watchnumpv;
    private long follower;
    private long length;

    public void set(long gold, long watchnumpv, long follower, long length) {
        this.gold = gold;
        this.watchnumpv = watchnumpv;
        this.follower = follower;
        this.length = length;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(gold);
        dataOutput.writeLong(watchnumpv);
        dataOutput.writeLong(follower);
        dataOutput.writeLong(length);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.gold = dataInput.readLong();
        this.watchnumpv = dataInput.readLong();
        this.follower = dataInput.readLong();
        this.length = dataInput.readLong();
    }

    @Override
    public String toString() {
        return gold + "\t" + watchnumpv + "\t" + follower + "\t" + length;
    }
    // getters and setters omitted
}
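The omitted accessors are exactly the ones VideoInfoReduce calls; a minimal sketch of what they look like inside VideoInfoWritable:

    public long getGold()       { return gold; }
    public long getWatchnumpv() { return watchnumpv; }
    public long getFollower()   { return follower; }
    public long getLength()     { return length; }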
public class VideoInfoJob {
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.exit(100);
            }
            Configuration conf = new Configuration();
            Job wcjob = Job.getInstance(conf);
            wcjob.setJarByClass(VideoInfoJob.class);
            wcjob.setMapperClass(VideoInfoMap.class);
            // key type of the mapper output
            wcjob.setMapOutputKeyClass(Text.class);
            // value type of the mapper output
            wcjob.setMapOutputValueClass(VideoInfoWritable.class);
            wcjob.setReducerClass(VideoInfoReduce.class);
            // key type of the reducer output
            wcjob.setOutputKeyClass(Text.class);
            // value type of the reducer output
            wcjob.setOutputValueClass(VideoInfoWritable.class);
            // input path
            FileInputFormat.setInputPaths(wcjob, new Path(args[0]));
            // output path; it must not already exist, otherwise the job fails
            FileOutputFormat.setOutputPath(wcjob, new Path(args[1]));
            boolean res = wcjob.waitForCompletion(true);
            System.out.println(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
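Because the reduce logic is a plain sum, it is associative and commutative, so the same class could also serve as a combiner to shrink the shuffle; an optional one-line addition to the job above (not in the original):

wcjob.setCombinerClass(VideoInfoReduce.class);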

Step 3: Sort and output the top ten streamers by length

public class VideoInfoTop10Map extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        String line = v1.toString();
        // reads the cleaned data again: uid, gold, watchnumpv, follower, length
        String[] fields = line.split("\t");
        String id = fields[0];
        long length = Long.parseLong(fields[4]);
        Text k2 = new Text();
        k2.set(id);
        LongWritable v2 = new LongWritable();
        v2.set(length);
        context.write(k2, v2);
    }
}
public class VideoInfoTop10Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    // per-uid length totals, collected across reduce() calls and written out in cleanup()
    HashMap<String, Long> map = new HashMap<>();

    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
        long lengthsum = 0;
        for (LongWritable v2 : v2s) {
            lengthsum += v2.get();
        }
        map.put(k2.toString(), lengthsum);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        String dt = conf.get("dt");

        Map<String, Long> sortedMap = MapUtils.sortValue(map);
        Set<Map.Entry<String, Long>> entries = sortedMap.entrySet();
        Iterator<Map.Entry<String, Long>> it = entries.iterator();
        int count = 1;
        while (count <= 10 && it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String key = entry.getKey();
            Long value = entry.getValue();
            Text k3 = new Text();
            k3.set(dt + "\t" + key);
            LongWritable v3 = new LongWritable();
            v3.set(value);
            context.write(k3, v3);
            count++;
        }
    }
}
public class VideoInfoJobTop10 {
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.exit(100);
            }
            String[] fielsd = args[0].split("/");
            String tmpdt = fielsd[fielsd.length - 1];
            String dt = DateUtils.transDataFormat(tmpdt);

            Configuration conf = new Configuration();
            conf.set("dt", dt);
            Job wcjob = Job.getInstance(conf);
            wcjob.setJarByClass(VideoInfoJobTop10.class);
            wcjob.setMapperClass(VideoInfoTop10Map.class);
            // key type of the mapper output
            wcjob.setMapOutputKeyClass(Text.class);
            // value type of the mapper output
            wcjob.setMapOutputValueClass(LongWritable.class);
            wcjob.setReducerClass(VideoInfoTop10Reduce.class);
            // key type of the reducer output
            wcjob.setOutputKeyClass(Text.class);
            // value type of the reducer output
            wcjob.setOutputValueClass(LongWritable.class);
            // input path
            FileInputFormat.setInputPaths(wcjob, new Path(args[0]));
            // output path; it must not already exist, otherwise the job fails
            FileOutputFormat.setOutputPath(wcjob, new Path(args[1]));
            boolean res = wcjob.waitForCompletion(true);
            System.out.println(res);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
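Note that the top-10 ranking only works when every key reaches the same reducer instance, since the HashMap lives inside a single VideoInfoTop10Reduce object. One reducer is the Hadoop default, but it is safer to pin it explicitly in the job above; a one-line sketch:

wcjob.setNumReduceTasks(1);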
public class MapUtils {
    /**
     * Sorts a map by value in descending order.
     * @param map the map to sort
     * @param <K> key type
     * @param <V> value type, must be comparable
     * @return a LinkedHashMap containing the entries in descending value order
     */
    public static <K, V extends Comparable<? super V>> Map<K, V> sortValue(Map<K, V> map) {
        List<Map.Entry<K, V>> list = new ArrayList<>(map.entrySet());
        Collections.sort(list, new Comparator<Map.Entry<K, V>>() {
            @Override
            public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2) {
                int compare = (o1.getValue()).compareTo(o2.getValue());
                return -compare;
            }
        });
        Map<K, V> returnMap = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : list) {
            returnMap.put(entry.getKey(), entry.getValue());
        }
        return returnMap;
    }
}
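An equivalent of sortValue written with Java 8 streams (an alternative sketch, not part of the original helper, using java.util.Comparator, java.util.LinkedHashMap and java.util.stream.Collectors), assuming map is the HashMap<String, Long> held by the reducer:

// sort entries by value, descending, and keep that order in a LinkedHashMap
Map<String, Long> sortedMap = map.entrySet().stream()
        .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                (a, b) -> a, LinkedHashMap::new));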
public class DateUtils {
    private static SimpleDateFormat sdf1 = new SimpleDateFormat("yyyyMMdd");
    private static SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd");

    public static String transDataFormat(String dt) {
        String res = "1970-01-01";
        try {
            Date date = sdf1.parse(dt);
            res = sdf2.format(date);
        } catch (Exception e) {
            System.out.println("日期转换失败:" + dt);
        }
        return res;
    }
}
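A side note: the static SimpleDateFormat instances are not thread-safe. That does not matter here because the driver calls transDataFormat once, but a java.time based version (a sketch, not the original class) sidesteps the issue, since DateTimeFormatter is immutable:

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DateUtils {
    private static final DateTimeFormatter IN_FMT  = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter OUT_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    public static String transDataFormat(String dt) {
        try {
            return LocalDate.parse(dt, IN_FMT).format(OUT_FMT);
        } catch (Exception e) {
            System.out.println("Failed to convert date: " + dt);
            return "1970-01-01";
        }
    }
}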

Step 4: Run the jobs above with a daily script

Assuming each day's data goes into a single file whose name follows the yyyyMMdd format, running this script on a schedule every day gives yesterday's top-10 streamers by stream length.

#!/bin/bash
if [ "X$1" = "X" ]
then
    # yesterday's date on macOS
    yes_time=`date -v-1d +"%Y%m%d"`
    # yesterday's date on Linux
#    yes_time=`date +%Y%m%d --date="1 days ago"`
else
    yes_time=$1
fi

cleanjob_input=hdfs://localhost:9000/data/videoinfo/${yes_time}
cleanjob_output=hdfs://localhost:9000/data/videoinfo_clean/${yes_time}

videoinfojob_input=${cleanjob_output}
videoinfojob_output=hdfs://localhost:9000/res/videoinfojob/${yes_time}

videoinfojobtop10_input=${cleanjob_output}
videoinfojobtop10_output=hdfs://localhost:9000/res/videoinfojobtop10/${yes_time}

jobs_home=/Users/liangjiepeng/Documents/ideaCode/a/target

hdfs dfs -rm -r ${cleanjob_output}
hdfs dfs -rm -r ${videoinfojob_output}
hdfs dfs -rm -r ${videoinfojobtop10_output}

hadoop jar \
${jobs_home}/a-v1.0-jar-with-dependencies.jar \
com.itmayiedu.hadoop.dataClean.DataCleanJob \
${cleanjob_input} \
${cleanjob_output}

hdfs dfs -ls ${cleanjob_output}/_SUCCESS
if [ "$?" = 0 ]
then
    echo "cleanJob execute success..."
    hadoop jar \
    ${jobs_home}/a-v1.0-jar-with-dependencies.jar \
    com.itmayiedu.hadoop.dataClean.videoinfo.VideoInfoJob \
    ${videoinfojob_input} \
    ${videoinfojob_output}
    hdfs dfs -ls ${videoinfojob_output}/_SUCCESS
    if [ "$?" != "0" ]
    then
        echo "VideoInfoJob execute faild..."
    fi

    hadoop jar \
    ${jobs_home}/a-v1.0-jar-with-dependencies.jar \
    com.itmayiedu.hadoop.dataClean.top10.VideoInfoJobTop10 \
    ${videoinfojobtop10_input} \
    ${videoinfojobtop10_output}
    hdfs dfs -ls ${videoinfojobtop10_output}/_SUCCESS
    if [ "$?" != "0" ]
    then
        echo "VideoInfoJobTop10 execute faild..."
    fi
else
    echo "cleanJob execute faild... date time is ${year_time}"
fi

Step 5: Export the results to MySQL with Sqoop so that a web page can read and display the data

sqoop export --connect jdbc:mysql://localhost:3306/test --username root --password 123456 --table top10 --export-dir /res/videoinfojobtop10/20200306 --input-fields-terminated-by "\t"

