欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kettle构建Hadoop ETL实践(九):事实表技术

程序员文章站 2024-02-23 09:30:16
...

目录

一、事实表概述

二、周期快照

1. 修改数据仓库模式

2. 创建快照表数据装载Kettle转换

三、累计快照

1. 修改数据库模式

2. 修改增量抽取销售订单表的Kettle转换

3. 修改定期装载销售订单事实表的Kettle转换

4. 修改定期装载Kettle作业

6. 测试

四、无事实的事实表

1. 建立新产品发布的无事实事实表

2. 初始装载无事实事实表

3. 修改定期装载Kettle作业

4. 测试定期装载作业

五、迟到的事实

1. 修改数据仓库模式

2. 修改定期装载Kettle转换

3. 修改装载月销售周期快照事实表的作业

4.    测试

六、累积度量

1. 修改模式

2. 初始装载

3. 定期装载

4. 测试定期装载

5. 查询

七、小结


        上两篇里介绍了几种基本的维度表技术,并用示例演示了每种技术的实现过程。本篇说明多维数据仓库中常见的事实表技术。我们将讲述五种基本事实表扩展,分别是周期快照、累积快照、无事实的事实表、迟到的事实和累积度量。和讨论维度表一样,也会从概念开始认识这些技术,继而给出常见的使用场景,最后以销售订单数据仓库为例,给出Kettle实现的作业、转换和测试过程。

一、事实表概述

        发生在业务系统中的操作型事务,其所产生的可度量数值,存储在事实表中。从最细节粒度级别看,事实表和操作型事务表的数据有一一对应的关系。因此,数据仓库中事实表的设计应该依赖于业务系统,而不受可能产生的最终报表影响。除数字类型的度量外,事实表总是包含所引用维度表的外键,也能包含可选的退化维度键或时间戳。数据分析的实质就是基于事实表开展计算或聚合操作。

        事实表中的数字度量值可划分为可加、半可加、不可加三类。可加性度量可以按照与事实表关联的任意维度汇总,就是说按任何维度汇总得到的度量和是相同的,事实表中的大部分度量属于此类。半可加度量可以对某些维度汇总,但不能对所有维度汇总。余额是常见的半可加度量,除了时间维度外,它们可以跨所有维度进行加法操作。另外还有些度量是完全不可加的,例如比例。对非可加度量,较好的处理方法是尽可能存储构成非可加度量的可加分量,如构成比例的分子和分母,并将这些分量汇总到最终的结果集合中,而对不可加度量的计算通常发生在BI层或OLAP层。

        事实表中可以存在空值度量。所有聚合函数,如sum、count、min、max、avg等均可针对空值度量进行计算,其中sum、count(字段名)、min、max、avg会忽略空值,而count(1)或count(*)在计数时会将空值包含在内。然而,事实表中的外键不能存在空值,否则会导致违反参照完整性的情况发生。关联的维度表应该用默认代理键而不是空值表示未知的条件。

        很多情况下数据仓库需要装载如下三种不同类型的事实表。

  • 事务事实表:以每个事务或事件为单位,如一个销售订单记录、一笔转账记录等,作为事实表里的一行数据。这类事实表可能包含精确的时间戳和退化维度键,其度量值必须与事务粒度保持一致。销售订单数据仓库中的sales_order_fact表就是事务事实表。
  • 周期快照事实表:这种事实表里并不保存全部数据,只保存固定时间间隔的数据,例如每天或每周的销售额,或每月的账户余额等。
  • 累积快照事实表:累积快照用于跟踪事实表的变化。例如,数据仓库可能需要累积或存储销售订单从下订单的时间开始,到订单中的商品被打包、运输和到达的各阶段的时间点数据来跟踪订单生命周期的进展情况。当这个过程进行时,随着以上各种时间的出现,事实表里的记录也要不断更新。

二、周期快照

        周期快照事实表中的每行汇总了发生在某一标准周期,如一天、一周或一月的多个度量。其粒度是周期性的时间段,而不是单个事务。周期快照事实表通常包含许多数据的总计,因为任何与事实表时间范围一致的记录都会被包含在内。在这些事实表中,外键的密度是均匀的,因为即使周期内没有活动发生,通常也会在事实表中为每个维度插入包含0或空值的行。

        周期快照在库存管理和人力资源系统中有比较广泛的应用。商店的库存优化水平对连锁企业的获利将产生巨大影响。需要确保正确的产品处于正确的商店中,在正确的时间尽量减少出现脱销的情况,并减少总的库存管理费用。零售商希望通过产品和商店分析每天保有商品的库存水平。在这个场景下,通常希望分析的业务过程是零售商店库存的每日周期快照。在人力资源管理系统中,除了为员工建立档案外,还希望获得员工状态的例行报告,包括员工数量、支付的工资、假期天数、新增员工数量、离职员工数量,晋升人员数量等。这时需要建立一个每月员工统计周期快照。

        周期快照是在一个给定的时间对事实表进行一段时期的总计。有些数据仓库用户,尤其是业务管理者或者运营部门,经常要看某个特定时间点的汇总数据。下面在示例数据仓库中创建一个月销售订单周期快照,用于按产品统计每个月总的销售订单金额和产品销售数量。

1. 修改数据仓库模式

        需求是要按产品统计每个月的销售金额和销售数量。单从功能上看,此数据能够从事务事实表中直接查询得到。例如要取得2020年10月的销售数据,可以使用以下的语句查询:

select b.month_sk, a.product_sk, sum(order_amount), sum(order_quantity)  
  from dw.sales_order_fact a,  
       dw.month_dim b,  
       dw.order_date_dim d
 where a.order_date_sk = d.order_date_sk  
   and b.month = d.month  
   and b.year = d.year  
   and b.month = 10   
   and b.year = 2020
 group by b.month_sk, a.product_sk ;

        只要将年、月参数传递给这条查询语句,就可以获得任何年月的统计数据。但即便是在如此简单的场景下,我们仍然需要建立独立的周期快照事实表。事务事实表的数据量都会很大,如果每当需要月销售统计数据时,都从最细粒度的事实表查询,那么性能将会差到不堪忍受的程度。再者,月统计数据往往只是下一步数据分析的输入信息,有时把更复杂的逻辑放到一个单一的查询语句中效率会更差。因此,好的做法是将事务型事实表作为一个基石事实数据,以此为基础,向上逐层建立需要的快照事实表。图9-1中的模式显示了一个名为month_end_sales_order_fact的周期快照事实表。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-1 月销售统计周期快照事实表

        新的周期快照事实表中有两个度量值,month_order_amount和month_order_quantity。这两个值是不能加到sales_order_fact表中的,原因是sales_order_fact表和新度量值有不同的时间属性,也即数据的粒度不同。sales_order_fact表包含的是单一事务记录。新的度量值要的是每月的汇总数据。销售周期快照是一个普通的引用两个维度的事实表。月份维度表包含以月为粒度的销售周期描述符。产品代理键对应有效的产品维度行,也就是给定报告月的最后一天对应的产品代理键,以保证月末报表是对当前产品信息的准确描述。快照中的事实包含每月的数字度量和计数,它们是可加的。该快照事实表使用ORC存储格式。使用下面的脚本建立month_end_sales_order_fact表并装载历史数据。

use dw;     
create table month_end_sales_order_fact (    
    order_month_sk int comment '月份代理键',   
    product_sk int comment '产品代理键',   
    month_order_amount decimal(10,2) comment '月销售金额',   
    month_order_quantity int comment '月销售数量'  
)  
clustered by (order_month_sk) into 8 buckets        
stored as orc tblproperties ('transactional'='true');

-- 初始装载
insert into month_end_sales_order_fact
select d.month_sk month_sk,  
       a.product_sk product_sk,  
       sum(order_amount) order_amount,  
       sum(order_quantity) order_quantity  
  from sales_order_fact a,  
       date_dim b,    
       month_dim d  
 where a.order_date_sk = b.date_sk             
   and b.month = d.month
   and b.year = d.year  
   and b.dt < '2020-11-01'   
 group by d.month_sk , a.product_sk;

2. 创建快照表数据装载Kettle转换

        建立了month_end_sales_order_fact表后,现在需要向表中装载数据。实际装载时,月销售周期快照事实表的数据源是已有的销售订单事务事实表,而并没有关联产品维度表。之所以可以这样做,是因为总是先处理事务事实表,再处理周期快照事实表,并且事务事实表中的产品代理键就是当时有效的产品描述。这样做还有一个好处是,不必要非在1号装载上月的数据,这点在后面修改定时自动执行作业时会详细说明。用于装载月销售订单周期快照事实表的Kettle作业如图9-2所示。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-2 装载月销售订单周期快照事实表的作业

        “设置年月变量”所调用的Kettle转换如图9-3所示。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-3 设置年月变量的转换

获取系统信息步骤取得上月第一天,公式步骤用month和year函数获得上月对应的月份与年份,设置环境变量步骤设置MONTH和YEAR两个全局变量用于后面SQL作业项中的替换变量。        SQL作业项中的语句如下:

