欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

数仓数据清理

程序员文章站 2022-06-03 17:56:52
...
今天的需求是
清洗数据,给用户添加省市区,添加新老用户,添加idMaping(用户标识)

1清洗过滤数据
1.1过滤掉日志account及deceid全为空的记录
"!(isBlank(deviceid) and isBlank(account))")
1.2过滤日志中缺少关键字端properties/eventid/sessionid 缺任何一个都不行
"properties is not null and !isBlank(eventid) and !isBlank(sessionid)"
1.3过滤掉日志中不符合时间段的记录(由于app上报日志可能的延迟,有数据延迟到达)
因为时间有延迟,可能采取的是昨天的日志所以需要对timestamp进行过滤
s"to_date(from_unixtime(cast(timestamp/1000 as bigint))) = '${args(1)}'")

2维度集成
2.1:将日志的GPS经纬度坐标解析为省市区
因为用户可能关闭了定位所以我们采取不到GPS左边,那么可以使用IP地址来解析
2.2将日志中的IP地址解析为省市区

3:ID_MAPPING
用户可能使用设备没有登录,
使用account账户登录,
一个用户可能拥有不止一台终端设备
一台终端设备上可能有多个用户使用
个用户可能一段时间后更换手机
那么怎么确定是同一个用户登录的是一个难点,我们要给每个登录或者设备打上标识,这样才能确定用户的日活
如何做到用户身份的精确标识是难点

3.1一个设备一个用户
3.2一个设备一个ID
3.3一个设备多个ID

最终方案
一个设备ID被绑定到某个登陆ID(A)之后,如果该设备在后续一段时间(比如一个月内)被一个新的登陆ID(B)更频繁使用,
则该设备ID会被调整至绑定登陆ID(B)
如果用户登录就使用account,没有登录就使用deviceType设备id,

4:新老访客标记
新访客,标记为1
老访客,标记为0



解决思路  一步一步解决

1:2维度集成  工具类
解析经纬度.
使用GEOHASH生成Geo表,
如何生成
1:连接数据库 公司会有全国的省市区表
2:过滤省市区表将表内的省市区取出,单独成立一个表
3:代码实现连接   val df: DataFrame = spark.read.jdbc
    import spark.implicits._
    val res = df.map(row=>{
      val lat = row.getAs[Double]("lat")
      val lng = row.getAs[Double]("lng")
      val province = row.getAs[String]("province")
      val city = row.getAs[String]("city")
      val region = row.getAs[String]("region")
      // 把经纬度坐标,转换成geohash编码
      val geo = GeoHash.geoHashStringWithCharacterPrecision(lat, lng, 5)
      (geo,province,city,region)
    }).toDF("geo","province","city","region")
取出经纬度,和省市区 写成parquet或者orc格式放入hdfs内,这样就生成了经纬度字典


1:2 ip工具类
狮子的魂 ip2region 在https://gitee.com/lionsoul/ip2region 下载
String ip = "202.102.36.87";
DbConfig config = new DbConfig();
// 加载ip字典库文件为一个字节数组
File file = new File("initdata\\ip2region.db");
RandomAccessFile ra = new RandomAccessFile(file, "r");
byte[] b = new byte[(int)file.length()];
ra.readFully(b);
// 构造一个ip2region搜索器
DbSearcher dbSearcher = new DbSearcher(config, b);
DataBlock block = dbSearcher.memorySearch(ip);
String region = block.getRegion();
System.out.println(region);

1.3:设计idMapping
在众多数据中,我们只需要 
deviceid设备id		
account 用户登录账户
timestamp 用户登录的时间

模仿假的数据使用一个csv表写入deviceid,account timestamp
在写一个json的昨日生成的文件

1对当天的日志访问记录进行评分 取出当日的日志
1.1过滤掉完全没有账户登录的设备记录那就是有登录的用户
1.2:没有登录记录的设备
1.3对同一个设备取时间最早的第一条
1.4对每个设备上每个登录账户评分(越早越好)
1.5将账户评分结果union

