欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hudi 简介

程序员文章站 2022-07-14 22:02:23
...

数据实时处理和实时的数据

实时分为处理的实时和数据的实时
即席分析是要求对数据实时的处理,马上要得到对应的结果
Flink、Spark Streaming是用来对实时数据的实时处理,数据要求实时,处理也要迅速
数据不实时,处理也不及时的场景则是我们的数仓T+1数据

而本文探讨的Apache Hudi,对应的场景是数据的实时,而非处理的实时。它旨在将Mysql中的时候以近实时的方式映射到大数据平台,比如Hive中。

业务场景和技术选型

传统的离线数仓,通常数据是T+1的,不能满足对当日数据分析的需求
而流式计算一般是基于窗口,并且窗口逻辑相对比较固定。
而笔者所在的公司有一类特殊的需求,业务分析比较熟悉现有事务数据库的数据结构,并且希望有很多即席分析,这些分析包含当日比较实时的数据。惯常他们是基于Mysql从库,直接通过Sql做相应的分析计算。但很多时候会遇到如下障碍

  • 数据量较大、分析逻辑较为复杂时,Mysql从库耗时较长
  • 一些跨库的分析无法实现

        因此,一些弥合在OLTP和OLAP之间的技术框架出现,典型有TiDB。它能同时支持OLTP和OLAP。而诸如Apache Hudi和Apache Kudu则相当于现有OLTP和OLAP技术的桥梁。他们能够以现有OLTP中的数据结构存储数据,支持CRUD,同时提供跟现有OLAP框架的整合(如Hive,Impala),以实现OLAP分析

Apache Hudi

Apache Hudi 在基于 HDFS 数据存储之上,提供了两种流原语:

  1. 插入更新
  2. 增量拉取

        一般来说,我们会将大量数据存储到HDFS,新数据增量写入,而旧数据鲜有改动,特别是在经过数据清洗,放入数据仓库的场景。而且在数据仓库如 hive中,对于update的支持非常有限,计算昂贵。另一方面,若是有仅对某段时间内新增数据进行分析的场景,则hive、presto、hbase等也未提供原生方式,而是需要根据时间戳进行过滤分析。

        在此需求下,Hudi可以提供这两种需求的实现。第一个是对record级别的更新,另一个是仅对增量数据的查询。且Hudi提供了对Hive、presto、Spark的支持,可以直接使用这些组件对Hudi管理的数据进行查询。

存储类型:cow、mor

我们看一下 Hudi 的两种存储类型:

COW

写时复制(copy on write):仅使用列式文件(parquet)存储数据。在写入/更新数据时,直接同步合并原文件,生成新版本的基文件(需要重写整个列数据文件,即使只有一个字节的新数据被提交)。此存储类型下,写入数据非常昂贵,而读取的成本没有增加,所以适合频繁读的工作负载,因为数据集的最新版本在列式文件中始终可用,以进行高效的查询。

简称COW。顾名思义,他是在数据写入的时候,复制一份原来的拷贝,在其基础上添加新数据。正在读数据的请求,读取的是是近的完整副本,这类似Mysql 的MVCC的思想。

  • 优点 读取时,只读取对应分区的一个数据文件即可,较为高效
  • 缺点 数据写入的时候,需要复制一个先前的副本再在其基础上生成新的数据文件,这个过程比较耗时。且由于耗时,读请求读取到的数据相对就会滞后

MOR

        读时合并(merge on read):使用列式(parquet)与行式(avro)文件组合,进行数据存储。在更新记录时,更新到增量文件中(avro),然后进行异步(或同步)的compaction,创建列式文件(parquet)的新版本。此存储类型适合频繁写的工作负载,因为新记录是以appending 的模式写入增量文件中。但是在读取数据集时,需要将增量文件与旧文件进行合并,生成列式文件。

简称MOR。新插入的数据存储在delta log 中。定期再将delta log合并进行parquet数据文件。读取数据时,会将delta log跟老的数据文件做merge,得到完整的数据返回。当然,MOR表也可以像COW表一样,忽略delta log,只读取最近的完整数据文件。下图演示了MOR的两种数据读写方式

  • 优点 由于写入数据先写delta log,且delta log较小,所以写入成本较低
  • 缺点 需要定期合并整理compact,否则碎片文件较多。读取性能较差,因为需要将delta log 和 老数据文件合并

Hudi表数据结构

