基于MaxCompute的拉链表设计 数据配置
程序员文章站
2022-03-22 23:41:22
...
摘要: 简单的拉链表设计
背景信息:
在数据仓库的数据模型设计过程中,经常会遇到这样的需求:
1. 数据量比较大;
2. 表中的部分字段会被update,如用户的地址,产品的描述信息,订单的状态、手机号码等等;
3. 需要查看某一个时间点或者时间段的历史快照信息。(比如,查看某一个订单在历史某一个时间点的状态,比如,查看某一个用户在过去某一段时间内,更新过几次等等)
4. 变化的比例和频率不是很大,比如,总共有1000万的会员,每天新增和发生变化的有10万左右;如果对这边表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费;
综上所述:引入'拉链历史表',既能满足反应数据的历史状态,又可以最大程度的节省存储。
(备注:在阿里巴巴内部很大程度上是基于存储换计算来提供开发的效率及易用性,因为在当今,存储的成本远低于CPU和内存。因此在阿里巴巴内部会采用快照的方式将每日的全量数据进行快照,同时也会通过极限存储的方式,压缩率高,在合适的场景下,约能压缩为原始数据的1/30。)
Demo数据
以下只是demo如何在MaxCompute中实现拉链表,所以是基于一些假设:
● 同一天中同一订单只有一个状态发生;
● 基于20150821及之前的数据并没有同一个订单有两个状态的最简单场景模拟;
● 且数据源在阿里云RDS for Mysql中。且表明为orders。
创建MaxCompute表
--ODS层:订单的增量数据表,按天分区,存放每天的增量数据
CREATE TABLE ods_orders_inc_d
(
orderid BIGINT
,createtime STRING
,modifiedtime STRING
,o_status STRING
)
PARTITIONED BY (dt STRING)
LIFECYCLE 7;
--DW层:历史数据拉链表,存放订单的历史状态数据
CREATE TABLE dw_orders_his_d
(
orderid BIGINT COMMENT '订单ID'
,createtime STRING COMMENT '订单创建时间'
,modifiedtime STRING COMMENT '订单修改时间'
,o_status STRING COMMENT '订单修改时间'
,dw_start_date STRING COMMENT '订单生命周期开始时间'
,dw_end_date STRING COMMENT '订单生命周期结束时间'
);
实现思路
● 全量初始化:将2015-08-21及以前的全量历史数据通过全量方式同步至ODS并刷进DW层。
● 增量更新:将2015-08-22、2015-08-23的全天增量数据以增量方式刷入下游数据。
全量初始化
1. 创建节点任务:数据同步
2. 选择调度类型:手动调度
3. 配置数据同步任务:Mysql:Orders-->ODPS:ods_orders_inc_d
4. where条件配置:modifiedtime <= '20150821'
5. 分区值dt=20150821
提交调度系统,待数据同步任务执行成功后,再将ODS数据刷入DW。
创建SQL脚本:
INSERT overwrite TABLE dw_orders_his_d
SELECT orderid,createtime,modifiedtime,o_status,createtime AS dw_start_date,'99991231' AS dw_end_date
FROM ods_orders_inc_d
WHERE dt = '20150821';
增量抽取并生成拉链表
1. 创建工作流任务并选择周期性调度。
2. 依次拖入数据同步节点任务和SQL任务。
3. 在数据同步任务中where条件配置为:modifiedtime=${bdp.system.bizdate}
4. 目标表ods_orders_inc_d分区配置为dt=${bdp.system.bizdate}
5. 配置SQL节点,且为数据同步节点的下游节点。
--通过DW历史数据和ODS增量数据刷新DW表
insert overwrite table dw_orders_his_d
SELECT a0.orderid, a0.createtime, a0.modifiedtime, a0.o_status, a0.dw_start_date, a0.dw_end_date
FROM (
-- 对orderid进行开窗然后按照生命周期结束时间倒序排,支持重跑
SELECT a1.orderid, a1.createtime, a1.modifiedtime, a1.o_status, a1.dw_start_date, a1.dw_end_date
, ROW_NUMBER() OVER (distribute BY a1.orderid,a1.createtime, a1.modifiedtime,a1.o_status sort BY a1.dw_end_date DESC) AS nums
FROM (
-- 用历史数据与增量22日的数据进行匹配,当发现在22日新增数据中存在且end_date > 当前日期的就表示数据状态发生过变化,然后修改生命周期
-- 修改昨日已经生命截止的数据并union最新增量数据到DW
SELECT a.orderid, a.createtime, a.modifiedtime, a.o_status, a.dw_start_date
, CASE
WHEN b.orderid IS NOT NULL AND a.dw_end_date > ${bdp.system.bizdate} THEN ${yesterday}
ELSE a.dw_end_date
END AS dw_end_date
FROM dw_orders_his_d a
LEFT OUTER JOIN (
SELECT *
FROM ods_orders_inc_d
WHERE dt = ${bdp.system.bizdate}
) b
ON a.orderid = b.orderid
UNION ALL
--2015-08-22的增量数据刷新到DW
SELECT orderid, createtime, modifiedtime, o_status, modifiedtime AS dw_start_date
, '99991231' AS dw_end_date
FROM ods_orders_inc_d
WHERE dt = ${bdp.system.bizdate}
) a1
) a0
-- 开窗口后对某个订单中生命周期为'9999-12-31'的取值并写入,防止重跑数据情况。
WHERE a0.nums = 1
order by a0.orderid,a0.dw_start_date;
备注:测试运行的时候,选择业务日期为20150822。也可以通过补数据方式,直接把20150822和20150823两天的增量数据刷入DW中。上面SQL中${yesterday}为自定义变量,其赋值为${yyyymmdd-1}
如何使用拉链表
● 查看某一天的全量历史快照数据。
SELECT *
FROM dw_orders_his_d
WHERE dw_start_date <= '20150822'
AND dw_end_date >= '20150822'
ORDER BY orderid
LIMIT 10000;
● 取一段时间的变化记录集合,如在20150822-20150823变化的记录。
SELECT *
FROM dw_orders_his_d
WHERE dw_start_date <= '20150823'
AND dw_end_date >= '20150822'
ORDER BY orderid
LIMIT 10000;
● 查看某一订单历史变化情况。
SELECT *
FROM dw_orders_his_d
WHERE orderid = 8
ORDER BY dw_start_date;
● 取最新的数据。
SELECT *
FROM dw_orders_his_d
WHERE dw_end_date = '99991231'
关于基于历史拉链表回滚某一天或一段时间内的数据,还是一个相对比较复杂的话题,这个可以下载再谈。
原文链接:https://yq.aliyun.com/articles/542146?spm=a2c41.11181499.0.0
背景信息:
在数据仓库的数据模型设计过程中,经常会遇到这样的需求:
1. 数据量比较大;
2. 表中的部分字段会被update,如用户的地址,产品的描述信息,订单的状态、手机号码等等;
3. 需要查看某一个时间点或者时间段的历史快照信息。(比如,查看某一个订单在历史某一个时间点的状态,比如,查看某一个用户在过去某一段时间内,更新过几次等等)
4. 变化的比例和频率不是很大,比如,总共有1000万的会员,每天新增和发生变化的有10万左右;如果对这边表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费;
综上所述:引入'拉链历史表',既能满足反应数据的历史状态,又可以最大程度的节省存储。
(备注:在阿里巴巴内部很大程度上是基于存储换计算来提供开发的效率及易用性,因为在当今,存储的成本远低于CPU和内存。因此在阿里巴巴内部会采用快照的方式将每日的全量数据进行快照,同时也会通过极限存储的方式,压缩率高,在合适的场景下,约能压缩为原始数据的1/30。)
Demo数据
以下只是demo如何在MaxCompute中实现拉链表,所以是基于一些假设:
● 同一天中同一订单只有一个状态发生;
● 基于20150821及之前的数据并没有同一个订单有两个状态的最简单场景模拟;
● 且数据源在阿里云RDS for Mysql中。且表明为orders。
创建MaxCompute表
--ODS层:订单的增量数据表,按天分区,存放每天的增量数据
CREATE TABLE ods_orders_inc_d
(
orderid BIGINT
,createtime STRING
,modifiedtime STRING
,o_status STRING
)
PARTITIONED BY (dt STRING)
LIFECYCLE 7;
--DW层:历史数据拉链表,存放订单的历史状态数据
CREATE TABLE dw_orders_his_d
(
orderid BIGINT COMMENT '订单ID'
,createtime STRING COMMENT '订单创建时间'
,modifiedtime STRING COMMENT '订单修改时间'
,o_status STRING COMMENT '订单修改时间'
,dw_start_date STRING COMMENT '订单生命周期开始时间'
,dw_end_date STRING COMMENT '订单生命周期结束时间'
);
实现思路
● 全量初始化:将2015-08-21及以前的全量历史数据通过全量方式同步至ODS并刷进DW层。
● 增量更新:将2015-08-22、2015-08-23的全天增量数据以增量方式刷入下游数据。
全量初始化
1. 创建节点任务:数据同步
2. 选择调度类型:手动调度
3. 配置数据同步任务:Mysql:Orders-->ODPS:ods_orders_inc_d
4. where条件配置:modifiedtime <= '20150821'
5. 分区值dt=20150821
提交调度系统,待数据同步任务执行成功后,再将ODS数据刷入DW。
创建SQL脚本:
INSERT overwrite TABLE dw_orders_his_d
SELECT orderid,createtime,modifiedtime,o_status,createtime AS dw_start_date,'99991231' AS dw_end_date
FROM ods_orders_inc_d
WHERE dt = '20150821';
增量抽取并生成拉链表
1. 创建工作流任务并选择周期性调度。
2. 依次拖入数据同步节点任务和SQL任务。
3. 在数据同步任务中where条件配置为:modifiedtime=${bdp.system.bizdate}
4. 目标表ods_orders_inc_d分区配置为dt=${bdp.system.bizdate}
5. 配置SQL节点,且为数据同步节点的下游节点。
--通过DW历史数据和ODS增量数据刷新DW表
insert overwrite table dw_orders_his_d
SELECT a0.orderid, a0.createtime, a0.modifiedtime, a0.o_status, a0.dw_start_date, a0.dw_end_date
FROM (
-- 对orderid进行开窗然后按照生命周期结束时间倒序排,支持重跑
SELECT a1.orderid, a1.createtime, a1.modifiedtime, a1.o_status, a1.dw_start_date, a1.dw_end_date
, ROW_NUMBER() OVER (distribute BY a1.orderid,a1.createtime, a1.modifiedtime,a1.o_status sort BY a1.dw_end_date DESC) AS nums
FROM (
-- 用历史数据与增量22日的数据进行匹配,当发现在22日新增数据中存在且end_date > 当前日期的就表示数据状态发生过变化,然后修改生命周期
-- 修改昨日已经生命截止的数据并union最新增量数据到DW
SELECT a.orderid, a.createtime, a.modifiedtime, a.o_status, a.dw_start_date
, CASE
WHEN b.orderid IS NOT NULL AND a.dw_end_date > ${bdp.system.bizdate} THEN ${yesterday}
ELSE a.dw_end_date
END AS dw_end_date
FROM dw_orders_his_d a
LEFT OUTER JOIN (
SELECT *
FROM ods_orders_inc_d
WHERE dt = ${bdp.system.bizdate}
) b
ON a.orderid = b.orderid
UNION ALL
--2015-08-22的增量数据刷新到DW
SELECT orderid, createtime, modifiedtime, o_status, modifiedtime AS dw_start_date
, '99991231' AS dw_end_date
FROM ods_orders_inc_d
WHERE dt = ${bdp.system.bizdate}
) a1
) a0
-- 开窗口后对某个订单中生命周期为'9999-12-31'的取值并写入,防止重跑数据情况。
WHERE a0.nums = 1
order by a0.orderid,a0.dw_start_date;
备注:测试运行的时候,选择业务日期为20150822。也可以通过补数据方式,直接把20150822和20150823两天的增量数据刷入DW中。上面SQL中${yesterday}为自定义变量,其赋值为${yyyymmdd-1}
如何使用拉链表
● 查看某一天的全量历史快照数据。
SELECT *
FROM dw_orders_his_d
WHERE dw_start_date <= '20150822'
AND dw_end_date >= '20150822'
ORDER BY orderid
LIMIT 10000;
● 取一段时间的变化记录集合,如在20150822-20150823变化的记录。
SELECT *
FROM dw_orders_his_d
WHERE dw_start_date <= '20150823'
AND dw_end_date >= '20150822'
ORDER BY orderid
LIMIT 10000;
● 查看某一订单历史变化情况。
SELECT *
FROM dw_orders_his_d
WHERE orderid = 8
ORDER BY dw_start_date;
● 取最新的数据。
SELECT *
FROM dw_orders_his_d
WHERE dw_end_date = '99991231'
关于基于历史拉链表回滚某一天或一段时间内的数据,还是一个相对比较复杂的话题,这个可以下载再谈。
原文链接:https://yq.aliyun.com/articles/542146?spm=a2c41.11181499.0.0
上一篇: Spring(17)——使用Java类的形式配置bean
下一篇: R语言笔记——创建数据集(一)