数仓数据清理
程序员文章站
2022-06-03 17:56:52
...
今天的需求是
清洗数据,给用户添加省市区,添加新老用户,添加idMaping(用户标识)
1清洗过滤数据
1.1过滤掉日志account及deceid全为空的记录
"!(isBlank(deviceid) and isBlank(account))")
1.2过滤日志中缺少关键字端properties/eventid/sessionid 缺任何一个都不行
"properties is not null and !isBlank(eventid) and !isBlank(sessionid)"
1.3过滤掉日志中不符合时间段的记录(由于app上报日志可能的延迟,有数据延迟到达)
因为时间有延迟,可能采取的是昨天的日志所以需要对timestamp进行过滤
s"to_date(from_unixtime(cast(timestamp/1000 as bigint))) = '${args(1)}'")
2维度集成
2.1:将日志的GPS经纬度坐标解析为省市区
因为用户可能关闭了定位所以我们采取不到GPS左边,那么可以使用IP地址来解析
2.2将日志中的IP地址解析为省市区
3:ID_MAPPING
用户可能使用设备没有登录,
使用account账户登录,
一个用户可能拥有不止一台终端设备
一台终端设备上可能有多个用户使用
个用户可能一段时间后更换手机
那么怎么确定是同一个用户登录的是一个难点,我们要给每个登录或者设备打上标识,这样才能确定用户的日活
如何做到用户身份的精确标识是难点
3.1一个设备一个用户
3.2一个设备一个ID
3.3一个设备多个ID
最终方案
一个设备ID被绑定到某个登陆ID(A)之后,如果该设备在后续一段时间(比如一个月内)被一个新的登陆ID(B)更频繁使用,
则该设备ID会被调整至绑定登陆ID(B)
如果用户登录就使用account,没有登录就使用deviceType设备id,
4:新老访客标记
新访客,标记为1
老访客,标记为0
解决思路 一步一步解决
1:2维度集成 工具类
解析经纬度.
使用GEOHASH生成Geo表,
如何生成
1:连接数据库 公司会有全国的省市区表
2:过滤省市区表将表内的省市区取出,单独成立一个表
3:代码实现连接 val df: DataFrame = spark.read.jdbc
import spark.implicits._
val res = df.map(row=>{
val lat = row.getAs[Double]("lat")
val lng = row.getAs[Double]("lng")
val province = row.getAs[String]("province")
val city = row.getAs[String]("city")
val region = row.getAs[String]("region")
// 把经纬度坐标,转换成geohash编码
val geo = GeoHash.geoHashStringWithCharacterPrecision(lat, lng, 5)
(geo,province,city,region)
}).toDF("geo","province","city","region")
取出经纬度,和省市区 写成parquet或者orc格式放入hdfs内,这样就生成了经纬度字典
1:2 ip工具类
狮子的魂 ip2region 在https://gitee.com/lionsoul/ip2region 下载
String ip = "202.102.36.87";
DbConfig config = new DbConfig();
// 加载ip字典库文件为一个字节数组
File file = new File("initdata\\ip2region.db");
RandomAccessFile ra = new RandomAccessFile(file, "r");
byte[] b = new byte[(int)file.length()];
ra.readFully(b);
// 构造一个ip2region搜索器
DbSearcher dbSearcher = new DbSearcher(config, b);
DataBlock block = dbSearcher.memorySearch(ip);
String region = block.getRegion();
System.out.println(region);
1.3:设计idMapping
在众多数据中,我们只需要
deviceid设备id
account 用户登录账户
timestamp 用户登录的时间
模仿假的数据使用一个csv表写入deviceid,account timestamp
在写一个json的昨日生成的文件
1对当天的日志访问记录进行评分 取出当日的日志
1.1过滤掉完全没有账户登录的设备记录那就是有登录的用户
1.2:没有登录记录的设备
1.3对同一个设备取时间最早的第一条
1.4对每个设备上每个登录账户评分(越早越好)
1.5将账户评分结果union
2 加载T-1的映射字典
2.1读取数据
2.2格式扁平化
2.3将历史数据idmp,账户绑定列表为空的数据单独拎出来
3合并T和T-1
3.1 分数聚合
3.2将同一个设备的账户评分整理到一起,选出guid,转成json
设置好idMapping记录后开始实现代码
开始写正式的程序
我们拿到的文件是从hive中读取的
需要配置文件
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.4.4</version>
</dependency>
还需要第三方json-serde-1.3.8.jar序列化lib包
要在项目的resources中放入配置文件
hive-site.xml
core-site.xml
val spark = SparkSession.builder()
.appName("ODS APP日志抽取到DWD任务")
.enableHiveSupport() // 开启spark读取hive数据的支持
.getOrCreate()
读取hive的文件
我们还需要将geohash字典放入hdfs中
将ip2region字典放入hdfs中
1:清洗过滤数据
为空的工具类
val isblank = (s:String)=>{StringUtils.isBlank(s)}
}
2:由于是从hive中拿取表信息,所有我们是Dataset
转为rdd.map获取今天正正的日志.读取日志的所有参数
val account =it.getAs[String]("account")
val properties = it.getAs[Map[String, String]]("properties")
获取了日志信息后,我们开始用geohash字典解析日志中的经纬度,获取用户的地理位置
因为我们是将文件放在hdfs中,那么每个map的task都要去hdfs中读取文件,那么浪费资源还可能导致一楼数据
所以使用广播变量,将数据广播到executor中
val geohashDict: DataFrame = spark.read.parquet("/dict_data/geohash_area_dict/")
val areaDicMap = geohashDict.rdd.map(it => {
val geo = it.getAs[String]("geo")
val province = it.getAs[String]("province")
val city = it.getAs[String]("city")
val region = it.getAs[String]("region")
(geo, (province, city, region))
}).collectAsMap()
将数据形成map形式广播出去
通过value获取province,city,region后
我们要判断经纬度是否为空,因为由的用户关闭了GPS采取不到经纬度
经纬度是lat90到-90 lng180到-180
由于geohash字典判断不了没有gps的用户地理位置,所以我们在用ip2region来获取
不过还是需要过滤,因为由的ip字典(只针对中国)里没有,可能是美国的
def loadIp2RegionDbFile():Array[Byte]={
val conf = new Configuration()
val fs = FileSystem.get(conf)
val len: Long = fs.getFileStatus(new Path("/dict_data/ip_area_dict/ip2region.db")).getLen
val fsin: FSDataInputStream = fs.open(new Path("/dict_data/ip_area_dict/ip2region.db"))
val bytes = new Array[Byte](len.toInt)
fsin.read(bytes)
bytes
}
广播到map端后
if (isblank(province + city + region)) {
val config = new DbConfig()
val searcher = new DbSearcher(config, ipDict)
val block = searcher.memorySearch(eventBean.ip)
if(block !=null && block.getRegion.split("\\|").size>4 ){
val strings = block.getRegion.split("\\|")
province = strings(2)
city = strings(3)}
}
测试的是 block.getRegion 第二个和第三个是省市
3:为每条数据标记Guid 如果account为空,那么就用deviceid,如果不为空就用account作为uid
al guid = if (isblank(eventBean.account)) idDict.get(eventBean.deviceid).get else eventBean.account
4如果包含deviceid,和guid 那么你就是老用户,老用户isnew默认1,否则就是新用户
加载所有历史出现过的dedeviceID和accountsid 我们要加载昨天的全部deciceid和guid,来判断是否有新老用户
使用昨天的数据因为昨天的数据包含了所有的用户信息,如果用户出现那么就是1如果没有出现那么就是0,
新老访客标记
如果包含deviceid,和guid 那么你就是老用户,老用户isnew默认1,否则就是新用户
if (deviceidSet.contains(eventBean.deviceid) || guidSet.contains(eventBean.guid)) isnew = 0
完毕
上一篇: 大数据数仓Hive笔试题(一)