hudi-hive-sync
程序员文章站 · 2022-07-14 21:48:10
There are two ways of syncing to Hive:
- sync during the Hudi write itself
- run the run_sync_tool.sh script afterwards
1. Syncing in code
val spark = SparkSession
.builder()
.config(sparkConf)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
val insertData = spark.read.parquet(inputDir)

// options common to partitioned and non-partitioned tables
var data = insertData.write.format("org.apache.hudi")
  // COPY_ON_WRITE or MERGE_ON_READ
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, allHudiConfig("hoodie.datasource.write.storage.type"))
  // record key (primary key) column
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, allHudiConfig("hoodie.datasource.write.recordkey.field"))
  // column used to pick the latest record on updates (pre-combine field)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, allHudiConfig("hoodie.datasource.write.precombine.field"))
  // payload class implementing the merge logic
  .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, allHudiConfig("hoodie.datasource.write.payload.class"))
  // enable Hive sync
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  // Hive database
  .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, databaseName)
  // Hive table
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  // HiveServer2 JDBC URL; this cluster uses Kerberos, so the URL carries extra settings
  .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://master:10000/xh;principal=hive/[email protected]")
// 1. branch on whether the table is partitioned
if ("true".equals(flag)) {
  // partitioned table
  data
    // Hudi partition path field
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "year")
    // allow a record's partition to change when the record is updated
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
    // use a global index so updates are found across partitions
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
    // key generator for partitioned tables
    .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
    // Hive partition column name
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
    // hive_sync.partition_extractor_class
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
} else {
  // note: partitioned and non-partitioned tables need different key generators
  data.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, classOf[NonpartitionedKeyGenerator].getName)
    // hive_sync.partition_extractor_class
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[NonPartitionedExtractor].getName)
}
// parallelism and table name, then write
data
  .option("hoodie.insert.shuffle.parallelism", allHudiConfig("hoodie.insert.shuffle.parallelism"))
  .option("hoodie.upsert.shuffle.parallelism", allHudiConfig("hoodie.upsert.shuffle.parallelism"))
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .mode(if (exists) SaveMode.Append else SaveMode.Overwrite)
  // target path of the Hudi table
  .save(writePath)
The code above writes the data and syncs the resulting table to Hive.
2. Syncing with the script
Download the Hudi source and build it:
git clone https://github.com/apache/hudi.git && cd hudi
mvn clean package -DskipTests -DskipITs
After the build, find the script:
cd hudi/hudi-hive-sync/
# the run_sync_tool.sh script is in this directory
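Before editing anything, it can help to confirm the script is actually where you expect. The sketch below uses a throwaway directory as a stand-in for a real hudi checkout (HUDI_SRC is an assumption; point it at your own clone), so it runs anywhere:

```shell
#!/bin/sh
# Minimal sketch: check that run_sync_tool.sh exists before trying to run it.
# A throwaway tree stands in for a real hudi checkout.
HUDI_SRC=$(mktemp -d)
mkdir -p "$HUDI_SRC/hudi-hive-sync"
touch "$HUDI_SRC/hudi-hive-sync/run_sync_tool.sh"   # stand-in for the real script

SCRIPT="$HUDI_SRC/hudi-hive-sync/run_sync_tool.sh"
if [ -f "$SCRIPT" ]; then
  echo "found: $SCRIPT"
  FOUND=yes
else
  echo "not found: $SCRIPT (did the clone finish?)" >&2
  FOUND=no
fi
```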
My local Hadoop environment is CDH 6.2.0, so run_sync_tool.sh does not work for me as shipped; I changed a few things.
Add or modify the following:
- Add the environment variables HADOOP_HOME and HIVE_HOME:
export HADOOP_HOME=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hadoop
export HIVE_HOME=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hive
- Modify HADOOP_HIVE_JARS:
# around line 55 of the script
#HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*
HADOOP_HIVE_JARS=${HIVE_JARS}:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hadoop/client/*
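A classpath glob that points at an empty or misspelled directory fails silently, so it is worth checking each jar directory before launching the tool. The sketch below is a generic check, not part of run_sync_tool.sh; throwaway directories stand in for the real CDH client directory so it is runnable anywhere:

```shell
#!/bin/sh
# Sketch: verify that each directory referenced by HADOOP_HIVE_JARS holds jars.
# Throwaway directories with stand-in jars replace the real cluster paths.
CLIENT_DIR=$(mktemp -d)                  # stand-in for .../lib/hadoop/client
EXTRA_DIR=$(mktemp -d)                   # stand-in for another classpath dir
touch "$CLIENT_DIR/hadoop-client.jar" "$EXTRA_DIR/libfb303-0.9.3.jar"

MISSING=0
for dir in "$CLIENT_DIR" "$EXTRA_DIR"; do
  # ls exits non-zero when the glob matches nothing
  count=$(ls "$dir"/*.jar 2>/dev/null | wc -l)
  if [ "$count" -gt 0 ]; then
    echo "ok: $count jar(s) under $dir"
  else
    echo "no jars under $dir" >&2
    MISSING=1
  fi
done
```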
With only these changes, launching the shell script fails with:
Exception in thread "main" java.lang.NoClassDefFoundError: com/facebook/fb303/FacebookService$Iface
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.getClass(MetaStoreUtils.java:1739)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:128)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:101)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3815)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3867)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3847)
at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:4101)
at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:254)
at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:237)
at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:394)
at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:338)
at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:318)
at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:294)
at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:105)
at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:65)
at org.apache.hudi.hive.HiveSyncTool.main(HiveSyncTool.java:207)
Caused by: java.lang.ClassNotFoundException: com.facebook.fb303.FacebookService$Iface
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 30 more
The cause is that libfb303-0.9.3.jar is missing from the classpath.
Search for it locally: find / -name libfb303-0.9.3.jar
If it exists, copy it into a directory of its own:
cp xxx/xxx/xx/libfb303-0.9.3.jar /opt/lib
# then adjust run_sync_tool.sh again
HADOOP_HIVE_JARS=${HIVE_JARS}:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hadoop/client/*:/opt/lib/*
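The find-and-copy step above can be sketched as a small script. The paths here are placeholders: a throwaway directory stands in for the real search root (/) and destination (/opt/lib) so the sketch is runnable as-is; on a real node you would search from / and copy into /opt/lib:

```shell
#!/bin/sh
# Sketch: locate the libfb303 jar and copy it into a directory that
# HADOOP_HIVE_JARS puts on the classpath.
SEARCH_ROOT=$(mktemp -d)                     # stand-in for /
DEST=$(mktemp -d)                            # stand-in for /opt/lib
touch "$SEARCH_ROOT/libfb303-0.9.3.jar"      # stand-in for the real jar

# find the first matching jar (any libfb303 version) and copy it
JAR=$(find "$SEARCH_ROOT" -name 'libfb303-*.jar' 2>/dev/null | head -n 1)
if [ -n "$JAR" ]; then
  cp "$JAR" "$DEST/"
  echo "copied $(basename "$JAR") to $DEST"
else
  echo "libfb303 jar not found under $SEARCH_ROOT" >&2
fi
```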
Running the script:
- Partitioned table sync (flags explained in the comments below):
sh /opt/hudi/hudi-hive-sync/run_sync_tool.sh \
  --base-path /test/sys001/xh/partition \
  --database xh \
  --table partition \
  --jdbc-url 'jdbc:hive2://master:10000/xh;principal=hive/[email protected]' \
  --partition-value-extractor org.apache.hudi.hive.MultiPartKeysValueExtractor \
  --user hive \
  --pass hive \
  --partitioned-by dt
# --base-path: path of the Hudi table
# --database / --table: target Hive database and table
# --jdbc-url: HiveServer2 JDBC URL (Kerberos principal included here)
# --partition-value-extractor: hive_sync.partition_extractor_class
# --user / --pass: Hive user and password
# --partitioned-by: Hive partition column
- Non-partitioned table sync:
sh /opt/hudi/hudi-hive-sync/run_sync_tool.sh \
  --base-path /test/sys001/xh/partition \
  --database xh \
  --table partition \
  --jdbc-url 'jdbc:hive2://master:10000/xh;principal=hive/[email protected]' \
  --partition-value-extractor org.apache.hudi.hive.NonPartitionedExtractor \
  --user hive \
  --pass hive