delete from dw.month_end_sales_order_fact
 where month_end_sales_order_fact.order_month_sk in
 (select month_sk
    from dw.month_dim
   where month = ${MONTH}
     and year = ${YEAR});

insert into dw.month_end_sales_order_fact
select b.month_sk, a.product_sk, sum(a.order_amount), sum(a.order_quantity)
  from dw.order_date_dim d 
  left join dw.sales_order_fact a on d.order_date_sk = a.order_date_sk
 inner join dw.month_dim b on b.month = d.month and b.year = d.year
   and b.month = ${MONTH}
   and b.year = ${YEAR}
 group by b.month_sk, a.product_sk;

        第一句删除上月数据,实现幂等操作,第二句装载上月销售汇总数据。前面曾经提到过,周期快照表的外键密度是均匀的,因此这里使用外连接关联订单日期维度和事务事实表。即使上个月没有任何销售记录,周期快照中仍然会有一行记录。在这种情况下,周期快照记录中只有月份代理键,其它字段值为NULL。严格地说产品维度表中应该增加如‘N/A’这样一行表示没有对应产品时的缺省值。

        可以在每个月给定的任何一天,销售订单事实表定期装载完成后,执行图9-2所示的作业,装载上个月的销售订单汇总数据。为此需要修改定期装载Kettle作业,如图9-4所示。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-4 增加了周期快照装载的作业

        在定期装载作业中增加了“判断日期”和“装载周期快照表”两个作业项。“判断日期”是一个用来判断当天的日期JavaScript作业项,脚本如下:

var d = new Date();
var n = d.getDate();
if( n==12) {true;} else {false;}

        当日期等于12时,JavaScript作业项返回true,执行其后的“装载周期快照表”作业项,否则转到成功节点结束作业。“装载周期快照表”子作业调用图9-2所示的作业,执行完成后转到成功节点结束。很明显,本例中“判断日期”的作用就是控制在并且只在一个月当中的某一天执行周期快照表的数据装载,其它日期不做这步操作。这里的n==12只是为了方便测试,因为SQL中是以上个月的年月作为过滤条件,所以换做当月中任何一天都可以。这个作业保证了每月汇总只有在某天定期装载执行完后才开始,并且每月只执行一次。

        周期快照粒度表示一种常规性的重复的度量或度量集合,比如每月报表。这类事实表通常包括一个单一日期列,表示一个周期。周期快照事实必须满足粒度需求,仅描述适合于所定义周期的时间范围的度量。周期快照是一种常见的事实表类型,其周期通常是天、周或月。

        周期快照具有与事务粒度事实表相似的装载特性,插入数据的过程类似。传统上,周期快照在适当的时期结束时被装载,就像示例演示的那样。还有常见的一种做法是,滚动式地添加周期快照记录。在满足以下两个条件时,往往采用滚动式数据装载:一是事务数据量非常大,以至于装载一个月的快照需要很长时间;二是快照的度量是可加的。例如可以建立每日销售周期快照,数据从事务事实表汇总而来,然后月快照数据从每日快照汇总。这样能够把一个大的查询分散到每一天进行。

三、累计快照

        累积快照事实表用于定义业务过程开始、结束以及期间的可区分的里程碑事件。通常在此类事实表中针对过程中的关键步骤都包含日期外键,并包含每个步骤的度量,这些度量的产生一般都会滞后于数据行的创建时间。累积快照事实表中的一行,对应某一具体业务的多个状态。例如,当订单产生时会插入一行。当该订单的状态改变时,累积事实表行被访问并修改。这种对累积快照事实表行的一致性修改在三种类型的事实表中具有独特性,对于前面介绍的两类事实表只追加数据,不会对已经存在的行进行更新操作。除了日期外键与每个关键过程步骤关联外,累积快照事实表中还可以包含其它维度和可选退化维度的外键。

        累积快照事实表在库存、采购、销售、电商等业务领域都有广泛应用。比如在电商订单里面,下单的时候只有下单时间,但是在支付的时候,又会有支付时间,同理,还有发货时间,完成时间等等。下面以销售订单数据仓库为例,讨论累积快照事实表的实现。

        假设希望跟踪以下五个销售订单里程碑:下订单、分配库房、打包、配送和收货,分别用状态N、A、P、S、R表示。这五个里程碑的日期及其各自的数量来自源数据库的销售订单表。一个订单完整的生命周期由五行数据描述:下订单时生成一条销售订单记录;订单商品被分配到相应库房时,新增一条记录,存储分配时间和分配数量;产品打包时新增一条记录,存储打包时间和数量;类似的,订单配送和客户收货时也都分别新增一条记录,保存各自的时间戳与数量。为简化示例,不考虑每种状态出现多条记录的情况(如一条订单中的产品可能是在不同时间点分多次出库),并且假设这五个里程碑是以严格的时间顺序正向进行的。

        对订单的每种状态新增记录只是处理这种场景的多种设计方案之一。如果里程碑的定义良好并且不会轻易改变,也可以考虑在源订单事务表中新增每种状态对应的数据列,例如,新增8列,保存每个状态的时间戳和数量。新增列的好处是仍然能够保证订单号的唯一性,并保持相对较少的记录数。但是这种方案还需要额外增加一个last_modified字段记录订单的最后修改时间,用于增量数据抽取。因为每条订单在状态变更时都会被更新,所以订单号字段已经不能作为变化数据捕获的比较依据。

1. 修改数据库模式

        执行下面的脚本将源数据库中销售订单事务表结构做相应改变,以处理五种不同的状态。

-- mysql  
use source;    
-- 修改销售订单事务表    
alter table sales_order    
      change order_date status_date datetime, 
      add order_status varchar(1) after status_date, 
      change order_quantity quantity int;    
  
-- 删除sales_order表的主键    
alter table sales_order change order_number order_number int not null;
alter table sales_order drop primary key;
   
-- 建立新的主键  
alter table sales_order add id int unsigned not null auto_increment primary key comment '主键' first;

        说明:

  • 将order_date字段改名为status_date,因为日期不再单纯指订单日期,而是指变为某种状态日期。
  • 将order_quantity字段改名为quantity,因为数量变为某种状态对应的数量。
  • 在status_date字段后增加order_status字段,存储N、A、P、S、R等订单状态之一。它描述了status_date列对应的状态值,例如,如果一条记录的状态为N,则status_date列是下订单的日期,如果状态是R,status_date列是收货日期。
  • 每种状态都会有一条订单记录,这些记录具有相同的订单号,因此订单号不能再作为事务表的主键,需要删除order_number字段上的自增属性与主键约束。
  • 添加id自增字段作为销售订单表的主键,它是表中的第一个字段。

        依据源数据库事务表的结构,执行下面的脚本修改Hive中相应的过渡区表。

use rds;  
alter table sales_order change order_date status_date timestamp comment '状态日期';  
alter table sales_order change order_quantity quantity int comment '数量';  
alter table sales_order add columns (order_status varchar(1) comment '订单状态');

        说明:

  • 将销售订单事实表中order_date和order_quantity字段的名称修改为与源表一致。
  • 增加订单状态字段。
  • rds.sales_order并没有增加id列,原因有两个:一是该列只作为增量检查列,不用在过渡表中存储;二是不需要再重新导入已有数据。

        执行下面的脚本将数据仓库中的事务事实表增加订单状态列、创建累积快照事实表并添加分区。

use dw;    

-- 增加订单状态列
alter table sales_order_fact add columns (order_status varchar(1) comment '订单状态'); 

-- 创建累积快照事实表
create table sales_order_fact_accumulate
(
  order_number int COMMENT '销售订单号', 
  customer_sk int COMMENT '客户维度代理键', 
  product_sk int COMMENT '产品维度代理键', 
  order_date_sk int COMMENT '日期维度代理键', 
  order_amount decimal(10,2) COMMENT '销售金额', 
  order_quantity int COMMENT '销售数量', 
  request_delivery_date_sk int COMMENT '请求交付日期', 
  sales_order_attribute_sk int COMMENT '订单属性代理键', 
  customer_zip_code_sk int COMMENT '客户邮编代理键', 
  shipping_zip_code_sk int COMMENT '送货邮编代理键',
  allocate_date_sk int comment '分配日期代理键',  
  allocate_quantity int comment '分配数量',  
  packing_date_sk int comment '打包日期代理键',  
  packing_quantity int comment '打包数量',  
  ship_date_sk int comment '配送日期代理键',  
  ship_quantity int comment '配送数量',  
  receive_date_sk int comment '收货日期代理键',  
  receive_quantity int comment '收货数量'
)
partitioned by (flag string)
clustered by (order_number) into 8 buckets    
stored as orc tblproperties ('transactional'='true');

