海量数据处理
需求分析
1、数据预处理
–》 release_channel,device_id,city,device_id_type,app_ver_name 这几个字段如果缺失,则过滤
–》 将数据整成 字段,字段,字段,…… 这种形式
–》 在每条数据末尾添加一个字段 user_id:device_id_type 为 android 时取 android_id(android_id 为空则过滤该条数据),其他设备类型取 device_id
2、导入hive中的表的天分区
3、进行数据统计分析
每天的新增用户 –》 事实
维度 –》 各种组合
版本 渠道 城市 设备 新增用户数
所有 所有 所有 所有 21949382
所有 所有 所有 具体设备 xxx
所有 所有 具体城市 所有 yyyy
所有 具体渠道 所有 所有 yyyy
具体版本 所有 所有 所有 zzz
具体版本 具体渠道 所有 所有 zzz
数据预处理
- 原数据
{
“events”: “1473367236143\u00010\u0001connectByQRCode\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000027\u0001\n1473367261933\u00010\u0001AppLaunch\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000028\u0001\n1473367280349\u00010\u0001connectByQRCode\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000029\u0001\n1473367331326\u00010\u0001AppLaunch\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000030\u0001\n1473367353310\u00010\u0001connectByQRCode\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000031\u0001\n1473367387087\u00010\u0001AppLaunch\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000032\u0001\n1473367402167\u00010\u0001connectByQRCode\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000033\u0001\n1473367451994\u00010\u0001AppLaunch\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000034\u0001\n1473367474316\u00010\u0001connectByQRCode\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000035\u0001\n1473367564181\u00010\u0001AppLaunch\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000036\u0001\n1473367589527\u00010\u0001connectByQRCode\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000037\u0001\n1473367610310\u00010\u0001AppLaunch\u0001\u00010\u0001\u0001\u0001\u0001
\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000038\u0001\n1473367624647\u00010\u0001connectByQRCode\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000039\u0001\n1473368004298\u00010\u0001AppLaunch\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000040\u0001\n1473368017851\u00010\u0001connectByQRCode\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000041\u0001\n1473369599067\u00010\u0001AppLaunch\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000042\u0001\n1473369622274\u00010\u0001connectByQRCode\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u00011609072239570000043\u0001\n”,
“header”: {
“cid_sn”: “1501004207EE98AA”,
“mobile_data_type”: “”,
“os_ver”: “9”,
“mac”: “88:1f:a1:03:7d:a8”,
“resolution”: “2560x1337”,
“commit_time”: “1473399829041”,
“sdk_ver”: “103”,
“device_id_type”: “mac”,
“city”: “江门市”,
“android_id”: “”,
“device_model”: “MacBookPro11,1”,
“carrier”: “中国xx”,
“promotion_channel”: “1”,
“app_ver_name”: “1.7”,
“imei”: “”,
“app_ver_code”: “23”,
“pid”: “pid”,
“net_type”: “3”,
“device_id”: “m.88:1f:a1:03:7d:a8”,
“app_device_id”: “m.88:1f:a1:03:7d:a8”,
“release_channel”: “appstore”,
“country”: “CN”,
“time_zone”: “28800000”,
“os_name”: “ios”,
“manufacture”: “apple”,
“commit_id”: “fde7ee2e48494b24bf3599771d7c2a78”,
“app_token”: “XIAONIU_I”,
“account”: “none”,
“app_id”: “com.appid.xiaoniu”,
“build_num”: “YVF6R16303000403”,
“language”: “zh”
}
}
- map逻辑
package hive.mapreduce;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
public class HiveDriver3 {
public static class HiveMapper3 extends Mapper<LongWritable, Text, NullWritable, Text> {
ObjectMapper objectMapper = null;
Text text = null;
NullWritable nullWritable = NullWritable.get();
@Override
protected void setup(Mapper<LongWritable, Text, NullWritable, Text>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// 获取一个映射JSON格式数据的对象;
objectMapper = new ObjectMapper();
text = new Text();
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Text>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// 获取json格式数据
JsonNode readTree = objectMapper.readTree(value.toString());
// 由key——header获得value
JsonNode header = readTree.get("header");
// release_channel,device_id,city,device_id_type,app_ver_name
// 获取需要筛选的字段
String release_channel = header.get("release_channel").getTextValue();
String device_id = header.get("device_id").getTextValue();
String city = header.get("city").getTextValue();
String device_id_type = header.get("device_id_type").getTextValue();
String app_ver_name = header.get("app_ver_name").getTextValue();
String android_id = header.get("android_id").getTextValue();
// 判断字段对应的value值是否存在
if (StringUtils.isBlank(release_channel) || StringUtils.isBlank(device_id) || StringUtils.isBlank(city)
|| StringUtils.isBlank(device_id_type) || StringUtils.isBlank(app_ver_name)) {
return;
}
// 增加一个字段user_id
String user_id;
if ("android".equals(device_id_type)) {
if (StringUtils.isNotBlank(android_id)) {
user_id = android_id;
} else {
return;
}
} else {
user_id = device_id;
}
StringBuilder sb = new StringBuilder();
sb.append(header.get("cid_sn").getTextValue()).append(",");
sb.append(header.get("mobile_data_type").getTextValue()).append(",");
sb.append(header.get("os_ver").getTextValue()).append(",");
sb.append(header.get("mac").getTextValue()).append(",");
sb.append(header.get("resolution").getTextValue()).append(",");
sb.append(header.get("commit_time").getTextValue()).append(",");
sb.append(header.get("sdk_ver").getTextValue()).append(",");
sb.append(header.get("device_id_type").getTextValue()).append(",");
sb.append(header.get("city").getTextValue()).append(",");
sb.append(header.get("android_id").getTextValue()).append(",");
sb.append(header.get("device_model").getTextValue()).append(",");
sb.append(header.get("carrier").getTextValue()).append(",");
sb.append(header.get("promotion_channel").getTextValue()).append(",");
sb.append(header.get("app_ver_name").getTextValue()).append(",");
sb.append(header.get("imei").getTextValue()).append(",");
sb.append(header.get("app_ver_code").getTextValue()).append(",");
sb.append(header.get("pid").getTextValue()).append(",");
sb.append(header.get("net_type").getTextValue()).append(",");
sb.append(header.get("device_id").getTextValue()).append(",");
sb.append(header.get("app_device_id").getTextValue()).append(",");
sb.append(header.get("release_channel").getTextValue()).append(",");
sb.append(header.get("country").getTextValue()).append(",");
sb.append(header.get("time_zone").getTextValue()).append(",");
sb.append(header.get("os_name").getTextValue()).append(",");
sb.append(header.get("manufacture").getTextValue()).append(",");
sb.append(header.get("commit_id").getTextValue()).append(",");
sb.append(header.get("app_token").getTextValue()).append(",");
sb.append(header.get("account").getTextValue()).append(",");
sb.append(header.get("app_id").getTextValue()).append(",");
sb.append(header.get("build_num").getTextValue()).append(",");
sb.append(header.get("language").getTextValue()).append(",");
sb.append(user_id);
text.set(sb.toString());
context.write(nullWritable, text);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local");
Job job = Job.getInstance(conf);
job.setJarByClass(HiveDriver3.class);
job.setMapperClass(HiveMapper3.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path("D:/join/input/20170101/"));
FileOutputFormat.setOutputPath(job, new Path("D:/join/output/20170101/"));
job.setNumReduceTasks(0);
boolean res = job.waitForCompletion(true);
if (res) {
System.out.println("运行成功!!!!!!");
} else {
System.out.println("运行失败!!!!!!");
}
}
}
分布式并发运算
活跃用户查询 ——##
//建模
1.创建原始数据表
-- Raw preprocessed app log, one partition per day; columns follow the preprocessing output order.
create table app.ods_applog(
cid_sn string
,mobile_data_type string
,os_ver string
,mac string
,resolution string
,commit_time string
,sdk_ver string
,device_id_type string
,city string
,android_id string
,device_model string
,carrier string
,promotion_channel string
,app_ver_name string
,imei string
,app_ver_code string
,pid string
,net_type string
,device_id string
,app_device_id string
,release_channel string
,country string
,time_zone string
,os_name string
,manufacture string
,commit_id string
,app_token string
,account string
,app_id string
,build_num string
,language string
,user_id string
)
partitioned by (day string)
row format delimited fields terminated by '\001';
// 每日活跃用户信息表
//release_channel,,city,,app_ver_name,user_id
-- Hive inserts by position: this column order matches the
-- "select city,release_channel,app_ver_name,user_id" used to populate this table
-- and app.reg_user_new_data (created LIKE this table).
create table app.reg_user_data(
city string,
release_channel string,
app_ver_name string,
user_id string)
partitioned by (day string);
// 每日活跃用户维度统计报表
-- amount stores count(1), which Hive returns as BIGINT.
create table app.dimensionality_user_data(
city string,
release_channel string,
app_ver_name string,
amount bigint)
partitioned by (day string,flag string);
// 2、每日加载预处理后的数据到ods_applog
-- "overwrite" makes re-loading a day replace its partition instead of duplicating files.
load data local inpath '/root/applog/20170101/' overwrite into table app.ods_applog partition(day='20170101');
load data local inpath '/root/applog/20170102/' overwrite into table app.ods_applog partition(day='20170102');
查询
select * from app.ods_applog where day='20170101';
// 3、开始分析
3.1 统计当日活跃用户(当日出现过的用户)
最终统计目标:各维度组合下的活跃用户数
时间,城市,渠道,版本,活跃用户数
20170101,北京,appstore,3.2,89009
步骤:一般可以先抽取(ETL)出当日的活跃用户的信息
然后再统计需要的报表
步骤1:抽取活跃用户信息(对每个用户保留最早的一条数据,保留“城市、渠道、版本、user_id”)
// 2017-01-01
-- Keep each user's earliest record of the day. Hive inserts by position, so the select
-- list must follow app.reg_user_data's column order. commit_time is a string, so it is
-- cast to bigint to order numerically.
insert overwrite table app.reg_user_data partition(day='20170101')
select city,release_channel,app_ver_name,user_id
from
(
select city,release_channel,app_ver_name,user_id,
row_number() over(partition by user_id order by cast(commit_time as bigint)) as rn
from app.ods_applog where day='20170101'
) d1
where d1.rn=1;
步骤2:统计出活跃用户维度报表
时间, 城市, 渠道, 版本, 活跃用户数(下表中 0 表示该维度取 'all' 汇总,1 表示按该维度的具体值分组;城市/渠道/版本三位数字依次拼接即为分区 flag,如 010 表示只按渠道分组)
20170101 0 0 0
20170101 0 0 1
20170101 0 1 0
20170101 0 1 1
20170101 1 0 0
20170101 1 0 1
20170101 1 1 0
20170101 1 1 1
// multiple insert 功能(多重插入):只扫描一次源表,把多个查询的结果分别插入到目标表的不同分区(或不同表)
-- Scan the day's active users once and write all 8 dimension combinations.
-- flag digits = (city, release_channel, app_ver_name): 0 -> 'all', 1 -> grouped by that column.
-- The subquery restricts the scan to the reported day; without it every partition is counted.
from (select city,release_channel,app_ver_name from app.reg_user_data where day='20170101') t
insert overwrite table app.dimensionality_user_data partition(day='20170101',flag='000')
select 'all','all','all',count(1)
insert overwrite table app.dimensionality_user_data partition(day='20170101',flag='001')
select 'all','all',app_ver_name,count(1)
group by app_ver_name
insert overwrite table app.dimensionality_user_data partition(day='20170101',flag='010')
select 'all',release_channel,'all',count(1)
group by release_channel
insert overwrite table app.dimensionality_user_data partition(day='20170101',flag='011')
select 'all',release_channel,app_ver_name,count(1)
group by release_channel,app_ver_name
insert overwrite table app.dimensionality_user_data partition(day='20170101',flag='100')
select city,'all','all',count(1)
group by city
insert overwrite table app.dimensionality_user_data partition(day='20170101',flag='101')
select city,'all',app_ver_name,count(1)
group by city,app_ver_name
insert overwrite table app.dimensionality_user_data partition(day='20170101',flag='110')
select city,release_channel,'all',count(1)
group by city,release_channel
insert overwrite table app.dimensionality_user_data partition(day='20170101',flag='111')
select city,release_channel,app_ver_name,count(1)
group by city,release_channel,app_ver_name;
select * from app.dimensionality_user_data where day='20170101' and flag='010';
+——————————–+——————————————-+—————————————-+———————————-+——————————-+——————————–+–+
| dimensionality_user_data.city | dimensionality_user_data.release_channel | dimensionality_user_data.app_ver_name | dimensionality_user_data.amount | dimensionality_user_data.day | dimensionality_user_data.flag |
+——————————–+——————————————-+—————————————-+———————————-+——————————-+——————————–+–+
| all | 1003 | all | 10025 | 20170101 | 010 |
| all | 1006 | all | 9969 | 20170101 | 010 |
| all | 1007 | all | 10050 | 20170101 | 010 |
| all | 1009 | all | 10305 | 20170101 | 010 |
| all | appstore | all | 32799 | 20170101 | 010 |
+——————————–+——————————————-+—————————————-+———————————-+——————————-+——————————–+–+
新增用户查询 ——##
每日新用户统计:
//建模
// 每日新用户信息表(表结构与活跃用户信息表一致)
-- Daily new users: same columns and partitioning as app.reg_user_data.
CREATE TABLE app.reg_user_new_data LIKE app.reg_user_data;
// 历史用户表(只存储user_id)
-- Every user_id seen so far (unpartitioned).
CREATE TABLE app.reg_user_history_data (
  user_id STRING
);
// 每日新用户维度统计报表
-- amount stores count(1), which Hive returns as BIGINT.
create table app.dim_user_new_data(
city string,
release_channel string,
app_ver_name string,
amount bigint)
partitioned by (day string,flag string);
思路:
1.抽取出当日的新用户信息(从当日活跃用户信息表 app.reg_user_data 中,抽取出不在历史用户表 app.reg_user_history_data 里的用户)
2.再做新用户的维度统计
步骤1.抽取新用户
// 1.将当日活跃用户比对历史表,得出当日新用户信息
// 2017-01-01
-- Anti-join: the day's active users that have no match in the history table.
-- Run before adding the day's users to app.reg_user_history_data.
insert overwrite table app.reg_user_new_data partition(day='20170101')
select a.city,a.release_channel,a.app_ver_name,a.user_id
from app.reg_user_data a left join app.reg_user_history_data b
on a.user_id=b.user_id
where a.day='20170101' and b.user_id is null;
// 2017-01-02
-- Anti-join: the day's active users that have no match in the history table.
-- Run before adding the day's users to app.reg_user_history_data.
insert overwrite table app.reg_user_new_data partition(day='20170102')
select a.city,a.release_channel,a.app_ver_name,a.user_id
from app.reg_user_data a left join app.reg_user_history_data b
on a.user_id=b.user_id
where a.day='20170102' and b.user_id is null;
// 2.将当日新用户的user_id插入历史用户表
// 2017-01-01
-- Append the day's new users to the history (run exactly once per day).
insert into table app.reg_user_history_data
select user_id from app.reg_user_new_data where day='20170101';
// 2017-01-02
-- Append the day's new users to the history (run exactly once per day).
insert into table app.reg_user_history_data
select user_id from app.reg_user_new_data where day='20170102';
// 步骤2:对新用户做维度统计报表
时间, 城市, 渠道, 版本, 新用户数(下表中 0 表示该维度取 'all' 汇总,1 表示按该维度的具体值分组;三位数字依次拼接即为分区 flag)
20170101 0 0 0
20170101 0 0 1
20170101 0 1 0
20170101 0 1 1
20170101 1 0 0
20170101 1 0 1
20170101 1 1 0
20170101 1 1 1
-- Scan the day's new users once and write all 8 dimension combinations.
-- flag digits = (city, release_channel, app_ver_name): 0 -> 'all', 1 -> grouped by that column.
-- The subquery restricts the scan to the reported day; without it earlier days' new users are counted too.
from (select city,release_channel,app_ver_name from app.reg_user_new_data where day='20170102') t
insert overwrite table app.dim_user_new_data partition(day='20170102',flag='000')
select 'all','all','all',count(1)
insert overwrite table app.dim_user_new_data partition(day='20170102',flag='001')
select 'all','all',app_ver_name,count(1)
group by app_ver_name
insert overwrite table app.dim_user_new_data partition(day='20170102',flag='010')
select 'all',release_channel,'all',count(1)
group by release_channel
insert overwrite table app.dim_user_new_data partition(day='20170102',flag='011')
select 'all',release_channel,app_ver_name,count(1)
group by release_channel,app_ver_name
insert overwrite table app.dim_user_new_data partition(day='20170102',flag='100')
select city,'all','all',count(1)
group by city
insert overwrite table app.dim_user_new_data partition(day='20170102',flag='101')
select city,'all',app_ver_name,count(1)
group by city,app_ver_name
insert overwrite table app.dim_user_new_data partition(day='20170102',flag='110')
select city,release_channel,'all',count(1)
group by city,release_channel
insert overwrite table app.dim_user_new_data partition(day='20170102',flag='111')
select city,release_channel,app_ver_name,count(1)
group by city,release_channel,app_ver_name;
-- Deletes every flag partition of day 20170102. Only run it to discard a bad result before
-- recomputing; running it after the multi-insert leaves the query that follows with no rows.
alter table app.dim_user_new_data drop if exists partition(day='20170102');
select * from app.dim_user_new_data where day='20170102' and flag='010';
+————————-+————————————+———————————+—————————+————————+————————-+–+
| dim_user_new_data.city | dim_user_new_data.release_channel | dim_user_new_data.app_ver_name | dim_user_new_data.amount | dim_user_new_data.day | dim_user_new_data.flag |
+————————-+————————————+———————————+—————————+————————+————————-+–+
| all | 1003 | all | 28116 | 20170102 | 010 |
| all | 1006 | all | 28174 | 20170102 | 010 |
| all | 1007 | all | 28431 | 20170102 | 010 |
| all | 1009 | all | 28886 | 20170102 | 010 |
| all | appstore | all | 76865 | 20170102 | 010 |
+————————-+————————————+———————————+—————————+————————+————————-+–+
上一篇: 海量数据处理(3):布隆过滤器
下一篇: SparkSQL简单教程