Hudi表的数据文件,可以使用操作系统的文件系统存储,也可以使用HDFS这种分布式的文件系统存储。为了后续分析性能和数据的可靠性,一般使用HDFS进行存储。以HDFS存储来看,一个Hudi表的存储文件分为两类。

Hudi 简介

  • 包含_partition_key相关的路径是实际的数据文件,按分区存储,当然分区的路径key是可以指定的,我这里使用的是_partition_key
  • .hoodie 由于CRUD的零散性,每一次的操作都会生成一个文件,这些小文件越来越多后,会严重影响HDFS的性能,Hudi设计了一套文件合并机制。 .hoodie文件夹中存放了对应的文件合并操作相关的日志文件。

数据文件

Hudi真实的数据文件使用Parquet文件格式存储

Hudi 简介

.hoodie文件

Hudi把随着时间流逝,对表的一系列CRUD操作叫做Timeline。Timeline中某一次的操作,叫做Instant。Instant包含以下信息

  • Instant Action 记录本次操作是一次数据提交(COMMITS),还是文件合并(COMPACTION),或者是文件清理(CLEANS)
  • Instant Time 本次操作发生的时间
  • state 操作的状态,发起(REQUESTED),进行中(INFLIGHT),还是已完成(COMPLETED)

.hoodie文件夹中存放对应操作的状态记录

Hudi 简介

使用Aapche Hudi整体思路 

Hudi 提供了Hudi 表的概念,这些表支持CRUD操作。我们可以基于这个特点,将Mysql Binlog的数据重放至Hudi表,然后基于Hive对Hudi表进行查询分析。数据流向架构如下

Hudi 简介

binlog数据写入Hudi表

  • binlog-consumer分支使用Spark streaming消费kafka中的Binlog数据,并写入Hudi表。Kafka中的binlog是通过阿里的Canal工具同步拉取的。程序入口是CanalKafkaImport2Hudi,它提供了一系列参数,配置程序的执行行为
/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \
    --name hudi__goods \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 512m \
    --executor-memory 512m \
    --executor-cores 1 \
    --num-executors 1 \
    --queue hudi \
    --conf spark.executor.memoryOverhead=2048 \
    --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \
    --conf spark.core.connection.ack.wait.timeout=300 \
    --conf spark.locality.wait=100 \
    --conf spark.streaming.backpressure.enabled=true \
    --conf spark.streaming.receiver.maxRate=500 \
    --conf spark.streaming.kafka.maxRatePerPartition=200 \
    --conf spark.ui.retainedJobs=10 \
    --conf spark.ui.retainedStages=10 \
    --conf spark.ui.retainedTasks=10 \
    --conf spark.worker.ui.retainedExecutors=10 \
    --conf spark.worker.ui.retainedDrivers=10 \
    --conf spark.sql.ui.retainedExecutions=10 \
    --conf spark.yarn.submit.waitAppCompletion=false \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures=20 \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    /data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar  --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200

历史数据同步以及表元数据同步至hive

history_import_and_meta_sync 分支提供了将历史数据同步至hudi表,以及将hudi表数据结构同步至hive meta的操作

同步历史数据至hudi表

这里采用的思路是

  • 将mysql全量数据通过注入sqoop等工具,导入到hive表。
  • 然后采用分支代码中的工具HiveImport2HudiConfig,将数据导入Hudi表

HiveImport2HudiConfig提供了如下一些参数,用于配置程序执行行为

一个程序执行demo

nohup java -jar hudi-learn-1.0-SNAPSHOT.jar 
--sync-hive-db-name hudi_temp 
--sync-hive-table-name crm__wx_user_info 
--base-save-path hdfs://192.168.2.2:8020/hudi_table/ 
--mapping-mysql-db-name crm 
--mapping-mysql-table-name "order" 
--primary-key "id" 
--partition-key created_date 
--hive-site-path /etc/lib/hive/conf/hive-site.xml 
--tmp-data-path /data/tmp > order.log &

同步hudi表结构至hive meta

需要将hudi的数据结构和分区,以hive外表的形式同步至Hive meta,才能是Hive感知到hudi数据,并通过sql进行查询分析。Hudi本身在消费Binlog进行存储时,可以顺带将相关表元数据信息同步至hive。但考虑到每条写入Apache Hudi表的数据,都要读写Hive Meta ,对Hive的性能可能影响很大。所以我单独开发了HiveMetaSyncConfig工具,用于同步hudi表元数据至Hive。考虑到目前程序只支持按天分区,所以同步工具可以一天执行一次即可。参数配置如下

