欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

美团DB数据同步到数据仓库的架构与实践

程序员文章站 2022-09-24 15:43:37
背景 在数据仓库建模中,未经任何加工处理的原始业务层数据,我们称之为ods(operational data store)数据。在互联网企业中,常见的ods数据有业务日志...

背景

在数据仓库建模中,未经任何加工处理的原始业务层数据,我们称之为ods(operational data store)数据。在互联网企业中,常见的ods数据有业务日志数据(log)和业务db数据(db)两类。对于业务db数据来说,从mysql等关系型数据库的业务数据进行采集,然后导入到hive中,是进行数据仓库生产的重要环节。

如何准确、高效地把mysql数据同步到hive中?一般常用的解决方案是批量取数并load:直连mysql去select表中的数据,然后存到本地文件作为中间存储,最后把文件load到hive表中。这种方案的优点是实现简单,但是随着业务的发展,缺点也逐渐暴露出来:

  • 性能瓶颈:随着业务规模的增长,select from mysql -> save to localfile -> load to hive这种数据流花费的时间越来越长,无法满足下游数仓生产的时间要求。
  • 直接从mysql中select大量数据,对mysql的影响非常大,容易造成慢查询,影响业务线上的正常服务。
  • 由于hive本身的语法不支持更新、删除等sql原语,对于mysql中发生update/delete的数据无法很好地进行支持。

为了彻底解决这些问题,我们逐步转向cdc (change data capture) + merge的技术方案,即实时binlog采集 + 离线处理binlog还原业务数据这样一套解决方案。binlog是mysql的二进制日志,记录了mysql中发生的所有数据变更,mysql集群自身的主从同步就是基于binlog做的。

本文主要从binlog实时采集和离线处理binlog还原业务数据两个方面,来介绍如何实现db数据准确、高效地进入数仓。

整体架构

美团DB数据同步到数据仓库的架构与实践

整体的架构如上图所示。在binlog实时采集方面,我们采用了阿里巴巴的开源项目canal,负责从mysql实时拉取binlog并完成适当解析。binlog采集后会暂存到kafka上供下游消费。整体实时采集部分如图中红色箭头所示。

离线处理binlog的部分,如图中黑色箭头所示,通过下面的步骤在hive上还原一张mysql表:

  • 采用linkedin的开源项目camus,负责每小时把kafka上的binlog数据拉取到hive上。
  • 对每张ods表,首先需要一次性制作快照(snapshot),把mysql里的存量数据读取到hive上,这一过程底层采用直连mysql去select数据的方式。
  • 对每张ods表,每天基于存量数据和当天增量产生的binlog做merge,从而还原出业务数据。

我们回过头来看看,背景中介绍的批量取数并load方案遇到的各种问题,为什么用这种方案能解决上面的问题呢?

  • 首先,binlog是流式产生的,通过对binlog的实时采集,把部分数据处理需求由每天一次的批处理分摊到实时流上。无论从性能上还是对mysql的访问压力上,都会有明显地改善。
  • 第二,binlog本身记录了数据变更的类型(insert/update/delete),通过一些语义方面的处理,完全能够做到精准的数据还原。

binlog实时采集

对binlog的实时采集包含两个主要模块:一是canalmanager,主要负责采集任务的分配、监控报警、元数据管理以及和外部依赖系统的对接;二是真正执行采集任务的canal和canalclient。

美团DB数据同步到数据仓库的架构与实践

当用户提交某个db的binlog采集请求时,canalmanager首先会调用dba平台的相关接口,获取这一db所在mysql实例的相关信息,目的是从中选出最适合binlog采集的机器。然后把采集实例(canal instance)分发到合适的canal服务器上,即canalserver上。在选择具体的canalserver时,canalmanager会考虑负载均衡、跨机房传输等因素,优先选择负载较低且同地域传输的机器。

canalserver收到采集请求后,会在zookeeper上对收集信息进行注册。注册的内容包括:

  • 以instance名称命名的永久节点。
  • 在该永久节点下注册以自身ip:port命名的临时节点。