-- 创建分区
alter table sales_order_fact_accumulate add partition (flag='active') partition (flag='readonly');

        累积快照事实表在销售订单事实表基础上新增加八个字段存储后四个状态的日期代理键和度量值,并且以flag字段作为分区键划分为active与readonly两个分区。累积事实表的数据装载需要面对两个挑战:1. ETL过程处理尽量少的数据;2. 不使用DML(Data Manipulation Language,数据操纵语言,如insert、update、delete等)。针对前者,解决方案是将整个累积事实表分为活动和只读两个分区,可以通过Hive的分区表实现。活动分区存储没有完成全部五个里程碑的订单数据,反之,只读分区存储已经完成全部五个里程碑的完整订单数据。所有的状态更新操作都发生在活动分区,通常活动分区相对较小。

        在传统关系数据库中实现增量处理累积快照,需要行级更新,但Hive中无法这样做。这里存在两个限制,一是Hive的update只能set常量,不支持多表更新和子查询,这使得不能直接用sales_order_fact来更新sales_order_fact_accumulate。如果说第一个限制还能用临时表勉强解决的话,那第二个限制则更加难于处理。我们之前多次指出,处于性能考虑,除周期快照外的事实表装载都是用的“ORC output”步骤,而不用“表输出”步骤,但这带来的问题是再对ORC表执行行级更新操作数据会出现错误。解决这个问题所采取的以下处理流程能完全避免使用DML。

  1. 读取活动分区中的所有数据,同时删除活动分区。
  2. 从源系统中抽取变化的数据,和上一步读取的活动分区中的所有数据合并。
  3. 把完整记录加载到只读分区,把不完整记录加载到活动分区。

        使用Kettle实现时,可以将活动分区中的所有数据装载到一个临时表中,如sales_order_fact_accumulate_tmp。该表的结构除了没有分区键字段flag以外,其它与sales_order_fact_accumulate相同(因此这里没有列出建表语句)。至于删除活动分区,只需要用“ORC output”步骤输出同名文件,以覆盖原有文件即可实现。

2. 修改增量抽取销售订单表的Kettle转换

        修改后的转换如图9-5所示,抽取的字段名称要做相应修改。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-5 增量抽取销售订单表的转换

3. 修改定期装载销售订单事实表的Kettle转换

        “销售订单事务数据”数据库连接步骤的SQL需要做三点修改:将查询中的字段名a.order_quantity改为a.quantity order_quantity;查询中增加a.order_status字段;SQL中出现的所有order_date改名为status_date。修改后的SQL如下:

select a.order_number, 
       b.customer_sk, 
       c.product_sk, 
       d.date_sk, 
       a.order_amount, 
       a.quantity order_quantity,
       e.date_sk request_delivery_date_sk,
       f.sales_order_attribute_sk,
       g.customer_zip_code_sk,
       h.shipping_zip_code_sk,
       a.order_status
  from rds.sales_order a,
       dw.customer_dim b,
       dw.product_dim c,
       dw.date_dim d,
       dw.date_dim e,
       dw.sales_order_attribute_dim f,
       dw.customer_zip_code_dim g,
       dw.shipping_zip_code_dim h,
       rds.customer i 
 where a.customer_number = b.customer_number
   and a.status_date >= b.effective_date and a.status_date < b.expiry_date
   and a.product_code = c.product_code
   and a.status_date >= c.effective_date and a.status_date < c.expiry_date
   and to_date(a.status_date) = d.dt
   and to_date(a.request_delivery_date) = e.dt
   and a.verification_ind = f.verification_ind 
   and a.credit_check_flag = f.credit_check_flag    
   and a.new_customer_ind = f.new_customer_ind    
   and a.web_order_flag = f.web_order_flag 
   and a.customer_number = i.customer_number
   and i.customer_zip_code = g.customer_zip_code
   and a.status_date >= g.effective_date and a.status_date < g.expiry_date
   and i.shipping_zip_code = h.shipping_zip_code
   and a.status_date >= h.effective_date and a.status_date < h.expiry_date
   and a.entry_date >= ? and a.entry_date < ?

        “ORC output”步骤的字段最后增加String类型的order_status。

4. 修改定期装载Kettle作业

        在“装载事实表”作业项后增加装载累积快照事实表的子作业,如图9-6所示。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-6 增加装载累积快照事实表的子作业

        子作业调用一个如图9-7所示的Kettle作业装载累积快照事实表。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-7 装载累积快照事实表的作业

        该作业顺序调用两个Kettle转换。“读取活动分区数据”转换实现分区处理流程的第一步,“数据合并与分区”转换实现分区处理流程的第二步和第三步。

        “读取活动分区数据”转换如图9-8所示。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-8 读取活动分区的转换

        转换的“表输入”步骤从累积事实表读取活动分区的全部数据,SQL如下:

select order_number, 
       customer_sk, 
       product_sk, 
       order_date_sk, 
       order_amount, 
       order_quantity, 
       request_delivery_date_sk, 
       sales_order_attribute_sk, 
       customer_zip_code_sk, 
       shipping_zip_code_sk, 
       allocate_date_sk, 
       allocate_quantity, 
       packing_date_sk, 
       packing_quantity, 
       ship_date_sk, 
       ship_quantity, 
       receive_date_sk, 
       receive_quantity
  from dw.sales_order_fact_accumulate
 where flag='active'

        “ORC output”步骤输出文件到sales_order_fact_accumulate_tmp表所对应的的HDFS目录下:hdfs://nameservice1/user/hive/warehouse/dw.db/sales_order_fact_accumulate_tmp/sales_order_fact_accumulate_tmp。输出的字段是与sales_order_fact_accumulate_tmp表所对应的18个字段。每次覆盖原同名文件,因此不用另行删除临时表sales_order_fact_accumulate_tmp的数据。注意勾选该步骤中的“Overwrite existing output file”选项。

        “数据合并与分区”转换如图9-9所示。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-9 数据合并与分区的转换

        “排序合并”步骤以order_number字段排序,合并两个数据集合,功能类似于SQL中的union。该步骤要求它所合并的数据集合具有完全相同的字段结构,并且已经按步骤中指定的字段排序,否则可能导致错误结果。

        第一个数据集合是销售订单事实表中的增量数据,通过“读取时间窗口”、“查询事实表增量数据”、“字段选择”、“排序记录”、“行转列”五个步骤获得。“读取时间窗口”步骤从时间戳表查出需要处理的起止时间,SQL为:

select last_load , current_load from rds.cdc_time

        “查询事实表增量数据”步骤从销售订单事实表查询增量数据,SQL为:

select order_number, 
       customer_sk, 
       product_sk, 
       order_date_sk, 
       order_amount,     
       order_quantity, 
       request_delivery_date_sk, 
       sales_order_attribute_sk, 
       customer_zip_code_sk, 
       shipping_zip_code_sk, 
       order_status
  from dw.sales_order_fact t1, dw.date_dim t2
 where t1.order_date_sk = t2.date_sk and t2.dt >= ? and t2.dt < ?

其中参数为前一步骤输出的last_load和current_load字段。整个定期装载作业中,装载过渡区数据、装载销售订单事实表、装载累积快照事实表三个部分都查询了时间戳表rds.cdc_time,以获得增量处理的时间窗口。比较高效的做法是将查询rds.cdc_time表的操作作为前导步骤只查一次,并将起始时间赋予整个作业的全局变量,后续步骤都可以引用这些变量作为增量判断条件。这里每次判断增量都查询一次rds.cdc_time表只是出于便于调试一个目的。“字段选择”步骤只选择dw.sales_order_fact表输出的11个字段。“排序记录”步骤按order_number字段排序,这既是“行转列”步骤的要求,也是“排序合并”步骤的要求。“行转列”步骤的设置如图9-10所示。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-10 行转列步骤

该步骤按order_number字段进行分组,将一组中order_status具有不同值的行转为固定的10列,缺失状态的列值为空。步骤输出为累积订单表对应的18个字段。

        要合并的第二个数据集合为当前活动分区的数据,由“查询活动分区数据”表输入步骤和“排序记录 2”步骤获得。表输入步骤中的SQL如下:

select order_number,
       customer_sk,
       product_sk,
       order_amount,
       request_delivery_date_sk,
       sales_order_attribute_sk,
       customer_zip_code_sk,
       shipping_zip_code_sk,
       order_date_sk,
       order_quantity,
       allocate_date_sk,
       allocate_quantity,
       packing_date_sk,
       packing_quantity,
       ship_date_sk,
       ship_quantity,
       receive_date_sk,
       receive_quantity
  from dw.sales_order_fact_accumulate_tmp

sales_order_fact_accumulate_tmp表数据已由前面的“读取活动分区数据”转换装载。“排序记录 2”按order_number字段排序。

        两个数据集合在合并后进行分组,实现将同一订单号的多行转为一行。“分组”步骤中的分组字段为前8个字段,聚合字段为后10个字段,聚合类型选择“最大”。聚合字段的值只有NULL和整数两种可能,按照比较规则整数大,因此选最大值。“过滤记录”步骤判断receive_date_sk字段的值是否为空,若是则输出到活动分区,否则输出到只读分区。因为假设五个里程碑只能按顺序进行,依据最后一个的日期代理键是否有值就可区分订单是否完整。最后两个“ORC output”步骤生成累积事实表中两个分区所对应的HDFS文件。活动分区对应的Folder/File name属性如下:hdfs://nameservice1/user/hive/warehouse/dw.db/sales_order_fact_accumulate/flag=active/all,每次执行将覆盖已有文件,注意勾选“Overwrite existing output file”选项。只读分区对应的文件为:hdfs://nameservice1/user/hive/warehouse/dw.db/sales_order_fact_accumulate/flag=readonly/${PRE_DATE}。${PRE_DATE}变量值是前一天的日期,以它作为文件名的目的是追加数据,而不是全量覆盖。只读分区存储已完成订单数据,不存在更新问题,因此采取增量处理。