参数名 含义 是否必填 默认值
 -hive-db-name 指定hudi表同步至哪个hive数据库
 -hive-table-name 指定hudi表同步至哪个hive表
 -hive-jdbc-url 指定hive meta的jdbc链接地址,例如jdbc:hive2://192.168.16.181:10000
 -hive-user-name 指定hive meta的链接用户名 默认hive
 -hive-pwd 指定hive meta的链接密码 默认hive
 -hudi-table-path 指定hudi表所在hdfs的文件路径
 -hive-site-path 指定hive的hive-site.xml路径

一个程序执行demo

java -jar hudi-learn-1.0-SNAPSHOT.jar 
--hive-db-name streaming 
--hive-table-name crm__order 
--hive-user-name hive 
--hive-pwd hive 
--hive-jdbc-url jdbc:hive2://192.168.16.181:10000 
--hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order 
--hive-site-path /lib/hive/conf/hive-site.xml

一些踩坑、hive相关配置

有些hive集群的hive.input.format配置,默认是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,这会导致挂载Hudi数据的Hive外表读取到所有Hudi的Parquet数据,从而导致最终的读取结果重复。需要将hive的format改为org.apache.hadoop.hive.ql.io.HiveInputFormat,为了避免在整个集群层面上更改对其余离线Hive Sql造成不必要的影响,建议只对当前hive session设置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

 

二、Hudi与Kudu、Hive、 HBase对比

Apache Kudu,需要单独部署集群。而Apache Hudi则不需要,它可以利用现有的大数据集群比如HDFS做数据文件存储,然后通过Hive做数据分析,相对来说更适合资源受限的环境

Kudu

Apache Kudu是一个与Hudi具有相似目标的存储系统,该系统通过对upserts支持来对PB级数据进行实时分析。 一个关键的区别是Kudu还试图充当OLTP工作负载的数据存储,而Hudi并不希望这样做。 因此,Kudu不支持增量拉取(截至2017年初),而Hudi支持以便进行增量处理。

Kudu与分布式文件系统抽象和HDFS完全不同,它自己的一组存储服务器通过RAFT相互通信。 与之不同的是,Hudi旨在与底层Hadoop兼容的文件系统(HDFS,S3或Ceph)一起使用,并且没有自己的存储服务器群,而是依靠Apache Spark来完成繁重的工作。 因此,Hudi可以像其他Spark作业一样轻松扩展,而Kudu则需要硬件和运营支持,特别是HBase或Vertica等数据存储系统。 到目前为止,我们还没有做任何直接的基准测试来比较Kudu和Hudi(鉴于RTTable正在进行中)。 但是,如果我们要使用CERN, 我们预期Hudi在摄取parquet上有更卓越的性能。

Hudi v.s. Kudu

两者是目的极为相似的存储系统,即通过对upserts的一流支持来提供对PB级数据的实时分析。其中一个主要的区别是,Kudu还尝试充当OLTP场景下的数据存储区,但这是Hudi所不希望的。所以Kudu不支持增量拉取(Incremental Pulling)(截至2017年初),Hoodie这样做的目的是赋能数据增量处理的场景用例。

Kudu剥离了HDFS(Hadoop Distribute File System)及其分布式文件系统抽象接口,通过RAFT一致性算法管理自己的一组存储服务器。然而Hudi并没有使用这么”不友好“的设计,它自身不带底层存储集群,而是依赖Apache Spark做到与HDFS及一众Hadoop兼容的文件系统,如S3、Ceph等等。得益于此,Hudi可以想其他通用的Spark作业一样易扩展。相对而言,Kudu则需要对应的底层硬件和运维支持,这对于HBase或者Vertica此类的数据存储来说是很典型的。

截止目前,尚没有一份官方的基准测试可以全面地评估两者的性能。

Hive事务

Hive事务/ACID是另一项类似的工作,它试图实现在ORC文件格式之上的存储读取时合并。 可以理解,此功能与Hive以及LLAP之类的其他工作紧密相关。 Hive事务不提供Hudi提供的读取优化存储选项或增量拉取。 在实现选择方面,Hudi充分利用了类似Spark的处理框架的功能,而Hive事务特性则在用户或Hive Metastore启动的Hive任务/查询的下实现。 根据我们的生产经验,与其他方法相比,将Hudi作为库嵌入到现有的Spark管道中要容易得多,并且操作不会太繁琐。 Hudi还设计用于与Presto/Spark等非Hive引擎合作,并计划引入除parquet以外的文件格式。

