欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

用户行为分析-埋点实时数仓实践(附用户关联源码)

程序员文章站 2022-07-12 08:11:10
...

目录

一、概述

二、数据模型

三、数据格式

四、架构图

五、动态添加ClickHouse列

六、用户关联(IdMapping)

七、批量写入

八、结束(附用户关联源码)


一、概述

埋点采集、用户行为分析、实时数仓、IdMapping

此文重点讲述埋点的数据模型、数据格式、数据实时采集、加工、存储及用户关联。关于用户行为分析的概念、意义以及埋点相关的东西此文不作赘述

二、数据模型

业界比较流行的事件、用户模型;即:

* who: 设备ID、登录ID
       * when: 事件发生时间、上报时间
       * where: 设备环境、网络环境、业务环境等
       * what: 事件标识、事件参数

我们的数据存储也只有events和users两张表

events:不会变的日志表且数据量大;我们用ClickHouse的分布式表存储

users:我们只有几百万用户,且做用户关联时会频繁根据用户id查询、更新,而且做数据分析时要和事件表关联;我们用ClickHouse的mysql Engine存储

events建表语句:

-- 事件local表;按日期周分区
CREATE TABLE analytics.events_replica ON CLUSTER ck_cluster(
	`track_id` String COMMENT '埋点',
	`event_id` Int64 COMMENT '事件id',
	`distinct_id` String COMMENT '设备id/用户中心id',
	`user_id` Int64 COMMENT '用户表id',
	`type` String COMMENT '埋点类型',
	`event` String COMMENT '埋点事件',
	`date` Date COMMENT '埋点日期',
	`time` DateTime64 ( 3, 'Asia/Shanghai' ) COMMENT '埋点上传时间',
	`receive_time` DateTime64 ( 3, 'Asia/Shanghai' ) COMMENT '埋点接受时间',
	`day` Int64 COMMENT '埋点距1970/01/01的天数',
	`week_id` Int64 COMMENT '埋点距1970/01/01的周数',
	`month_id` Int64 COMMENT '埋点距1970/01/01的月数'
	其他业务公共字段
	所有事件属性
	
) ENGINE = ReplicatedMergeTree ( '/clickhouse/tables/analytics/events_replica/{shard}', '{replica}' )
PARTITION BY toMonday ( date ) 
ORDER BY
	( track_id ) SETTINGS index_granularity = 8192

-- 事件分布式表
CREATE TABLE analytics.events ON CLUSTER ck_cluster
AS analytics.events_replica ENGINE =Distributed('ck_cluster', 'analytics', 'events_replica', rand())

 

users建表语句:

-- ClickHouse Mysql Engine表
CREATE TABLE cON CLUSTER ck_cluster
(
    `id` Int64 comment '系统用户id',
    `first_id` String comment '第一次关联的设备id',
    `second_id` String comment '用户中心id',
    `$device_id_list` String comment '非第一次关联的设备id集合;逗号分隔'
)
ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password');
		 
-- mysql表
CREATE TABLE `users` (
  `id` bigint(32) DEFAULT NULL,
  `first_id` varchar(100) DEFAULT NULL,
  `second_id` varchar(100) DEFAULT NULL,
  `$device_id_list` varchar(500) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

 

三、数据格式

1.事件埋点
埋点时机: 行为事件记录
type = track
用户登录前: is_login_id=false, distinct_id=设备id
用户登录后: is_login_id=true, distinct_id=用户id
只能在properties里添加属性

{
  "distinct_id": "登录前(设备id)、登录后(用户id)",
  "time": "当前时间戳",
  "type": "track",
  "event": "事件名",
  "properties": {
    "$is_login_id": true, 
    "$内置属性名": "内置属性值",
    "$自定义属性名": "自定义属性值"
  }
}

例子:
{
  "distinct_id": "123456",
  "time": 1434556935000,
  "type": "track",
  "event": "ViewProduct",
  "properties": {
    "$is_login_id": true,
    "$app_version": "1.3",
    "$wifi": true,
    "$ip": "180.79.35.65",
    "$province": "湖南",
    "$city": "长沙",
    "$user_agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_2 like Mac OS X) AppleWebKit/602.1.50 (KHTML, like Gecko) CriOS/58.0.3029.113 Mobile/14F89 Safari/602.1",
    "$screen_width": 320,
    "$screen_height": 568,
    "product_id": 12345,
    "product_name": "苹果",
    "product_classify": "水果",
    "product_price": 14.0
  }
}

2. 用户关联
埋点时机: 用户登录后
type=track_signup; event=$SignUp; distinct_id=用户ID; original_id=设备id

{
    "distinct_id":"用户Id",
    "original_id":"设备id",
    "time": "当前时间戳",
    "type": "track_signup",
    "event": "$SignUp",
    "properties": {
        "$内置属性名":"内置属性值"
    }
}

例子:
{
    "distinct_id":"12345",
    "original_id":"2b0a6f51a3cd6775",
    "time": 1434557935000,
    "type": "track_signup",
    "event": "$SignUp",
    "properties": {
        "$manufacturer":"Apple",
        "$model": "iPhone5,2",
        "$os":"iOS",
        "$os_version":"7.0",
        "$app_version":"1.3",
        "$wifi":true,
        "$ip":"180.79.35.65",
        "$province":"湖南",
        "$city":"长沙",
        "$screen_width":320,
        "$screen_height":568
    }
}
 

 

四、架构图

用户行为分析-埋点实时数仓实践(附用户关联源码)

  • 前后端埋点:分为全埋点和自定义事件埋点;按数据条数和时间间隔批量发送
  • 埋点收集器:一个API接口,通过nginx作负载均衡,接收到埋点后异步写入kafka;业界通用的做法是用nginx接受埋点后直接落盘,然后再通过flume、logstash等日志采集工具采集到kafka。
  • kafka原始数据:通过flume采集一份到离线数仓
  • Flink ETL:核心数据处理逻辑

1.动态添加ClickHouse列

2.用户关联

3.数据校验、解析、清洗

  • 批量写入:按数据条数和时间间隔批量写入ClickHouse

五、动态添加ClickHouse列

自定义埋点的事件属性会随着业务增加,事件属性会作为events表的列形成一张宽表,所以采集到事件后,会根据事件的属性实时动态添加events表的字段

events表的列会初始化一份到redis的set里,在Flink ETL里,和埋点属性的集合取差集,并更新redis

需要注意的时:添加列时需要同时添加events的local表和distributed表

用户行为分析-埋点实时数仓实践(附用户关联源码)

 

六、用户关联(IdMapping)

参考某策的用户关联:标识用户

 

用户行为分析-埋点实时数仓实践(附用户关联源码)

大概逻辑:

1.根据埋点事件、用户关联事件的设备ID或登录ID去用户表里找到对应的用户ID作为事件表的用户ID

2.定时调度刷新设备多对一的情况

流程图如下(源码见文末):

 

 

用户行为分析-埋点实时数仓实践(附用户关联源码)

七、批量写入

由于jdbc的batchInsert需要sql一样,我们的实时采集事件却有所差别,导致sql不一样;这里我们可以根据sql分组,按一分钟或1000条批量写入即可

 

八、结束(附用户关联源码)

我基于mysql实现了用户关联的逻辑;可以做到设备多对一,关联登录前后的用户

用户关联源码:https://github.com/ostarsier/idmapping