6. 测试

        可以按照以下步骤进行累积快照事实表的数据装载测试。
(1)在源数据库的销售订单事务表中新增两个销售订单记录。

use source;
insert into sales_order values
(143, 143, 1, 1, 'n', 'n', 'n' ,'n', '2020-11-02 11:11:11', 'N', '2020-11-10', '2020-11-02 11:11:11', 7500, 75), 
(144, 144, 2, 2, 'y', 'y', 'y', 'y', '2020-11-02 12:12:12', 'N', '2020-11-10', '2020-11-02 12:12:12', 1000, 10) ;
commit ;

(2)设置适当的cdc_time时间窗口。

update rds.cdc_time set last_load='2020-11-02';

(3)执行图9-6所示的定期装载作业。

(4)修改生成的HDFS文件名,避免后面再次执行作业时覆盖已装载数据。

hdfs dfs -mv /user/hive/warehouse/rds.db/sales_order/sales_order_2020-11-16.txt /user/hive/warehouse/rds.db/sales_order/sales_order_2020-11-02.txt
hdfs dfs -mv /user/hive/warehouse/dw.db/sales_order_fact/sales_order_fact_2020-11-16 /user/hive/warehouse/dw.db/sales_order_fact/sales_order_fact_2020-11-02

(5)    查询sales_order_fact_accumulate表,确认定期装载成功。此时应该只有订单日期代理键列有值,其它状态的日期值都是NULL,因为这两个订单是新增的,并且还没有分配库房、打包、配送或收货。

select order_date_sk,order_quantity,
       allocate_date_sk,allocate_quantity,
       packing_date_sk,packing_quantity,
       ship_date_sk,ship_quantity,
       receive_date_sk,receive_quantity,
       flag
  from dw.sales_order_fact_accumulate;

(6)在源数据库中插入数据作为这两个新增订单的分配库房和打包里程碑。

use source;
insert into sales_order values
(145, 143, 1, 1, 'n', 'n', 'n' ,'n', '2020-11-03 11:11:11', 'A', '2020-11-10', '2020-11-03 11:11:11', 7500, 75), 
(146, 143, 1, 1, 'n', 'n', 'n' ,'n', '2020-11-03 11:11:11', 'P', '2020-11-10', '2020-11-03 11:11:11', 7500, 75), 
(147, 144, 2, 2, 'y', 'y', 'y', 'y', '2020-11-03 12:12:12', 'A', '2020-11-10', '2020-11-03 12:12:12', 1000, 10) ;
commit ;

(7)设置适当的cdc_time时间窗口。

update rds.cdc_time set last_load='2020-11-03';

(8)执行定期装载作业。

(9)修改生成的HDFS文件名。

hdfs dfs -mv /user/hive/warehouse/rds.db/sales_order/sales_order_2020-11-16.txt /user/hive/warehouse/rds.db/sales_order/sales_order_2020-11-03.txt
hdfs dfs -mv /user/hive/warehouse/dw.db/sales_order_fact/sales_order_fact_2020-11-16 /user/hive/warehouse/dw.db/sales_order_fact/sales_order_fact_2020-11-03

(10)查询sales_order_fact_accumulate表,确认定期装载成功。此时订单应该具有了分配库房或打包的日期代理键和度量值。

(11)在源数据库中插入数据作为这两个订单后面的里程碑:打包、配送和收货。注意四个状态日期可能相同。

use source;
insert into sales_order values
(148, 143, 1, 1, 'n', 'n', 'n' ,'n', '2020-11-04 11:11:11', 'S', '2020-11-10', '2020-11-04 11:11:11', 7500, 75), 
(149, 143, 1, 1, 'n', 'n', 'n' ,'n', '2020-11-04 11:11:11', 'R', '2020-11-10', '2020-11-04 11:11:11', 7500, 75), 
(150, 144, 2, 2, 'y', 'y', 'y', 'y', '2020-11-04 12:12:12', 'P', '2020-11-10', '2020-11-04 12:12:12', 1000, 10) ;
commit ;

(12)设置适当的cdc_time时间窗口。

update rds.cdc_time set last_load='2020-11-04';

(13)执行定期装载作业。

(14)修改生成的HDFS文件名。

hdfs dfs -mv /user/hive/warehouse/rds.db/sales_order/sales_order_2020-11-16.txt /user/hive/warehouse/rds.db/sales_order/sales_order_2020-11-04.txt
hdfs dfs -mv /user/hive/warehouse/dw.db/sales_order_fact/sales_order_fact_2020-11-16 /user/hive/warehouse/dw.db/sales_order_fact/sales_order_fact_2020-11-04

(15)查询sales_order_fact_accumulate表,确认定期装载成功。此时订单应该具有了所有五个状态的日期代理键和度量值。

        累积快照粒度表示一个有明确开始和结束过程的当前发展状态。通常这些过程持续时间较短,并且状态之间没有固定的时间间隔,因此无法将它归类到周期快照中。订单处理是一种典型的累积快照示例。累积快照的设计和管理与其它两类事实表存在较大的差异。所有累积快照事实表都包含一系列日期,用于描述典型的处理工作流。

        例如销售订单示例包含订单日期、分配库房日期、打包日期、配送日期以及收货日期等,这5个不同的日期以5个不同日期值代理键的外键出现。订单行首次建立时只有订单日期,因为其它的状态都还没有发生。当订单在其流水线上执行时,同一个事实行被顺序访问。每当订单状态发生改变时,累积快照事实行就被修改。日期外键被重写,各类度量被更新。通常初始的订单生成日期不会更新,因为它描述的是行被建立的时间,但是所有其它日期都可以被重写。

        通常利用三种事实表类型来满足各种需要。细节数据可以被保存到事务粒度事实表中,周期历史可以通过周期快照获取,而对于具有多个定义良好里程碑的处理工作流,则可以使用累积快照。

四、无事实的事实表

        在多维数据仓库建模中,有一种事实表叫做“无事实的事实表”。普通事实表中,通常会保存若干维度外键和多个数字型度量,度量是事实表的关键所在。然而在无事实的事实表中没有这些度量值,只有多个维度外键。表面上看,无事实事实表是没有意义的,因为作为事实表,毕竟最重要的就是度量。但在数据仓库中,这类事实表有其特殊用途。无事实的事实表通常用来跟踪某种事件或者说明某些活动的范围。

        无事实的事实表可以用来跟踪事件的发生。例如,在给定的某一天中发生的学生参加课程的事件,可能没有可记录的数字化事实,但该事实行带有一个包含日期、学生、教师、地点、课程等定义良好的外键。利用无事实的事实表可以按各种维度计数上课这个事件。再比如学生注册事件,学校需要对学生按学期进行跟踪。维度表包括学期维度、课程维度、系维度、学生维度、注册专业维度和取得学分维度等,而事实表由这些维度的主键组成,事实只有注册数,并且恒为1,因此没有必要用单独一列来表示。这样的事实表主要用于回答各种情况下的注册数。

        无事实的事实表还可以用来说明某些活动的范围,常被用于回答“什么未发生”这样的问题,例如促销范围事实表。通常销售事实表可以回答促销商品的销售情况,可是无法回答的一个重要问题是:处于促销状态但尚未销售的产品包括哪些?销售事实表所记录的仅仅是实际卖出的产品。事实表行中不包括由于没有销售行为而销售数量为零的行,因为如果将包含零值的产品都加到事实表中,那么事实表将变得非常巨大。这时,通过建立促销范围事实表,将商场需要促销的商品单独建立事实表保存,然后通过这个促销范围事实表和销售事实表即可得出哪些促销商品没有销售出去。

        为确定当前促销的产品中哪些尚未卖出,需要两步过程:首先,查询促销无事实的事实表,确定给定时间内促销的产品。然后从销售事实表中确定哪些产品已经卖出去了。答案就是上述两个列表的差集。这样的促销范围事实表只是用来说明促销活动的范围,其中没有任何事实度量。可能有读者会想,建立一个单独的促销商品维度表能否可以达到同样的效果呢?促销无事实的事实表包含多个维度的主键,可以是日期、产品、商店、促销等,将这些键作为促销商品的属性是不合适的,因为每个维度都有自己的属性集合。

        促销无事实事实表看起来与销售事实表相似,然而它们的粒度存在显著差别。假设促销是以一周为持续期,在促销范围事实表中,将为每周每个商店中促销的产品加载一行,无论产品是否卖出。该事实表能够确保看到被促销定义的键之间的关系,而与其它事件,如产品销售无关。

        下面以销售订单数据仓库为例,说明如何处理源数据中没有度量的需求。我们将建立一个无事实的事实表,用来统计每天发布的新产品数量。产品源数据不包含产品数量信息,如果系统需要得到历史某一天新增产品的数量,很显然不能简单地从数据仓库中得到。这时就要用到无事实的事实表技术。使用此技术可以通过持续跟踪产品发布事件来计算产品的数量。可以创建一个只有产品(计什么数)和日期(什么时候计数)维度代理键的事实表。之所以叫做无事实的事实表是因为表本身并没有数字型度量值。这里定义的新增产品是指在某一给定日期,源产品表中新插入的产品记录,不包括由于SCD2新增的产品版本记录。