Hudi v.s. Hive Transactions / ACID

Hive Transactions / ACID是另一种类似的尝试,它试图基于ORC文件格式实现读取时合并(merge-on-read)的存储功能。 可想而知,这个功能一定与Hive重度的集成,并且要依赖LLAP等其他必要的特性。 与Hudi相比,Hive Transactions不不支持读时优化(Read-Optimized)存储和增量拉取(Incremental Pulling)。

在实现上,Hudi可获得Spark等处理框架的全部功能加持,而Hive Transactions却只能受限于Hive任务/查询来实现。根据Uber工程师的实际生产经验,与其他方法相比,将Hudi作为一个三方依赖库嵌入现有Spark管道要更加简单有效。除了Hive之外,Hudi也被设计用于像Presto / Spark这样的计算引擎。将来Hudi也计划支持出Parquet以外的其他文件格式。

 HBase

尽管HBase最终是OLTP工作负载的键值存储层,但由于与Hadoop的相似性,用户通常倾向于将HBase与分析相关联。 鉴于HBase经过严格的写优化,它支持开箱即用的亚秒级更新,Hive-on-HBase允许用户查询该数据。 但是,就分析工作负载的实际性能而言,Parquet/ORC之类的混合列式存储格式可以轻松击败HBase,因为这些工作负载主要是读取繁重的工作。 Hudi弥补了更快的数据与分析存储格式之间的差距。从运营的角度来看,与管理分析使用的HBase region服务器集群相比,为用户提供可更快给出数据的库更具可扩展性。 最终,HBase不像Hudi这样重点支持提交时间增量拉取之类的增量处理原语。

Hudi v.s Hbase

虽然HBase是面向OLTP场景的键值存储(key-value store),典型的应用场景就是不断插入新的记录且不怎么修改。但由于本身运行于HDFS之上,用户往往倾向于在HBase做一些分析相关的业务。鉴于HBase经过大量写入优化,它支持开箱即用的亚秒级upsert,而Hive-on-HBase则允许用户查询该数据。但就分析类业务场景的实际性能而言,由于这类场景负载主要在读取上,像Parquet/ORC这样的混合列式存储格式轻松击败HBase。 Hudi打破了数据快速入库和基于该数据进行分析业务之间的壁障。从可操作性上来说,相比于Hbase需要管理一个含有大量Region Server的集群来满足分析性业务场景,而Hudi主需要一个三方依赖库就可以实现,可维护性和可扩展性更强。最后,和Hudi相比,HBase不支持增量处理原语,如commit timesincremental pull

流式处理

一个普遍的问题:”Hudi与流处理系统有何关系?”,我们将在这里尝试回答。简而言之,Hudi可以与当今的批处理(写时复制存储)和流处理(读时合并存储)作业集成,以将计算结果存储在Hadoop中。 对于Spark应用程序,这可以通过将Hudi库与Spark/Spark流式DAG直接集成来实现。在非Spark处理系统(例如Flink、Hive)情况下,可以在相应的系统中进行处理,然后通过Kafka主题/DFS中间文件将其发送到Hudi表中。从概念上讲,数据处理 管道仅由三个部分组成:输入处理输出,用户最终针对输出运行查询以便使用管道的结果。Hudi可以充当将数据存储在DFS上的输入或输出。Hudi在给定流处理管道上的适用性最终归结为你的查询在Presto/SparkSQL/Hive的适用性。

Hudi v.s. Stream Processing

Hudi的开发者常常面对的一个问题就是,Hudi能和流式处理系统扯上什么关系?一言以蔽之的话,Hudi做的事情就是将批处理(copy-on-write storage)和流计算(merge-on-read storage)作业整合,并将计算结果存储在Hadoop中。对于Spark应用程序,依靠其同意的DAG模型可以将融入了Hudi库与Spark/Spark Steaming作业天然整合。对于非Spark处理系统(例如:Flink,Hive),处理过程可以在各自的系统中完成,然后以Kafka Topics 或者HDFS中间文件的形式发送到Hudi表中。从相对抽象的维度上来说,数据处理管道只包含三个组件:source, processingsink,用户最终面向sink运行查询以使用管道的结果。Hudi可以作为sourcesink,前者读取存储在HDFS上的Hudi表,后者将数据写人存储于HDFS的Hudi表。流式处理保存的Hudi表,最终交给Presto/Spark SQL/Hive做查询。