欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hudi:初识Hudi

程序员文章站 2022-07-14 20:25:49
...

是什么?

Hudi是什么?可以说Hudi是一个数据湖或是数据库,但它又不是数据湖或是数据库。笔者理解为Hudi是除开计算引擎的Hive。

众所周知,Hive是一个计算框架,但是现在我们更多的是使用Spark基于Hive对HDFS中文件提供的Schema信息和元数据进行计算,而Hive作为计算引擎的功能逐渐被忽略,更多的是将Hive视作一个“数据库”(尽管它并不是),而Hudi则是完善了Hive的这部分功能,甚至可以提供近实时的数据抽取与查询。

使用Hudi对HDFS或是其他存储系统中的文件进行管理,使用Hudi创建相应的表,一样可以使用Hive或是Spark对这些表进行计算。但是却解决了Hadoop一直臭名昭著的小文件问题,查询缓慢问题等。

Hudi具有以下特性:

  1. 快速upsert,可插入索引

  2. 以原子方式操作数据并具有回滚功能

  3. 写入器之间的快照隔离

  4. savepoint用户数据恢复的保存点

  5. 管理文件大小,使用统计数据布局

  6. 数据行的异步压缩和柱状数据

  7. 时间轴数据跟踪血统

为什么?

为什么需要Hudi?

1.Hudi的高效率

Apache Hudi最初是由Uber开发的,旨在以高效率实现低延迟的数据库访问。自2016年8月以来已投入生产,为庞大的100PB数据湖提供了支持,其中包括对业务至关重要的表,如核心旅行,搭便车,合作伙伴。它还为多个递增的Hive ETL管道提供支持,目前已集成到Uber的数据分散系统中。使用Hudi做实时数仓是一个很好的选择,实际上阿里和顺丰也这么做了。Hudi的高效体现在数据抽取与分析上。

近实时的数据抽取

将数据从外部系统(如数据库、日志文件、消息队列等)导入到Hadoop中,是一个众所周知的问题。大多情况下,一个数仓中不得不使用多个工具(如sqoop、canal、flume等)对不同数据源进行数据抽取。

对于传统数据库,Hudi通过Upserts(增量更新)以提高加载速度。与Hudi相比,Canal读取Mysql Binlog日志以及Sqoop增量导入这些方法就显得消耗性能却效率低下。

对于Cassandra / Voldemort / HBase这类nosql性数据库,动辄数十亿行数据量,一批次全量导入是完全不可行的。Hudi对此也提高了更高效的方法。

即使对于Kafka这样不可变的数据源,Hudi仍可帮助HDFS强制使用最小文件大小,从而通过解决Hadoop生态圈中一个诟病已久的小文件问题,以改善NameNode的运行状况。

无论对接何种数据源,Hudi都提供了数据提交的原子性,保证消费者不会受到抽取数据失败导致的影响。

近实时的数据分析

通常情况下,为了提高访问效率会使用Kylin、Impala、Presto等即席查询工具,将Hive上的数据在Hbase上存储一份,然后通过对Hbase查询以实现高效查询。但是如果使用Hudi,则直接就可以很快的进行查询,而不必多一块开销去运行与存储Hbase。

2.Hudi可以避免小文件问题

通常情况下,Hive或Spark计算时会生成大量小文件,然后再通过一些手段将它们合并在一起,这样只能解决由小文件引起的系统可伸缩性问题,但是无法解决未合并前,对小文件进行查询时效率低下的问题。而在Hudi中,一个关键的设计是避免创建小文件,并且总是生成大小合适的文件。Hudi在 ingest/writing 上花费更多的时间,以保持查询时始终高效。与常规解决方法不同,Hudi直接在生成端避免小文件问题,使小文件无法暴露给计算引擎,也就解决了小文件的低效查询问题。

怎么做?

Hudi安装

hudi安装需要通过Maven对从Git下载下来的源码进行编译。

安装maven:

1.把apache-maven-3.6.1-bin.tar.gz上传到linux的/opt/software目录下

2.解压apache-maven-3.6.1-bin.tar.gz到/opt/module/目录下面

[[email protected] software]$ tar -zxvf apache-maven-3.6.1-bin.tar.gz -C /opt/module/

3.修改apache-maven-3.6.1的名称为maven

mv apache-maven-3.6.1/ maven

4.添加环境变量到/etc/profile中

[[email protected] module]$ sudo vim /etc/profile
#MAVEN_HOME
export MAVEN_HOME=/opt/module/maven
export PATH=$PATH:$MAVEN_HOME/bin

5.测试安装结果

[[email protected] module]$ source /etc/profile
[[email protected] module]$ mvn -v

6.修改setting.xml,指定为阿里云

[[email protected] maven]$ cd conf
[[email protected] maven]$ vim settings.xml
<!-- 添加阿里云镜像-->
<mirror>
        <id>nexus-aliyun</id>
        <mirrorOf>central</mirrorOf>
        <name>Nexus aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>

安装Git

sudo yum install git

构建Hudi

[[email protected] software]$ cd /opt/module/
[[email protected] module]$ git clone https://github.com/apache/hudi.git && cd hudi
[[email protected] hudi]$ vim pom.xml 
       <repository>
        <id>nexus-aliyun</id>
        <name>nexus-aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
[[email protected] hudi]$ mvn clean package -DskipTests -DskipITs

使用Spark-shell对接Hudi

启动spark-shell

spark-shell启动,需要指定spark-avro模块,因为默认环境里没有,spark-avro模块版本好需要和spark版本对应,这里都是2.4.5。

[[email protected] hudi]# spark-shell \
--packages org.apache.spark:spark-avro_2.11:2.4.5 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--jars /opt/module/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.6.0-SNAPSHOT.jar 

设置表名

scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._

scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow

scala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cow

scala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = [email protected]

插入数据

新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表

scala> val inserts = convertToStringList(dataGen.generateInserts(10))
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
scala> df.write.format("hudi").
     |   options(getQuickstartWriteConfigs).
     |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     |   option(TABLE_NAME, tableName).
     |   mode(Overwrite).
     |   save(basePath)

Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow 路径下是否有数据生成。

[[email protected] ~]# cd /tmp/hudi_trips_cow/
[[email protected] hudi_trips_cow]# ls
americas  asia

查询数据

scala> val tripsSnapshotDF = spark.
     |   read.
     |   format("hudi").
     |   load(basePath + "/*/*/*/*")
scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
scala> spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+------------------+-------------------+-------------------+---+
|              fare|          begin_lon|          begin_lat| ts|
+------------------+-------------------+-------------------+---+
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0|
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0|
| 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0|
|34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0|
| 41.06290929046368| 0.8192868687714224|  0.651058505660742|0.0|
+------------------+-------------------+-------------------+---+
scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|    driver|              fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
|     20200701105144|6007a624-d942-4e0...|  americas/united_s...|rider-213|driver-213| 64.27696295884016|
|     20200701105144|db7c6361-3f05-48d...|  americas/united_s...|rider-213|driver-213| 33.92216483948643|
|     20200701105144|dfd0e7d9-f10c-468...|  americas/united_s...|rider-213|driver-213|19.179139106643607|
|     20200701105144|e36365c8-5b3a-415...|  americas/united_s...|rider-213|driver-213| 27.79478688582596|
|     20200701105144|fb92c00e-dea2-48e...|  americas/united_s...|rider-213|driver-213| 93.56018115236618|
|     20200701105144|98be3080-a058-47d...|  americas/brazil/s...|rider-213|driver-213|  43.4923811219014|
|     20200701105144|3dd6ef72-4196-469...|  americas/brazil/s...|rider-213|driver-213| 66.62084366450246|
|     20200701105144|20f9463f-1c14-4e6...|  americas/brazil/s...|rider-213|driver-213|34.158284716382845|
|     20200701105144|1585ad3a-11c9-43c...|    asia/india/chennai|rider-213|driver-213|17.851135255091155|
|     20200701105144|d40daa90-cf1a-4d1...|    asia/india/chennai|rider-213|driver-213| 41.06290929046368|
+-------------------+--------------------+----------------------+---------+----------+------------------+

由于测试数据分区是 区域/国家/城市,所以load(basePath  “/*/*/*/*”)

增量查询

Hudi还提供了获取自给定提交时间戳以来以更改记录流的功能。这可以通过使用Hudi的增量查询并提供开始流进行更改的开始时间来实现。

scala> spark.
     |   read.
     |   format("hudi").
     |   load(basePath + "/*/*/*/*").
     |   createOrReplaceTempView("hudi_trips_snapshot")
scala> val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
scala> val beginTime = commits(commits.length - 2)
beginTime: String = 20200701105144
scala> val tripsIncrementalDF = spark.read.format("hudi").
     |   option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
     |   option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
     |   load(basePath)
scala> tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+-------------------+------------------+--------------------+-------------------+---+
|_hoodie_commit_time|              fare|           begin_lon|          begin_lat| ts|
+-------------------+------------------+--------------------+-------------------+---+
|     20200701110546|49.527694252432056|  0.5142184937933181| 0.7340133901254792|0.0|
|     20200701110546|  90.9053809533154| 0.19949323322922063|0.18294079059016366|0.0|
|     20200701110546|  98.3428192817987|  0.3349917833248327| 0.4777395067707303|0.0|
|     20200701110546| 90.25710109008239|  0.4006983139989222|0.08528650347654165|0.0|
|     20200701110546| 63.72504913279929|   0.888493603696927| 0.6570857443423376|0.0|
|     20200701110546| 29.47661370147079|0.010872312870502165| 0.1593867607188556|0.0|
+-------------------+------------------+--------------------+-------------------+---+

这将提供在beginTime提交后的数据,并且fare>20的数据

时间点查询

根据特定时间查询,可以将endTime指向特定时间,beginTime指向000(表示最早提交时间)

scala> val beginTime = "000"
beginTime: String = 000

scala> val endTime = commits(commits.length - 2)
endTime: String = 20200701105144
scala> val tripsPointInTimeDF = spark.read.format("hudi").
     |   option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
     |   option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
     |   option(END_INSTANTTIME_OPT_KEY, endTime).
     |   load(basePath)
scala> tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
+-------------------+------------------+-------------------+-------------------+---+
|_hoodie_commit_time|              fare|          begin_lon|          begin_lat| ts|
+-------------------+------------------+-------------------+-------------------+---+
|     20200701105144| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0|
|     20200701105144| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0|
|     20200701105144| 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0|
|     20200701105144| 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
|     20200701105144|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0|
|     20200701105144| 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0|
|     20200701105144|34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0|
|     20200701105144| 41.06290929046368| 0.8192868687714224|  0.651058505660742|0.0|
+-------------------+------------------+-------------------+-------------------+---+

删除数据

只有append模式,才支持删除功能

scala> spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
res12: Long = 10
scala> val ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
scala> val deletes = dataGen.generateDeletes(ds.collectAsList())
scala> val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
scala> df.write.format("hudi").
     |   options(getQuickstartWriteConfigs).
     |   option(OPERATION_OPT_KEY,"delete").
     |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     |   option(TABLE_NAME, tableName).
     |   mode(Append).
     |   save(basePath)
scala> val roAfterDeleteViewDF = spark.
     |   read.
     |   format("hudi").
     |   load(basePath + "/*/*/*/*")
scala> roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
scala> spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
res15: Long = 8

 

相关标签: hudi