这样做的目的有两个:

  • 高可用:canalmanager对instance进行分发时,会选择两台canalserver,一台是running节点,另一台作为standby节点。standby节点会对该instance进行监听,当running节点出现故障后,临时节点消失,然后standby节点进行抢占。这样就达到了容灾的目的。
  • 与canalclient交互:canalclient检测到自己负责的instance所在的running canalserver后,便会进行连接,从而接收到canalserver发来的binlog数据。

对binlog的订阅以mysql的db为粒度,一个db的binlog对应了一个kafka topic。底层实现时,一个mysql实例下所有订阅的db,都由同一个canal instance进行处理。这是因为binlog的产生是以mysql实例为粒度的。canalserver会抛弃掉未订阅的binlog数据,然后canalclient将接收到的binlog按db粒度分发到kafka上。

离线还原mysql数据

完成binlog采集后,下一步就是利用binlog来还原业务数据。首先要解决的第一个问题是把binlog从kafka同步到hive上。

美团DB数据同步到数据仓库的架构与实践

kafka2hive

整个kafka2hive任务的管理,在美团数据平台的etl框架下进行,包括任务原语的表达和调度机制等,都同其他etl类似。而底层采用linkedin的开源项目camus,并进行了有针对性的二次开发,来完成真正的kafka2hive数据传输工作。

对camus的二次开发

kafka上存储的binlog未带schema,而hive表必须有schema,并且其分区、字段等的设计,都要便于下游的高效消费。对camus做的第一个改造,便是将kafka上的binlog解析成符合目标schema的格式。

对camus做的第二个改造,由美团的etl框架所决定。在我们的任务调度系统中,目前只对同调度队列的任务做上下游依赖关系的解析,跨调度队列是不能建立依赖关系的。而在mysql2hive的整个流程中,kafka2hive的任务需要每小时执行一次(小时队列),merge任务每天执行一次(天队列)。而merge任务的启动必须要严格依赖小时kafka2hive任务的完成。

为了解决这一问题,我们引入了checkdone任务。checkdone任务是天任务,主要负责检测前一天的kafka2hive是否成功完成。如果成功完成了,则checkdone任务执行成功,这样下游的merge任务就可以正确启动了。

checkdone的检测逻辑

checkdone是怎样检测的呢?每个kafka2hive任务成功完成数据传输后,由camus负责在相应的hdfs目录下记录该任务的启动时间。checkdone会扫描前一天的所有时间戳,如果最大的时间戳已经超过了0点,就说明前一天的kafka2hive任务都成功完成了,这样checkdone就完成了检测。

此外,由于camus本身只是完成了读kafka然后写hdfs文件的过程,还必须完成对hive分区的加载才能使下游查询到。因此,整个kafka2hive任务的最后一步是加载hive分区。这样,整个任务才算成功执行。

每个kafka2hive任务负责读取一个特定的topic,把binlog数据写入original_binlog库下的一张表中,即前面图中的original_binlog.db,其中存储的是对应到一个mysql db的全部binlog。

美团DB数据同步到数据仓库的架构与实践

上图说明了一个kafka2hive完成后,文件在hdfs上的目录结构。假如一个mysql db叫做user,对应的binlog存储在original_binlog.user表中。ready目录中,按天存储了当天所有成功执行的kafka2hive任务的启动时间,供checkdone使用。每张表的binlog,被组织到一个分区中,例如userinfo表的binlog,存储在table_name=userinfo这一分区中。每个table_name一级分区下,按dt组织二级分区。图中的xxx.lzo和xxx.lzo.index文件,存储的是经过lzo压缩的binlog数据。

merge

binlog成功入仓后,下一步要做的就是基于binlog对mysql数据进行还原。merge流程做了两件事,首先把当天生成的binlog数据存放到delta表中,然后和已有的存量数据做一个基于主键的merge。delta表中的数据是当天的最新数据,当一条数据在一天内发生多次变更时,delta表中只存储最后一次变更后的数据。