1. 建立新产品发布的无事实事实表

        在数据仓库模式中新建一个产品发布的无事实事实表product_count_fact,该表中只包含两个字段,分别是引用日期维度表和产品维度表的外键,同时这两个字段也构成了无事实事实表的逻辑主键。图9-11显示了跟踪产品发布数量的数据仓库模式(只显示与无事实事实表相关的表)。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-11 无事实的事实表

        执行下面的脚本在数据仓库模式中创建产品发布无事实事实表。

use dw;    
create table product_count_fact (    
    product_sk int,    
    product_launch_date_sk int)
row format delimited fields terminated by ','
stored as textfile;

从字段定义看,产品维度表中的生效日期明显就是新产品的发布日期。本示例中无事实事实表的数据装载没有行级更新需求,所以该表使用CSV文本存储格式。

2. 初始装载无事实事实表

        创建如图9-12所示的Kettle转换,向无事实事实表装载已有的产品发布日期代理键。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-12 初始装载无事实事实表的转换

“日期维度”表输入步骤中的SQL查询日期维度表的代理键和日期值:

select date_sk, dt from dw.date_dim

“产品维度”表输入步骤中的SQL查询产品维度表的代理键和产品首次发布日期:

select product_sk, effective_date from dw.product_dim where version=1

“流查询”步骤的设置如图9-13所示。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-13 流查询步骤

        该步骤从“产品维度”获得产品首次发布日期,在“日期维度”步骤中寻找匹配的行,从而将date_sk字段从“日期维度”步骤传递到“流查询”步骤的输出流中。本例中因为每个产品发布日期在日期维度表中都能找到,每次查询都会成功,所以不需要设置date_sk的默认值。现实场景中可能要查询的数据在查找表中没有。为处理这种情况,建议使用一个容易识别的默认值。

        最后的“Hadoop file output”步骤将输出流数据生成无事实事实表所对应的HDFS文件:/user/hive/warehouse/dw.db/product_count_fact/product_count_fact,字段为product_sk和date_sk。内容标签页中,分隔符为逗号,格式选择LF terminated (Unix),编码选择UTF-8,其它属性为空。

        转换中使用的流查询步骤支持从各种数据源和其它步骤查询数据,但只允许做等值查询。在一些场景下,如维度数据和事实数据能同时准备好,先使用“表输入”步骤获取每个业务键最后一个版本的维度数据,然后再用“流查询”步骤把“表输入”步骤的结果作为输入,是查询大型维度表的最快方式。

        这种方法速度快,是因为查询集里只包括了实际需要的记录。若客户维度包括了三百万行记录(包括了历史记录),当前最新版本的数据可能只有总数的1/3(这是很普遍的情况),所以只要用流查询步骤在一百万行数据中查找就可以。即使是同样多的数据行,使用流查询步骤也快一些。

        与流复制相关的有一种不太常见但是跟重要的情况:一个步骤可以导致整个转换被挂起,如图9-14所示的例子。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-14 一个会被挂起的转换

        这个转换要把一个客户的消费合计数追加到客户记录里,可能是用于统计每个客户的消费百分比。该转换的问题是从Customers步骤出来的数据既用于主数据流,也用于流查询步骤使用的查询数据流。在这种情况下,流查询步骤会在全部接收完查询数据流里的所有数据后才开始进行查询,如果查询数据流里的数据没有结束,流查询步骤就会一直读取。在转换开始后,流查询步骤会阻塞主数据流,一直接收查询数据流里的数据,等待查询数据流里的数据直到数据结束。而此时,流查询步骤和Customers步骤之间的缓存会被填满,导致Customers步骤不能再继续发送数据给查询数据流。这样整个转换就进入了死锁状态,两个或多个步骤在互相等待。

        可以把转换改成图9-15的样子,让转换读取数据源两次,避免循环引用。也可以把一个转换分成两个转换,将查询数据保存在临时文件或数据库表中。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-15 通过读两次数据源解决死锁问题

3. 修改定期装载Kettle作业

        只需要将初始装载产品数量事实表的转换合并到定期装载Kettle作业中,如图9-16所示。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-16 增加装载产品数量事实表的转换

注意需要加在装载维度表作业项之后,因为需要使用最新的产品维度表数据。转换要做两点修改,一是只查询新发布的产品,因此“产品维度”步骤中的SQL改为:

select product_sk, effective_date from dw.product_dim where version=1 and effective_date = '${PRE_DATE}'

并且勾选“替换sql语句中的变量”属性。二是“Hadoop file output”步骤生成的文件名中添加${PRE_DATE}变量以实现增量装载。

4. 测试定期装载作业

(1)修改源数据库的产品表数据。可以做两点修改:新增一个产品;更改一个已有产品的名称。

-- mysql
use source;
update product set product_name = 'Regular Hard Disk Drive' where product_code=1;
insert into product values (5, 'High End Hard Disk Drive', 'Storage');
commit;

(2)执行定期装载作业。

(3)上一步执行成功后,查询产品发布无事实事实表,确认定期装载执行正确。此时的结果应该只是增加了一条新产品记录,原有数据没有变化。

        无事实事实表是没有任何度量的事实表,它本质上是一组维度的交集。用这种事实表记录相关维度之间存在多对多关系,但是关系上没有数字或者文本的事实。无事实事实表为数据仓库设计提供了更多的灵活性。再次考虑学生上课的应用场景,使用一个由学生、时间、课程三个维度键组成的无事实事实表,可以很容易地回答如下问题:

  • 有多少学生在某天上了给定的一门课程?
  • 在某段时间里,一名给定学生每天所上课程的平均数是多少?

五、迟到的事实

        数据仓库通常建立于一种理想的假设情况下,这就是数据仓库的度量(事实记录)与度量的环境(维度记录)同时出现在数据仓库中。当同时拥有事实记录和正确的当前维度行时,就能够从容地首先维护维度键,然后在对应的事实表行中使用这些最新的键。然而,各种各样的原因会导致需要ETL系统处理迟到的事实数据。例如,某些线下的业务,数据进入操作型系统的时间会滞后于事务发生的时间。再或者出现某些极端情况,如源数据库系统出现故障,直到恢复后才能补上故障期间产生的数据。

        在销售订单示例中,晚于订单日期进入源数据的销售订单可以看做是一个迟到事实的例子。销售订单数据被装载进其对应的事实表时,装载日期晚于销售订单产生的日期,因此是一个迟到的事实。本例中因为定期装载的是前一天的数据,所以这里的“晚于”指的是事务数据延迟两天及其以上才到达ETL系统。

        必须对标准的ETL过程进行特殊修改以处理迟到的事实。首先,当迟到度量事件出现时,不得不反向搜索维度表历史记录,以确定事务发生时间点的有效的维度代理键,因为当前的维度内容无法匹配输入行的情况。此外,还需要调整后续事实行中的所有半可加度量,例如由于迟到的事实导致客户当前余额的改变。迟到事实可能还会引起周期快照事实表的数据更新。前面第二节讨论的月销售周期快照表,如果2020年10月的销售订单金额已经计算并存储在month_end_sales_order_fact快照表中,这时一个迟到的10月订单在11月某天被装载,那么2020年10月的快照金额必须因迟到事实而重新计算。

        下面就以销售订单数据仓库为例,说明如何处理迟到的事实。

1. 修改数据仓库模式

        回忆一下第二节中建立的月销售周期快照表,其数据源自已经处理过的销售订单事务事实表。为了确定事实表中的一条销售订单记录是否是迟到的,需要把源数据中的登记日期列装载进销售订单事实表。为此在要销售订单事实表上添加登记日期代理键列。累积快照事实表数据由事务事实表行转列而来,因此也要增加登记日期代理键列。

        执行下面的脚本在销售订单事实表和累积快照事实表里添加名为entry_date_sk的日期代理键列。对于销售订单事实表,新增列会加到表的最后一列。对于累积快照事实表,由于它是分区表,新增列会加到分区列flag之前,其它已存在的普通列之后。

use dw;
alter table sales_order_fact add columns (entry_date_sk int comment '登记日期代理键'); 
alter table sales_order_fact_accumulate add columns (entry_date_sk int comment '登记日期代理键'); 
alter table sales_order_fact_accumulate_tmp add columns (entry_date_sk int comment '登记日期代理键');

2. 修改定期装载Kettle转换

        在给事实表添加了登记日期代理键列以后,需要修改数据仓库定期装载转换来装载登记日期。定期装载销售订单事实表转换中数据库连接步骤的SQL修改如下:

select a.order_number, 
       b.customer_sk, 
       c.product_sk, 
       d.date_sk, 
       a.order_amount, 
       a.quantity order_quantity,
       e.date_sk request_delivery_date_sk,
       f.sales_order_attribute_sk,
       g.customer_zip_code_sk,
       h.shipping_zip_code_sk,
       a.order_status,
       j.date_sk entry_date_sk
  from rds.sales_order a,
       dw.customer_dim b,
       dw.product_dim c,
       dw.date_dim d,
       dw.date_dim e,
       dw.sales_order_attribute_dim f,
       dw.customer_zip_code_dim g,
       dw.shipping_zip_code_dim h,
       rds.customer i,
       dw.date_dim j
 where a.customer_number = b.customer_number
   and a.status_date >= b.effective_date and a.status_date < b.expiry_date
   and a.product_code = c.product_code
   and a.status_date >= c.effective_date and a.status_date < c.expiry_date
   and to_date(a.status_date) = d.dt
   and to_date(a.request_delivery_date) = e.dt
   and a.verification_ind = f.verification_ind 
   and a.credit_check_flag = f.credit_check_flag    
   and a.new_customer_ind = f.new_customer_ind    
   and a.web_order_flag = f.web_order_flag 
   and a.customer_number = i.customer_number
   and i.customer_zip_code = g.customer_zip_code
   and a.status_date >= g.effective_date and a.status_date < g.expiry_date
   and i.shipping_zip_code = h.shipping_zip_code
   and a.status_date >= h.effective_date and a.status_date < h.expiry_date
   and a.entry_date >= ? and a.entry_date < ?
   and to_date(a.entry_date) = j.dt

增加了dw.date_dim j表别名、to_date(a.entry_date) = j.dt连接条件用于获取登记日期代理键j.date_sk。注意sales_order源数据表及其对应的过渡表中都已经含有登记日期,只是以前没有将其装载进数据仓库。相应的“ORC output”步骤的“Fields”中最后增加entry_date_sk字段。

        累积快照事实表的装载同样也要增加登记日期代理键字段。“读取活动分区转换”中,表输入步骤的SQL改为:

select order_number, 
       customer_sk, 
       product_sk, 
       order_date_sk, 
       order_amount, 
       order_quantity, 
       request_delivery_date_sk, 
       sales_order_attribute_sk, 
       customer_zip_code_sk, 
       shipping_zip_code_sk, 
       allocate_date_sk, 
       allocate_quantity, 
       packing_date_sk, 
       packing_quantity, 
       ship_date_sk, 
       ship_quantity, 
       receive_date_sk, 
       receive_quantity, 
       entry_date_sk
  from dw.sales_order_fact_accumulate
 where flag='active'

        “ORC output”步骤增加entry_date_sk字段。“数据合并与分区”转换中的“查询事实表增量数据”数据库连接步骤、“字段选择”步骤、“查询活动分区数据”表输入步骤、“分组”步骤中的构成分组的字段、“ORC output”和“ORC output 2”步骤均增加entry_date_sk字段。

        本节开头曾经提到,需要为迟到的事实行获取事务发生时间点的有效的维度代理键。在SQL中使用销售订单过渡表的状态日期字段限定当时的维度代理键。例如,为了获取事务发生时的客户代理键,筛选条件为:

status_date >= customer_dim.effective_date and status_date < customer_dim.expiry_date

        之所以可以这样做,原因在于本示例满足以下两个前提条件:在最初源数据库的销售订单表中,status_date存储的是状态发生时的时间;维度的生效时间与过期时间构成一条连续且不重叠的时间轴,任意status_date日期只能落到唯一的生效时间、过期时间区间内。

3. 修改装载月销售周期快照事实表的作业

        迟到的事实记录会对周期快照中已经生成的月销售汇总数据产生影响,因此必须做适当的修改。月销售周期快照表存储的是某月某产品汇总的销售数量和销售金额,表中有月份代理键、产品代理键、销售金额、销售数量四个字段。由于迟到事实的出现,需要将事务事实表中的数据划分为三类:非迟到的事实记录;迟到的事实,但周期快照表中尚不存在相关记录;迟到的事实,并且周期快照表中已经存在相关记录。对这三类事实数据的处理逻辑各不相同,前两类数据需要汇总后插入快照表,而第三种情况需要更新快照表中的现有数据。图9-2所示装载周期快照表作业中的SQL作业项脚本改为:

use dw;

drop table if exists tmp;  
create table tmp as  
select a.order_month_sk order_month_sk,   
       a.product_sk product_sk,  
       a.month_order_amount + b.order_amount month_order_amount,  
       a.month_order_quantity + b.order_quantity month_order_quantity  
  from month_end_sales_order_fact a,  
       (select d.month_sk month_sk,  
               a.product_sk product_sk,  
               sum(order_amount) order_amount,  
               sum(order_quantity) order_quantity  
          from sales_order_fact a,  
               date_dim b,  
               date_dim c,  
               month_dim d  
         where a.order_date_sk = b.date_sk  
           and a.entry_date_sk <=> c.date_sk  
           and c.month = ${MONTH}   
           and c.year = ${YEAR}  
           and b.month = d.month
           and b.year = d.year     
           and b.dt <> c.dt  
         group by d.month_sk , a.product_sk) b  
 where a.product_sk = b.product_sk  
   and a.order_month_sk = b.month_sk; 
  
delete from month_end_sales_order_fact   
 where exists 
 (select 1   
    from tmp t2   
   where month_end_sales_order_fact.order_month_sk = t2.order_month_sk   
     and month_end_sales_order_fact.product_sk = t2.product_sk);
     
insert into month_end_sales_order_fact select * from tmp;  

insert into month_end_sales_order_fact
select d.month_sk, a.product_sk, sum(order_amount), sum(order_quantity)  
  from sales_order_fact a,    
       date_dim b,    
       date_dim c,    
       month_dim d  
 where a.order_date_sk = b.date_sk   
   and a.entry_date_sk <=> c.date_sk   
   and c.month = ${MONTH}   
   and c.year = ${YEAR}  
   and b.month = d.month  
   and b.year = d.year  
   and not exists (select 1   
                     from month_end_sales_order_fact p 
                    where p.order_month_sk = d.month_sk  
                      and p.product_sk = a.product_sk)
 group by d.month_sk, a.product_sk;

        按事务发生时间的先后顺序,先处理第三种情况。为了更新周期快照表数据,需要创建一个临时表。子查询用于从销售订单事实表中获取所有上个月录入的,并且是迟到的数据行的汇总。用b.dt <> c.dt作为判断迟到的条件。本示例中实际可以去掉这条判断语句,因为只有迟到事实会对已有的快照数据造成影响。外层查询把具有相同产品代理键和月份代理键的迟到事实的汇总数据加到已有的快照数据行上。临时表中存储这个查询的结果。注意产品代理键和月份代理键共同构成了周期快照表的逻辑主键,可以唯一标识一条记录。之后使用先删除再插入的方式更新周期快照表。从周期快照表删除数据的操作也是以逻辑主键匹配作为过滤条件。

        之后对第一、二类数据统一处理。使用相关子查询获取所有上个月新录入的,并且在周期快照事实表中尚未存在的产品销售月汇总数据,插入到周期快照表中。销售订单事实表的粒度是每天,而周期快照事实表的粒度是每月,因此必须使用订单日期代理键对应的月份代理键进行比较。注意脚本中的a.entry_date_sk <=> c.date_sk,销售订单事实表中已有数据的entry_date_sk为NULL,而对于含有NULL的等值比较使用<=>操作符。关于该操作符的比较规则详见“https://wxy0327.blog.csdn.net/article/details/109570543#3.%20%E4%BF%AE%E6%94%B9%E5%AE%9A%E6%9C%9F%E8%A3%85%E8%BD%BD%E7%BB%B4%E5%BA%A6%E8%A1%A8%E7%9A%84%E8%BD%AC%E6%8D%A2”。        本示例中迟到事实对月周期快照表数据的影响逻辑并不是很复杂。当逻辑主键,即月份代理键和产品代理键的组合匹配时,将从销售订单事实表中获取的销售数量和销售金额汇总值累加到月周期快照表对应的数据行上,否则将新的汇总数据添加到月周期快照表中。这个逻辑非常适合使用merge into语句,例如在Oracle中,month_sum.sql文件可以写成如下的样子:

declare
pre_month_date date;
month1 int;
year1 int;

begin
select add_months(sysdate,-1) into pre_month_date from dual;
select extract(month from pre_month_date), extract(year from pre_month_date) into month1, year1
  from dual;

 merge into month_end_sales_order_fact t1
 using (select d.month_sk month_sk,  
               a.product_sk product_sk,  
               sum(order_amount) order_amount,  
               sum(order_quantity) order_quantity  
          from sales_order_fact a,  
               order_date_dim b,  
               entry_date_dim c,  
               month_dim d  
         where a.order_date_sk = b.order_date_sk  
           and a.entry_date_sk = c.entry_date_sk  
           and c.month = month1   
           and c.year = year1  
           and b.month = d.month 
           and b.year = d.year      
         group by d.month_sk , a.product_sk) t2
    on (t1.order_month_sk = t2.month_sk and t1.product_sk = t2.product_sk)
  when matched then 
       update set t1.month_order_amount = t1.month_order_amount + t2.order_amount,
                 t1.month_order_quantity = t1.month_order_quantity + t2.order_quantity
  when not matched then 
       insert (order_month_sk, product_sk, month_order_amount, month_order_quantity)
       values (t2.month_sk, t2.product_sk, t2.order_amount, t2.order_quantity);

