大数据之直播平台数据统计
程序员文章站
2024-03-22 12:29:28
...
已知有以下直播平台数据
json格式
{"id":158008900435,"uid":120010010445,"nickname":"jack435","gold":445,"watchnumpv":4350,"watchnumuv":870,"hots":1350,"nofollower":435,"looktime":8700,"smlook":2175,"follower":1740,"gifter":870,"length":2620,"area":"A_US","rating":"B","exp":1305,"type":"video_rating"}
其中,每一行代表一次直播,uid代表主播id,watchnumpv代表观看的次数,follower代表本次直播关注的人数,length代表本次直播时长。
求某一天主播的总watchnumpv,follower,length,并求时长前十的主播
第一步,对json数据进行数据清洗
public class DataCleanMap extends Mapper<LongWritable, Text, Text, Text> {
    /**
     * Cleans one raw live-stream JSON record and emits
     * <uid, "gold\twatchnumpv\tfollower\tlength">.
     * Records that fail to parse, lack a uid, or carry negative metrics are
     * dropped so a single dirty line cannot fail the whole job.
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        String line = v1.toString();
        JSONObject jsonObject;
        try {
            jsonObject = JSON.parseObject(line);
        } catch (Exception e) {
            // Malformed JSON — skip this record instead of killing the job.
            return;
        }
        if (jsonObject == null) {
            return;
        }
        String uid = jsonObject.getString("uid");
        int gold = jsonObject.getIntValue("gold");
        int watchnumpv = jsonObject.getIntValue("watchnumpv");
        int follower = jsonObject.getIntValue("follower");
        int length = jsonObject.getIntValue("length");
        // Negative metrics indicate corrupt data; keep only fully valid records.
        if (uid != null && gold >= 0 && watchnumpv >= 0 && follower >= 0 && length >= 0) {
            Text k2 = new Text();
            k2.set(uid);
            Text v2 = new Text();
            v2.set(gold + "\t" + watchnumpv + "\t" + follower + "\t" + length);
            context.write(k2, v2);
        }
    }
}
public class DataCleanJob {
    /**
     * Driver for the data-clean step (map-only job, zero reducers).
     * args[0] = input path; args[1] = output path (must not already exist).
     * Exits with a non-zero status on failure so shell pipelines can detect it.
     */
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.err.println("Usage: DataCleanJob <input path> <output path>");
                System.exit(100);
            }
            Configuration conf = new Configuration();
            Job wcjob = Job.getInstance(conf);
            wcjob.setJarByClass(DataCleanJob.class);
            wcjob.setMapperClass(DataCleanMap.class);
            // Mapper output key type
            wcjob.setMapOutputKeyClass(Text.class);
            // Mapper output value type
            wcjob.setMapOutputValueClass(Text.class);
            // Pure filtering/cleaning: no aggregation, so skip the reduce phase entirely.
            wcjob.setNumReduceTasks(0);
            // Input file path
            FileInputFormat.setInputPaths(wcjob, new Path(args[0]));
            // Output path; must not already exist or the job fails
            FileOutputFormat.setOutputPath(wcjob, new Path(args[1]));
            boolean res = wcjob.waitForCompletion(true);
            System.out.println(res);
            // Propagate job success/failure to the shell via the exit code.
            System.exit(res ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
第二步,统计主播这天的总watchnumpv,follower,length
public class VideoInfoMap extends Mapper<LongWritable, Text, Text, VideoInfoWritable> {
    /**
     * Parses one cleaned record ("uid\tgold\twatchnumpv\tfollower\tlength")
     * and emits <uid, VideoInfoWritable> for per-host aggregation.
     * Lines with missing fields or non-numeric values are skipped so a stray
     * malformed line cannot abort the whole job.
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        String line = v1.toString();
        String[] fields = line.split("\t");
        // Expect exactly: uid, gold, watchnumpv, follower, length
        if (fields.length < 5) {
            return;
        }
        String id = fields[0];
        long gold;
        long watchnumpv;
        long follower;
        long length;
        try {
            gold = Long.parseLong(fields[1]);
            watchnumpv = Long.parseLong(fields[2]);
            follower = Long.parseLong(fields[3]);
            length = Long.parseLong(fields[4]);
        } catch (NumberFormatException e) {
            // Corrupt numeric field — drop the record.
            return;
        }
        Text k2 = new Text();
        k2.set(id);
        VideoInfoWritable v2 = new VideoInfoWritable();
        v2.set(gold, watchnumpv, follower, length);
        context.write(k2, v2);
    }
}
public class VideoInfoReduce extends Reducer<Text, VideoInfoWritable, Text, VideoInfoWritable> {
    /**
     * Sums gold / watchnumpv / follower / length across every record of one
     * host (the key) and emits a single aggregated VideoInfoWritable per host.
     */
    @Override
    protected void reduce(Text k2, Iterable<VideoInfoWritable> v2s, Context context) throws IOException, InterruptedException {
        long totalGold = 0;
        long totalWatchPv = 0;
        long totalFollower = 0;
        long totalLength = 0;
        // Hadoop may reuse the value instance between iterations, so only the
        // primitive getters are read here — the objects themselves are not kept.
        for (VideoInfoWritable info : v2s) {
            totalGold += info.getGold();
            totalWatchPv += info.getWatchnumpv();
            totalFollower += info.getFollower();
            totalLength += info.getLength();
        }
        VideoInfoWritable summary = new VideoInfoWritable();
        summary.set(totalGold, totalWatchPv, totalFollower, totalLength);
        context.write(k2, summary);
    }
}
public class VideoInfoWritable implements Writable {
private long gold;
private long watchnumpv;
private long follower;
private long length;
public void set(long gold, long watchnumpv, long follower, long length) {
this.gold = gold;
this.watchnumpv = watchnumpv;
this.follower = follower;
this.length = length;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(gold);
dataOutput.writeLong(watchnumpv);
dataOutput.writeLong(follower);
dataOutput.writeLong(length);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.gold = dataInput.readLong();
this.watchnumpv = dataInput.readLong();
this.follower = dataInput.readLong();
this.length = dataInput.readLong();
}
@Override
public String toString() {
return gold + "\t" + watchnumpv + "\t" + follower + "\t" + length;
}
...get and set method
}
public class VideoInfoJob {
    /**
     * Driver for the per-host aggregation step.
     * args[0] = cleaned-data input path; args[1] = output path (must not exist).
     * Exits with a non-zero status on failure so shell pipelines can detect it.
     */
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.err.println("Usage: VideoInfoJob <input path> <output path>");
                System.exit(100);
            }
            Configuration conf = new Configuration();
            Job wcjob = Job.getInstance(conf);
            wcjob.setJarByClass(VideoInfoJob.class);
            wcjob.setMapperClass(VideoInfoMap.class);
            // Mapper output key type
            wcjob.setMapOutputKeyClass(Text.class);
            // Mapper output value type
            wcjob.setMapOutputValueClass(VideoInfoWritable.class);
            wcjob.setReducerClass(VideoInfoReduce.class);
            // Reducer output key type
            wcjob.setOutputKeyClass(Text.class);
            // Reducer output value type
            wcjob.setOutputValueClass(VideoInfoWritable.class);
            // Input file path
            FileInputFormat.setInputPaths(wcjob, new Path(args[0]));
            // Output path; must not already exist or the job fails
            FileOutputFormat.setOutputPath(wcjob, new Path(args[1]));
            boolean res = wcjob.waitForCompletion(true);
            System.out.println(res);
            // Propagate job success/failure to the shell via the exit code.
            System.exit(res ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
第三步,排序输出length前十的主播
public class VideoInfoTop10Map extends Mapper<LongWritable, Text, Text, LongWritable> {
    /**
     * Extracts <uid, total length> from one aggregated record
     * ("uid\tgold\twatchnumpv\tfollower\tlength") for the top-10 ranking step.
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        String[] columns = v1.toString().split("\t");
        // Column 0 is the host id; column 4 is the summed stream length.
        String hostId = columns[0];
        long totalLength = Long.parseLong(columns[4]);
        Text outKey = new Text();
        outKey.set(hostId);
        LongWritable outValue = new LongWritable();
        outValue.set(totalLength);
        context.write(outKey, outValue);
    }
}
public class VideoInfoTop10Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    // Accumulates <hostId, total length> across ALL reduce() calls; the actual
    // output is deferred to cleanup(). NOTE(review): this in-memory global
    // top-10 is only correct when the job runs with exactly ONE reducer — the
    // driver does not set setNumReduceTasks(1) explicitly; confirm the default.
    HashMap<String, Long> map = new HashMap<>();
    /**
     * Sums the stream lengths for one host and buffers the total instead of
     * writing it: ranking needs every host's total before anything is emitted.
     */
    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
        long lengthsum = 0;
        for (LongWritable v2 : v2s) {
            lengthsum += v2.get();
        }
        map.put(k2.toString(), lengthsum);
    }
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }
    /**
     * Runs once after all reduce() calls: sorts hosts by total length
     * (descending) and writes only the first ten, prefixed with the day.
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // "dt" (the day being processed, yyyy-MM-dd) is injected by the driver
        // through the job Configuration.
        Configuration conf = context.getConfiguration();
        String dt = conf.get("dt");
        // MapUtils.sortValue returns a LinkedHashMap ordered by value, descending.
        Map<String, Long> sortedMap = MapUtils.sortValue(map);
        Set<Map.Entry<String, Long>> entries = sortedMap.entrySet();
        Iterator<Map.Entry<String, Long>> it = entries.iterator();
        int count = 1;
        // Emit at most the ten hosts with the longest total stream time.
        while (count <= 10 && it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String key = entry.getKey();
            Long value = entry.getValue();
            Text k3 = new Text();
            k3.set(dt + "\t" + key);
            LongWritable v3 = new LongWritable();
            v3.set(value);
            context.write(k3, v3);
            count++;
        }
    }
}
public class VideoInfoJobTop10 {
    /**
     * Driver for the daily top-10-by-length job.
     * args[0] = aggregated-data input path whose LAST path segment is the day
     * (yyyyMMdd); args[1] = output path (must not already exist).
     * Exits with a non-zero status on failure so shell pipelines can detect it.
     */
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.err.println("Usage: VideoInfoJobTop10 <input path> <output path>");
                System.exit(100);
            }
            // The input directory is named after the day (yyyyMMdd); pass it to
            // the reducer as "dt" in yyyy-MM-dd form via the job configuration.
            String[] pathSegments = args[0].split("/");
            String tmpdt = pathSegments[pathSegments.length - 1];
            String dt = DateUtils.transDataFormat(tmpdt);
            Configuration conf = new Configuration();
            conf.set("dt", dt);
            Job wcjob = Job.getInstance(conf);
            wcjob.setJarByClass(VideoInfoJobTop10.class);
            wcjob.setMapperClass(VideoInfoTop10Map.class);
            // Mapper output key type
            wcjob.setMapOutputKeyClass(Text.class);
            // Mapper output value type
            wcjob.setMapOutputValueClass(LongWritable.class);
            wcjob.setReducerClass(VideoInfoTop10Reduce.class);
            // The reducer computes a GLOBAL top 10 in memory, which is only
            // correct with exactly one reducer — force it explicitly rather
            // than relying on the cluster default.
            wcjob.setNumReduceTasks(1);
            // Reducer output key type
            wcjob.setOutputKeyClass(Text.class);
            // Reducer output value type
            wcjob.setOutputValueClass(LongWritable.class);
            // Input file path
            FileInputFormat.setInputPaths(wcjob, new Path(args[0]));
            // Output path; must not already exist or the job fails
            FileOutputFormat.setOutputPath(wcjob, new Path(args[1]));
            boolean res = wcjob.waitForCompletion(true);
            System.out.println(res);
            // Propagate job success/failure to the shell via the exit code.
            System.exit(res ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
public class MapUtils {
/**
* 根据Map的value值降序排序
* @param map
* @param <K>
* @param <V>
* @return
*/
public static <K, V extends Comparable<? super V>> Map<K, V> sortValue(Map map) {
List<Map.Entry<K, V>> list = new ArrayList<>(map.entrySet());
Collections.sort(list, new Comparator<Map.Entry<K, V>>() {
@Override
public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2) {
int compare = (o1.getValue()).compareTo(o2.getValue());
return -compare;
}
});
Map<K, V> returnMap = new LinkedHashMap<>();
for (Map.Entry<K, V> entry : list) {
returnMap.put(entry.getKey(), entry.getValue());
}
return returnMap;
}
}
public class DateUtils {
    // DateTimeFormatter is immutable and thread-safe, unlike the shared static
    // SimpleDateFormat this replaces (SimpleDateFormat corrupts state when
    // parsed concurrently, e.g. from multiple Hadoop task threads).
    // Fully qualified names are used so no new imports are required.
    private static final java.time.format.DateTimeFormatter INPUT_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final java.time.format.DateTimeFormatter OUTPUT_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /**
     * Converts a date string from yyyyMMdd to yyyy-MM-dd.
     *
     * @param dt date in yyyyMMdd form, e.g. "20200306"
     * @return the same date as yyyy-MM-dd, or "1970-01-01" if parsing fails
     */
    public static String transDataFormat(String dt) {
        String res = "1970-01-01";
        try {
            res = java.time.LocalDate.parse(dt, INPUT_FORMAT).format(OUTPUT_FORMAT);
        } catch (Exception e) {
            System.out.println("日期转换失败:" + dt);
        }
        return res;
    }
}
第四步,用脚本每天执行以上代码
假设每天的数据都放到一个文件中,文件名格式为yyyyMMdd。那么,每天定时执行这个脚本就能得到昨天直播时长top10的主播。
#!/bin/bash
# Daily pipeline: clean raw live-stream logs, aggregate per-host totals, and
# compute the top-10 hosts by stream length for one day.
# Usage: no argument processes yesterday; or pass an explicit yyyyMMdd date.
if [ "X$1" = "X" ]
then
# Yesterday's date on OSX
yes_time=`date -v-1d +"%Y%m%d"`
# Yesterday's date on Linux
# yes_time=`date +%Y%m%d --date="1 days ago"`
else
yes_time=$1
fi
cleanjob_input=hdfs://localhost:9000/data/videoinfo/${yes_time}
cleanjob_output=hdfs://localhost:9000/data/videoinfo_clean/${yes_time}
videoinfojob_input=${cleanjob_output}
videoinfojob_output=hdfs://localhost:9000/res/videoinfojob/${yes_time}
videoinfojobtop10_input=${cleanjob_output}
videoinfojobtop10_output=hdfs://localhost:9000/res/videoinfojobtop10/${yes_time}
jobs_home=/Users/liangjiepeng/Documents/ideaCode/a/target
# Remove previous outputs: MapReduce refuses to write to an existing directory.
hdfs dfs -rm -r ${cleanjob_output}
hdfs dfs -rm -r ${videoinfojob_output}
hdfs dfs -rm -r ${videoinfojobtop10_output}
hadoop jar \
${jobs_home}/a-v1.0-jar-with-dependencies.jar \
com.itmayiedu.hadoop.dataClean.DataCleanJob \
${cleanjob_input} \
${cleanjob_output}
# _SUCCESS only exists when the job completed; use it to gate the next steps.
hdfs dfs -ls ${cleanjob_output}/_SUCCESS
if [ "$?" = 0 ]
then
echo "cleanJob execute success..."
hadoop jar \
${jobs_home}/a-v1.0-jar-with-dependencies.jar \
com.itmayiedu.hadoop.dataClean.videoinfo.VideoInfoJob \
${videoinfojob_input} \
${videoinfojob_output}
hdfs dfs -ls ${videoinfojob_output}/_SUCCESS
if [ "$?" != "0" ]
then
echo "VideoInfoJob execute failed..."
fi
hadoop jar \
${jobs_home}/a-v1.0-jar-with-dependencies.jar \
com.itmayiedu.hadoop.dataClean.top10.VideoInfoJobTop10 \
${videoinfojobtop10_input} \
${videoinfojobtop10_output}
hdfs dfs -ls ${videoinfojobtop10_output}/_SUCCESS
if [ "$?" != "0" ]
then
echo "VideoInfoJobTop10 execute failed..."
fi
else
# Fixed: this message previously interpolated the undefined ${year_time}.
echo "cleanJob execute failed... date time is ${yes_time}"
fi
第五步,通过sqoop将数据导入到mysql中,便于web页面读取数据并展示
sqoop export --connect jdbc:mysql://localhost:3306/test --username root --password 123456 --table top10 --export-dir /res/videoinfojobtop10/20200306 --input-fields-terminated-by "\t"
上一篇: Python操作MongoDB数据库
推荐阅读
-
python数据分析可视化(数据统计分析三大软件)
-
常用什么软件好统计数据(免费大数据查询平台)
-
python数据分析可视化(数据统计分析三大软件)
-
常用什么软件好统计数据(免费大数据查询平台)
-
基于阿里云 数据统计平台设计(类CNZZ或百度统计)
-
MySQL之统计查询,按月查询每天数据,无数据自动填充0
-
成就企业驾驭大数据浪潮 :Sybase数据分析与管理技术之四大法宝 博客分类: SYBASE ASE
-
大数据之直播平台数据统计
-
(转)数据库SQL优化大总结之 百万级数据库优化方案 博客分类: 其他 sql优化
-
收藏 | 全球大数据7大阵营,你都知道吗? 博客分类: 大数据平台大数据应用预测分析数据分析BI 大数据数据分析数据可视化商业智能