Hudi:初识Hudi
是什么?
Hudi是什么?可以说Hudi是一个数据湖或是数据库,但它又不是数据湖或是数据库。笔者理解为Hudi是除开计算引擎的Hive。
众所周知,Hive是一个计算框架,但是现在我们更多的是使用Spark基于Hive对HDFS中文件提供的Schema信息和元数据进行计算,而Hive作为计算引擎的功能逐渐被忽略,更多的是将Hive视作一个“数据库”(尽管它并不是),而Hudi则是完善了Hive的这部分功能,甚至可以提供近实时的数据抽取与查询。
使用Hudi对HDFS或是其他存储系统中的文件进行管理,使用Hudi创建相应的表,一样可以使用Hive或是Spark对这些表进行计算。但是却解决了Hadoop一直臭名昭著的小文件问题,查询缓慢问题等。
Hudi具有以下特性:
-
快速upsert,可插入索引
-
以原子方式操作数据并具有回滚功能
-
写入器之间的快照隔离
-
savepoint用户数据恢复的保存点
-
管理文件大小,使用统计数据布局
-
数据行的异步压缩和柱状数据
-
时间轴数据跟踪血统
为什么?
为什么需要Hudi?
1.Hudi的高效率
Apache Hudi最初是由Uber开发的,旨在以高效率实现低延迟的数据库访问。自2016年8月以来已投入生产,为庞大的100PB数据湖提供了支持,其中包括对业务至关重要的表,如核心旅行,搭便车,合作伙伴。它还为多个递增的Hive ETL管道提供支持,目前已集成到Uber的数据分散系统中。使用Hudi做实时数仓是一个很好的选择,实际上阿里和顺丰也这么做了。Hudi的高效体现在数据抽取与分析上。
近实时的数据抽取:
将数据从外部系统(如数据库、日志文件、消息队列等)导入到Hadoop中,是一个众所周知的问题。大多情况下,一个数仓中不得不使用多个工具(如sqoop、canal、flume等)对不同数据源进行数据抽取。
对于传统数据库,Hudi通过Upserts(增量更新)以提高加载速度。与Hudi相比,Canal读取Mysql Binlog日志以及Sqoop增量导入这些方法就显得消耗性能却效率低下。
对于Cassandra / Voldemort / HBase这类nosql性数据库,动辄数十亿行数据量,一批次全量导入是完全不可行的。Hudi对此也提高了更高效的方法。
即使对于Kafka这样不可变的数据源,Hudi仍可帮助HDFS强制使用最小文件大小,从而通过解决Hadoop生态圈中一个诟病已久的小文件问题,以改善NameNode的运行状况。
无论对接何种数据源,Hudi都提供了数据提交的原子性,保证消费者不会受到抽取数据失败导致的影响。
近实时的数据分析:
通常情况下,为了提高访问效率会使用Kylin、Impala、Presto等即席查询工具,将Hive上的数据在Hbase上存储一份,然后通过对Hbase查询以实现高效查询。但是如果使用Hudi,则直接就可以很快的进行查询,而不必多一块开销去运行与存储Hbase。
2.Hudi可以避免小文件问题
通常情况下,Hive或Spark计算时会生成大量小文件,然后再通过一些手段将它们合并在一起,这样只能解决由小文件引起的系统可伸缩性问题,但是无法解决未合并前,对小文件进行查询时效率低下的问题。而在Hudi中,一个关键的设计是避免创建小文件,并且总是生成大小合适的文件。Hudi在 ingest/writing 上花费更多的时间,以保持查询时始终高效。与常规解决方法不同,Hudi直接在生成端避免小文件问题,使小文件无法暴露给计算引擎,也就解决了小文件的低效查询问题。
怎么做?
Hudi安装
hudi安装需要通过Maven对从Git下载下来的源码进行编译。
安装maven:
1.把apache-maven-3.6.1-bin.tar.gz上传到linux的/opt/software目录下
2.解压apache-maven-3.6.1-bin.tar.gz到/opt/module/目录下面
[[email protected] software]$ tar -zxvf apache-maven-3.6.1-bin.tar.gz -C /opt/module/
3.修改apache-maven-3.6.1的名称为maven
mv apache-maven-3.6.1/ maven
4.添加环境变量到/etc/profile中
[[email protected] module]$ sudo vim /etc/profile
#MAVEN_HOME
export MAVEN_HOME=/opt/module/maven
export PATH=$PATH:$MAVEN_HOME/bin
5.测试安装结果
[[email protected] module]$ source /etc/profile
[[email protected] module]$ mvn -v
6.修改setting.xml,指定为阿里云
[[email protected] maven]$ cd conf
[[email protected] maven]$ vim settings.xml
<!-- 添加阿里云镜像-->
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>central</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
安装Git
sudo yum install git
构建Hudi
[[email protected] software]$ cd /opt/module/
[[email protected] module]$ git clone https://github.com/apache/hudi.git && cd hudi
[[email protected] hudi]$ vim pom.xml
<repository>
<id>nexus-aliyun</id>
<name>nexus-aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
[[email protected] hudi]$ mvn clean package -DskipTests -DskipITs
使用Spark-shell对接Hudi
启动spark-shell
spark-shell启动,需要指定spark-avro模块,因为默认环境里没有,spark-avro模块版本好需要和spark版本对应,这里都是2.4.5。
[[email protected] hudi]# spark-shell \
--packages org.apache.spark:spark-avro_2.11:2.4.5 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--jars /opt/module/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.6.0-SNAPSHOT.jar
设置表名
scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._
scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._
scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._
scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._
scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow
scala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cow
scala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = [email protected]
插入数据
新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表
scala> val inserts = convertToStringList(dataGen.generateInserts(10))
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
scala> df.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option(TABLE_NAME, tableName).
| mode(Overwrite).
| save(basePath)
Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow 路径下是否有数据生成。
[[email protected] ~]# cd /tmp/hudi_trips_cow/
[[email protected] hudi_trips_cow]# ls
americas asia
查询数据
scala> val tripsSnapshotDF = spark.
| read.
| format("hudi").
| load(basePath + "/*/*/*/*")
scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
scala> spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
+------------------+-------------------+-------------------+---+
| fare| begin_lon| begin_lat| ts|
+------------------+-------------------+-------------------+---+
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0|
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0|
| 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
| 43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0|
|34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0|
| 41.06290929046368| 0.8192868687714224| 0.651058505660742|0.0|
+------------------+-------------------+-------------------+---+
scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time| _hoodie_record_key|_hoodie_partition_path| rider| driver| fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
| 20200701105144|6007a624-d942-4e0...| americas/united_s...|rider-213|driver-213| 64.27696295884016|
| 20200701105144|db7c6361-3f05-48d...| americas/united_s...|rider-213|driver-213| 33.92216483948643|
| 20200701105144|dfd0e7d9-f10c-468...| americas/united_s...|rider-213|driver-213|19.179139106643607|
| 20200701105144|e36365c8-5b3a-415...| americas/united_s...|rider-213|driver-213| 27.79478688582596|
| 20200701105144|fb92c00e-dea2-48e...| americas/united_s...|rider-213|driver-213| 93.56018115236618|
| 20200701105144|98be3080-a058-47d...| americas/brazil/s...|rider-213|driver-213| 43.4923811219014|
| 20200701105144|3dd6ef72-4196-469...| americas/brazil/s...|rider-213|driver-213| 66.62084366450246|
| 20200701105144|20f9463f-1c14-4e6...| americas/brazil/s...|rider-213|driver-213|34.158284716382845|
| 20200701105144|1585ad3a-11c9-43c...| asia/india/chennai|rider-213|driver-213|17.851135255091155|
| 20200701105144|d40daa90-cf1a-4d1...| asia/india/chennai|rider-213|driver-213| 41.06290929046368|
+-------------------+--------------------+----------------------+---------+----------+------------------+
由于测试数据分区是 区域/国家/城市,所以load(basePath “/*/*/*/*”)
增量查询
Hudi还提供了获取自给定提交时间戳以来以更改记录流的功能。这可以通过使用Hudi的增量查询并提供开始流进行更改的开始时间来实现。
scala> spark.
| read.
| format("hudi").
| load(basePath + "/*/*/*/*").
| createOrReplaceTempView("hudi_trips_snapshot")
scala> val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
scala> val beginTime = commits(commits.length - 2)
beginTime: String = 20200701105144
scala> val tripsIncrementalDF = spark.read.format("hudi").
| option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
| option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
| load(basePath)
scala> tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
+-------------------+------------------+--------------------+-------------------+---+
|_hoodie_commit_time| fare| begin_lon| begin_lat| ts|
+-------------------+------------------+--------------------+-------------------+---+
| 20200701110546|49.527694252432056| 0.5142184937933181| 0.7340133901254792|0.0|
| 20200701110546| 90.9053809533154| 0.19949323322922063|0.18294079059016366|0.0|
| 20200701110546| 98.3428192817987| 0.3349917833248327| 0.4777395067707303|0.0|
| 20200701110546| 90.25710109008239| 0.4006983139989222|0.08528650347654165|0.0|
| 20200701110546| 63.72504913279929| 0.888493603696927| 0.6570857443423376|0.0|
| 20200701110546| 29.47661370147079|0.010872312870502165| 0.1593867607188556|0.0|
+-------------------+------------------+--------------------+-------------------+---+
这将提供在beginTime提交后的数据,并且fare>20的数据
时间点查询
根据特定时间查询,可以将endTime指向特定时间,beginTime指向000(表示最早提交时间)
scala> val beginTime = "000"
beginTime: String = 000
scala> val endTime = commits(commits.length - 2)
endTime: String = 20200701105144
scala> val tripsPointInTimeDF = spark.read.format("hudi").
| option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
| option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
| option(END_INSTANTTIME_OPT_KEY, endTime).
| load(basePath)
scala> tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
+-------------------+------------------+-------------------+-------------------+---+
|_hoodie_commit_time| fare| begin_lon| begin_lat| ts|
+-------------------+------------------+-------------------+-------------------+---+
| 20200701105144| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0|
| 20200701105144| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0|
| 20200701105144| 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0|
| 20200701105144| 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
| 20200701105144| 43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0|
| 20200701105144| 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0|
| 20200701105144|34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0|
| 20200701105144| 41.06290929046368| 0.8192868687714224| 0.651058505660742|0.0|
+-------------------+------------------+-------------------+-------------------+---+
删除数据
只有append模式,才支持删除功能
scala> spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
res12: Long = 10
scala> val ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
scala> val deletes = dataGen.generateDeletes(ds.collectAsList())
scala> val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
scala> df.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(OPERATION_OPT_KEY,"delete").
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option(TABLE_NAME, tableName).
| mode(Append).
| save(basePath)
scala> val roAfterDeleteViewDF = spark.
| read.
| format("hudi").
| load(basePath + "/*/*/*/*")
scala> roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
scala> spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
res15: Long = 8