commit;

end;
/

        Hive从2.2版本开始支持merge into语句,而CDH 6.3中的Hive是2.1.1版本,并不支持merge into。

4.    测试

(1)把销售订单事实表的entry_date_sk字段修改为order_date_sk字段的值。这些登记日期键是后面测试月快照数据装载所需要的。
先创建一个临时表存储销售订单事实表全量数据。

use dw;
drop table if exists tmp;  
create table tmp as select * from sales_order_fact;

然后删除销售订单事实表对应的HDFS目录下的所有文件。

hdfs dfs -rm -skipTrash /user/hive/warehouse/dw.db/sales_order_fact/*

最后执行如图9-17所示的转换重新装载销售订单事实表数据。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-17 重新全量装载销售订单事实表的转换

(2)在执行定期装载脚本前先查询周期快照事实表和销售订单事实表。之后可以对比‘前’(不包含迟到事实)‘后’(包含了迟到事实)的数据,以确认装载的正确性。

select year,
       month,
       product_name,
       month_order_amount amt,
       month_order_quantity qty
  from month_end_sales_order_fact a,
       month_dim b,
       product_dim c
 where a.order_month_sk = b.month_sk
   and a.product_sk = c.product_sk
 order by year , month , product_name;

结果:

...
2020    10    flat panel    45431.00    215
2020    10    floppy drive    14928.00    51
2020    10    hard disk drive    76179.00    248
2020    10    keyboard    40246.00    159
select product_name, sum(order_amount)
  from sales_order_fact a, product_dim b
 where a.product_sk = b.product_sk
 group by product_name
 order by product_name;

结果:

flat panel    55820.00
floppy drive    296566.00
hard disk drive    390421.00
keyboard    42357.00

(3)准备销售订单测试数据。可以在销售订单源数据表中插入三个新的订单记录,第一个是迟到的订单,并且销售的产品在周期快照表中已经存在,第二个也是迟到的订单,但销售的产品在周期快照表中不存在,第三个是非迟到的正常订单。这里需要注意,产品维度是SCD2处理的,所以在添加销售订单源数据时,新增订单时间一定要在产品维度的生效与过期时间区间内。

use source;
insert into sales_order values
/* late arrival                                                       */
  (151, 145, 6, 2, 'y', 'y', 'y', 'n', '2020-10-25 01:01:01', 'N', '2020-10-30', '2020-11-22 01:01:01', 1000, 10),
  (152, 146, 6, 1, 'y', 'y', 'y', 'n', '2020-10-26 01:01:01', 'N', '2020-10-30', '2020-11-22 01:01:01', 1000, 10),
/* normal                                                             */
  (153, 147, 6, 5, 'y', 'n', 'y', 'n', '2020-11-22 01:01:01', 'N', '2020-11-30', '2020-11-22 01:01:01', 2000, 20);
 
commit;

(4)执行定期装载作业。
        修改时间戳表,将last_load改为前一天,然后执行定期装载作业。

update rds.cdc_time set last_load='2020-11-22';

(5)设置Kettle所在服务器的系统日期为下月1号 date -s "2020-12-01 `date +%T`" ,然后手工执行装载周期快照表的Kettle作业。

(6)执行与第(2)步相同的查询获取包含了迟到事实的月底销售汇总数据,对比‘前’‘后’查询的结果,确认数据装载正确。

select year,
       month,
       product_name,
       month_order_amount amt,
       month_order_quantity qty
  from month_end_sales_order_fact a,
       month_dim b,
       product_dim c
 where a.order_month_sk = b.month_sk
   and a.product_sk = c.product_sk
 order by year , month , product_name;

结果:

...
2020    10    flat panel    45431.00    215
2020    10    floppy drive    15928.00    61
2020    10    hard disk drive    77179.00    258
2020    10    keyboard    40246.00    159
2020    11    High End Hard Disk Drive    2000.00    20
2020    11    flat panel    10389.00    108
2020    11    floppy drive    9080.00    165
2020    11    hard disk drive    47860.00    520
2020    11    keyboard    2111.00    28
select product_name, sum(order_amount)
  from sales_order_fact a, product_dim b
 where a.product_sk = b.product_sk
 group by product_name
 order by product_name;

结果:

High End Hard Disk Drive    2000.00
flat panel    55820.00
floppy drive    297566.00
hard disk drive    391421.00
keyboard    42357.00

        从查询结果看到,10月的快照数据由于迟到事实已经更新,11月快照正常装载。测试后同步NTP服务器还原系统日期:

ntpdate 182.118.58.129

六、累积度量

        累积度量指的是聚合从序列内第一个元素到当前元素的数据,例如统计从每年的一月到当前月份的累积销售额。本节说明如何在销售订单示例中实现累积月销售数量和金额,并对数据仓库模式、初始装载、定期装载Kettle作业和转换做相应地修改。累积度量是半可加的,而且它的初始装载比前面实现的要复杂。

1. 修改模式

        建立一个新的名为month_end_balance_fact的事实表,用来存储销售订单金额和数量的月累积值。month_end_balance_fact表在模式中构成了另一个星型模式。新的星型模式除了包括这个新的事实表,还包括两个其它星型模式中已有的维度表,即产品维度表与月份维度表。图9-18所示为新的模式,这里只显示了相关的表。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-18 累积度量

        下面的脚本用于创建month_end_balance_fact表。

use dw;    
create table month_end_balance_fact 
( month_sk int,    
  product_sk int,    
  month_end_amount_balance decimal(10,2),    
  month_end_quantity_balance int )
row format delimited fields terminated by ','
stored as textfile;

因为对此事实表只有追加数据的操作操作,没有update、delete等行级更新需求,所以这里没有用ORC文件格式,而是采用了CSV文本存储格式。

2. 初始装载

        现在要把month_end_sales_order_fact表里的数据装载进month_end_balance_fact表,图9-19显示了初始装载month_end_balance_fact表的转换。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-19 初始装载累积度量表

转换中表输入步骤的SQL如下:

select a.month_sk,    
       b.product_sk,    
       sum(b.month_order_amount) month_order_amount,    
       sum(b.month_order_quantity) month_order_quantity    
  from dw.month_dim a,    
       (select a.*,   
               b.year,   
               b.month,   
               max(a.order_month_sk) over () max_month_sk  
          from dw.month_end_sales_order_fact a, dw.month_dim b   
         where a.order_month_sk = b.month_sk) b  
 where a.month_sk <= b.max_month_sk 
   and a.year = b.year and b.month <= a.month  
 group by a.month_sk , b.product_sk

        此脚本查询累积的月销售订单汇总数据,从每年的一月累积到当月,累积数据不跨年。子查询获取month_end_sales_order_fact表的数据,及其年月和最大月份代理键。外层查询汇总每年一月到当月的累积销售数据,a.month_sk <= b.max_month_sk条件用于限定只统计到现存的最大月份为止。

        在关系数据库中,出于性能方面的考虑,此类需求往往使用自连接查询方法,而不用这种子查询的方式。但是在Hive中,子查询是唯一的选择,原因有两个:第一,Hive中两个表join连接时,不支持关联字段的非相等操作,而累积度量需求显然需要类似<=的比较条件,当join中有非相等操作时,会报“Both left and right aliases encountered in JOIN ...”错误。第二,如果是内连接,我们可以将<=比较放到where子句中,避开Hive的限制。但是这不适合累积度量的场景。假设有产品1在一月有销售,二月没有销售,那么产品1在二月的累积销售值应该从一月继承。而如果使用内连接,用a.product_sk=b.product_sk做连接条件,会过滤掉产品1在二月的累积数据行,这显然是不合理的。

        这里也没有使用Kettle里的数据库连接或流查询步骤。如果使用数据库连接步骤,对数据流中的每一行执行一次Hive查询速度太慢。流查询步骤又只支持等值连接,不适用于累积度量。所以只能在一个表输入步骤中,利用SQL查询执行所有逻辑。“Hadoop file output”步骤将查询结果输出到month_end_balance_fact表所对应的HDFS目录。可在执行完转换后分别查询month_end_sales_order_fact和month_end_balance_fact表,确认初始装载是否正确。

-- 周期快照
select b.year year,  
       b.month month,  
       a.product_sk psk,  
       a.month_order_amount amt,  
       a.month_order_quantity qty  
  from month_end_sales_order_fact a,  
       month_dim b  
 where a.order_month_sk = b.month_sk  
   and a.product_sk > 2
cluster by year, month, psk;

+-------+--------+------+-----------+------+
| year  | month  | psk  |    amt    | qty  |
+-------+--------+------+-----------+------+
| 2020  | 10     | 4    | 45431.00  | 215  |
| 2020  | 10     | 5    | 40246.00  | 159  |
| 2020  | 11     | 4    | 10389.00  | 108  |
| 2020  | 11     | 5    | 2111.00   | 28   |
| 2020  | 11     | 7    | 2000.00   | 20   |
+-------+--------+------+-----------+------+