2 加载T-1的映射字典
2.1读取数据
2.2格式扁平化
2.3将历史数据idmp,账户绑定列表为空的数据单独拎出来

3合并T和T-1
3.1 分数聚合
3.2将同一个设备的账户评分整理到一起,选出guid,转成json


设置好idMapping记录后开始实现代码



开始写正式的程序
我们拿到的文件是从hive中读取的
需要配置文件
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.4.4</version>
</dependency>
还需要第三方json-serde-1.3.8.jar序列化lib包
要在项目的resources中放入配置文件
hive-site.xml
core-site.xml
 
val spark = SparkSession.builder()
  .appName("ODS APP日志抽取到DWD任务")
  .enableHiveSupport() // 开启spark读取hive数据的支持
  .getOrCreate()
读取hive的文件
  
  我们还需要将geohash字典放入hdfs中
  将ip2region字典放入hdfs中
  
  
  1:清洗过滤数据
    为空的工具类
    val isblank = (s:String)=>{StringUtils.isBlank(s)}

}
  2:由于是从hive中拿取表信息,所有我们是Dataset
  转为rdd.map获取今天正正的日志.读取日志的所有参数
  val  account =it.getAs[String]("account")
   val properties = it.getAs[Map[String, String]]("properties")
 
 获取了日志信息后,我们开始用geohash字典解析日志中的经纬度,获取用户的地理位置
因为我们是将文件放在hdfs中,那么每个map的task都要去hdfs中读取文件,那么浪费资源还可能导致一楼数据
所以使用广播变量,将数据广播到executor中
 val geohashDict: DataFrame = spark.read.parquet("/dict_data/geohash_area_dict/")
    val areaDicMap = geohashDict.rdd.map(it => {
      val geo = it.getAs[String]("geo")
      val province = it.getAs[String]("province")
      val city = it.getAs[String]("city")
      val region = it.getAs[String]("region")
      (geo, (province, city, region))
    }).collectAsMap()
	将数据形成map形式广播出去
	通过value获取province,city,region后
	我们要判断经纬度是否为空,因为由的用户关闭了GPS采取不到经纬度
	经纬度是lat90到-90  lng180到-180
	
由于geohash字典判断不了没有gps的用户地理位置,所以我们在用ip2region来获取
不过还是需要过滤,因为由的ip字典(只针对中国)里没有,可能是美国的
    def loadIp2RegionDbFile():Array[Byte]={
    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    val len: Long = fs.getFileStatus(new Path("/dict_data/ip_area_dict/ip2region.db")).getLen
    val fsin: FSDataInputStream = fs.open(new Path("/dict_data/ip_area_dict/ip2region.db"))
    val bytes = new Array[Byte](len.toInt)
    fsin.read(bytes)
      bytes
  }
  广播到map端后
     if (isblank(province + city + region)) {
          val config = new DbConfig()
          val searcher = new DbSearcher(config, ipDict)
          val block = searcher.memorySearch(eventBean.ip)
          if(block !=null && block.getRegion.split("\\|").size>4 ){
          val strings =  block.getRegion.split("\\|")
          province = strings(2)
          city = strings(3)}
        }
	测试的是	block.getRegion 第二个和第三个是省市

3:为每条数据标记Guid 如果account为空,那么就用deviceid,如果不为空就用account作为uid
al guid = if (isblank(eventBean.account)) idDict.get(eventBean.deviceid).get else eventBean.account
4如果包含deviceid,和guid 那么你就是老用户,老用户isnew默认1,否则就是新用户
加载所有历史出现过的dedeviceID和accountsid 我们要加载昨天的全部deciceid和guid,来判断是否有新老用户
使用昨天的数据因为昨天的数据包含了所有的用户信息,如果用户出现那么就是1如果没有出现那么就是0,
新老访客标记
如果包含deviceid,和guid 那么你就是老用户,老用户isnew默认1,否则就是新用户
if (deviceidSet.contains(eventBean.deviceid) || guidSet.contains(eventBean.guid))   isnew = 0
完毕

	

 

相关标签: 大数据