把delta数据和存量数据进行merge的过程中,需要有唯一键来判定是否是同一条数据。如果同一条数据既出现在存量表中,又出现在delta表中,说明这一条数据发生了更新,则选取delta表的数据作为最终结果;否则说明没有发生任何变动,保留原来存量表中的数据作为最终结果。merge的结果数据会insert overwrite到原表中,即图中的origindb.table。

merge流程举例

下面用一个例子来具体说明merge的流程。

美团DB数据同步到数据仓库的架构与实践

数据表共id、value两列,其中id是主键。在提取delta数据时,对同一条数据的多次更新,只选择最后更新的一条。所以对id=1的数据,delta表中记录最后一条更新后的值value=120。delta数据和存量数据做merge后,最终结果中,新插入一条数据(id=4),两条数据发生了更新(id=1和id=2),一条数据未变(id=3)。

默认情况下,我们采用mysql表的主键作为这一判重的唯一键,业务也可以根据实际情况配置不同于mysql的唯一键。

上面介绍了基于binlog的数据采集和ods数据还原的整体架构。下面主要从两个方面介绍我们解决的实际业务问题。

实践一:分库分表的支持

随着业务规模的扩大,mysql的分库分表情况越来越多,很多业务的分表数目都在几千个这样的量级。而一般数据开发同学需要把这些数据聚合到一起进行分析。如果对每个分表都进行手动同步,再在hive上进行聚合,这个成本很难被我们接受。因此,我们需要在ods层就完成分表的聚合。

美团DB数据同步到数据仓库的架构与实践

首先,在binlog实时采集时,我们支持把不同db的binlog写入到同一个kafka topic。用户可以在申请binlog采集时,同时勾选同一个业务逻辑下的多个物理db。通过在binlog采集层的汇集,所有分库的binlog会写入到同一张hive表中,这样下游在进行merge时,依然只需要读取一张hive表。

第二,merge任务的配置支持正则匹配。通过配置符合业务分表命名规则的正则表达式,merge任务就能了解自己需要聚合哪些mysql表的binlog,从而选取相应分区的数据来执行。

这样通过两个层面的工作,就完成了分库分表在ods层的合并。

这里面有一个技术上的优化,在进行kafka2hive时,我们按业务分表规则对表名进行了处理,把物理表名转换成了逻辑表名。例如userinfo123这张表名会被转换为userinfo,其binlog数据存储在original_binlog.user表的table_name=userinfo分区中。这样做的目的是防止过多的hdfs小文件和hive分区造成的底层压力。

实践二:删除事件的支持

delete操作在mysql中非常常见,由于hive不支持delete,如果想把mysql中删除的数据在hive中删掉,需要采用“迂回”的方式进行。

对需要处理delete事件的merge流程,采用如下两个步骤:

  • 首先,提取出发生了delete事件的数据,由于binlog本身记录了事件类型,这一步很容易做到。将存量数据(表a)与被删掉的数据(表b)在主键上做左外连接(left outer join),如果能够全部join到双方的数据,说明该条数据被删掉了。因此,选择结果中表b对应的记录为null的数据,即是应当被保留的数据。
  • 然后,对上面得到的被保留下来的数据,按照前面描述的流程做常规的merge。

美团DB数据同步到数据仓库的架构与实践

展望

作为数据仓库生产的基础,美团数据平台提供的基于binlog的mysql2hive服务,基本覆盖了美团内部的各个业务线,目前已经能够满足绝大部分业务的数据同步需求,实现db数据准确、高效地入仓。在后面的发展中,我们会集中解决canalmanager的单点问题,并构建跨机房容灾的架构,从而更加稳定地支撑业务的发展。

本文主要从binlog流式采集和基于binlog的ods数据还原两方面,介绍了这一服务的架构,并介绍了我们在实践中遇到的一些典型问题和解决方案。希望能够给其他开发者一些参考价值,同时也欢迎大家和我们一起交流。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。如果你想了解更多相关内容请查看下面相关链接