-- 累积度量
select b.year year,  
       b.month month,  
       a.product_sk psk,  
       a.month_end_amount_balance amt,  
       a.month_end_quantity_balance qty  
  from month_end_balance_fact a,  
       month_dim b  
 where a.month_sk = b.month_sk  
   and a.product_sk > 2
cluster by year, month, psk;

+-------+--------+------+-----------+------+
| year  | month  | psk  |    amt    | qty  |
+-------+--------+------+-----------+------+
| 2020  | 10     | 4    | 45431.00  | 215  |
| 2020  | 10     | 5    | 40246.00  | 159  |
| 2020  | 11     | 4    | 55820.00  | 323  |
| 2020  | 11     | 5    | 42357.00  | 187  |
| 2020  | 11     | 7    | 2000.00   | 20   |
+-------+--------+------+-----------+------+

        可以看到,2020年10月的商品销售金额和数量被累积到了2020年11月。产品4和5累加了10、11两个月的销售数据,产品7只有11月有销售。

3. 定期装载

        定期装载转换的步骤和初始装载一样,只需要做两点修改。其一是表输入步骤的SQL改为:

select order_month_sk month_sk,    
       product_sk,    
       sum(month_order_amount) month_order_amount,    
       sum(month_order_quantity) month_order_quantity   
  from (select a.*    
          from dw.month_end_sales_order_fact a,   
               dw.month_dim b    
         where a.order_month_sk = b.month_sk    
           and b.year = ${YEAR}    
           and b.month = ${MONTH}  
         union all    
        select month_sk + 1 order_month_sk,  
               product_sk product_sk,  
               month_end_amount_balance month_order_amount,  
               month_end_quantity_balance month_order_quantity   
          from dw.month_end_balance_fact a    
         where a.month_sk in (select max(case when ${MONTH} = 1 then 0 else month_sk end)    
                                from dw.month_end_balance_fact)) t  
  group by order_month_sk, product_sk

        子查询将累积度量表和月周期快照表做并集操作,增加上月的累积数据。最外层查询执行销售数据按月和产品的分组聚合。最内层的case语句用于在每年一月时重新归零再累积。第二个修改点是将“Hadoop file output”步骤输出的文件名中加上年月值:month_end_balance_fact_${YEAR}${MONTH}。

        该转换每个月执行一次,装载上个月的数据。可以在执行完月周期快照表定期装载后执行该脚本,年月参数值由周期快照表装载作业提供。修改后的定期装载作业如图9-20所示。

Kettle构建Hadoop ETL实践(九):事实表技术
图9-20 添加装载累积度量的转换

4. 测试定期装载

        使用下面步骤测试非1月的装载:
(1)执行下面的命令向month_end_sales_order_fact表添加两条记录。

insert into dw.month_end_sales_order_fact values (36,5,1000,10),(36,7,1000,10);

(2)将转换中的${YEAR}、${MONTH}替换为2020、12,手工执行累积度量定期装载转换。(3)查询month_end_balance_fact表,确认累积度量数据装载正确。

select b.year year,  
       b.month month,  
       a.product_sk psk,  
       a.month_end_amount_balance amt,  
       a.month_end_quantity_balance qty  
  from month_end_balance_fact a,  
       month_dim b  
 where a.month_sk = b.month_sk  
   and a.product_sk > 2
cluster by year, month, psk;

+-------+--------+------+-----------+------+
| year  | month  | psk  |    amt    | qty  |
+-------+--------+------+-----------+------+
| 2020  | 10     | 4    | 45431.00  | 215  |
| 2020  | 10     | 5    | 40246.00  | 159  |
| 2020  | 11     | 4    | 55820.00  | 323  |
| 2020  | 11     | 5    | 42357.00  | 187  |
| 2020  | 11     | 7    | 2000.00   | 20   |
| 2020  | 12     | 4    | 55820.00  | 323  |
| 2020  | 12     | 5    | 43357.00  | 197  |
| 2020  | 12     | 7    | 3000.00   | 30   |
+-------+--------+------+-----------+------+

        使用下面步骤测试1月的装载:
(1)使用下面的命令向month_end_sales_order_fact表添加一条记录,month_sk的值是37,指的是2021年1月。

insert into dw.month_end_sales_order_fact values (37,3,1000,10);

(2)使用下面的命令向month_end_balance_fact表添加一条记录。

insert into dw.month_end_balance_fact values (36,3,1000,10);

(3)将转换中的${YEAR}、${MONTH}替换为2021、1,手工执行累积度量定期装载转换。(4)查询month_end_balance_fact表,确认累积度量数据装载正确。

select b.year year,  
       b.month month,  
       a.product_sk psk,  
       a.month_end_amount_balance amt,  
       a.month_end_quantity_balance qty  
  from month_end_balance_fact a,  
       month_dim b  
 where a.month_sk = b.month_sk  
   and a.product_sk > 2
cluster by year, month, psk;

+-------+--------+------+-----------+------+
| year  | month  | psk  |    amt    | qty  |
+-------+--------+------+-----------+------+
| 2020  | 10     | 4    | 45431.00  | 215  |
| 2020  | 10     | 5    | 40246.00  | 159  |
| 2020  | 11     | 4    | 55820.00  | 323  |
| 2020  | 11     | 5    | 42357.00  | 187  |
| 2020  | 11     | 7    | 2000.00   | 20   |
| 2020  | 12     | 3    | 1000.00   | 10   |
| 2020  | 12     | 4    | 55820.00  | 323  |
| 2020  | 12     | 5    | 43357.00  | 197  |
| 2020  | 12     | 7    | 3000.00   | 30   |
| 2021  | 1      | 3    | 1000.00   | 10   |
+-------+--------+------+-----------+------+

5. 查询

        累积度量必须要小心使用,因为它是“半可加”的。一个半可加度量在某些维度(通常是时间维度)上是不可加的。例如可以通过产品正确地累加月底累积销售金额。

select year, month, sum(month_end_amount_balance) s  
  from month_end_balance_fact a,  
       month_dim b  
 where a.month_sk = b.month_sk  
 group by year, month  
cluster by year, month;

+-------+--------+------------+
| year  | month  |     s      |
+-------+--------+------------+
| 2020  | 3      | 98109.00   |
| 2020  | 4      | 149874.00  |
| 2020  | 5      | 239345.00  |
| 2020  | 6      | 382840.00  |
| 2020  | 7      | 470511.00  |
| 2020  | 8      | 528575.00  |
| 2020  | 9      | 538940.00  |
| 2020  | 10     | 717724.00  |
| 2020  | 11     | 789164.00  |
| 2020  | 12     | 792164.00  |
| 2021  | 1      | 1000.00    |
+-------+--------+------------+

而通过月份累加月底金额:

select product_name, sum(month_end_amount_balance) s  
  from month_end_balance_fact a,  
       product_dim b  
 where a.product_sk = b.product_sk  
 group by product_name; 

+---------------------------+-------------+
|       product_name        |      s      |
+---------------------------+-------------+
| High End Hard Disk Drive  | 5000.00     |
| flat panel                | 157071.00   |
| floppy drive              | 2112829.00  |
| hard disk drive           | 2305386.00  |
| keyboard                  | 125960.00   |
| lcd panel                 | 2000.00     |
+---------------------------+-------------+

以上查询结果是错误的。正确的结果应该和下面的在month_end_sales_order_fact表上进行的查询结果相同。

select product_name, sum(month_order_amount) s  
  from month_end_sales_order_fact a,  
       product_dim b  
 where a.product_sk = b.product_sk  
 group by product_name;

+---------------------------+------------+
|       product_name        |     s      |
+---------------------------+------------+
| High End Hard Disk Drive  | 3000.00    |
| flat panel                | 55820.00   |
| floppy drive              | 297566.00  |
| hard disk drive           | 391421.00  |
| keyboard                  | 43357.00   |
| lcd panel                 | 1000.00    |
+---------------------------+------------+

七、小结

  • 事务事实表、周期快照事实表和累积快照事实表是多维数据仓库中常见的三种事实表。定期历史数据可以通过周期快照获取,细节数据被保存到事务粒度事实表中,而对于具有多个定义良好里程碑的处理工作流,则可以使用累积快照。
  • 无事实事实表是没有任何度量的事实表,它本质上是一组维度的交集。用这种事实表记录相关维度之间存在多对多关系,但是关系上没有数字或者文本的事实。无事实事实表为数据仓库设计提供了更多的灵活性。
  • 迟到的事实指的是到达ETL系统的时间晚于事务发生时间的度量数据。必须对标准的ETL过程进行特殊修改以处理迟到的事实。需要确定事务发生时间点的有效的维度代理键,还要调整后续事实行中的所有半可加度量。此外,迟到事实可能还会引起周期快照事实表的数据更新。
  • 累积度量指的是聚合从序列内第一个元素到当前元素的数据。累积度量是半可加的,因此对累积度量执行聚合计算时要格外注意分组的维度。