hudi-hive-sync

There are two ways to sync a Hudi table to Hive:

  1. Sync at write time, from the Hudi write job itself
  2. Run the run_sync_tool.sh script

1. Syncing in code

// Imports assume a Hudi 0.6.x-era build (matching the pre-0.9 *_OPT_KEY option names below)
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.hive.{MultiPartKeysValueExtractor, NonPartitionedExtractor}
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession
  .builder()
  .config(sparkConf)
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

val insertData = spark.read.parquet(inputDir)

// Common write options. DataFrameWriter.option mutates the writer in place,
// so the branches below can keep adding options to `data` without reassigning it.
val data = insertData.write.format("org.apache.hudi")
  // table type: COPY_ON_WRITE or MERGE_ON_READ
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, allHudiConfig("hoodie.datasource.write.storage.type"))
  // record key column
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, allHudiConfig("hoodie.datasource.write.recordkey.field"))
  // column holding the record's update time, used to pre-combine duplicate keys
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, allHudiConfig("hoodie.datasource.write.precombine.field"))
  // merge-logic (payload) class
  .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, allHudiConfig("hoodie.datasource.write.payload.class"))
  // enable Hive sync
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  // Hive database
  .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, databaseName)
  // Hive table
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  // HiveServer2 JDBC URL; my cluster uses Kerberos, so the URL carries extra settings
  .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://master:10000/xh;principal=hive/[email protected]")

// 1. Is the table partitioned?
if ("true".equals(flag)) {
  // partitioned table
  data
    // Hudi partition path field
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "year")
    // allow a record's partition to change when its data changes
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
    // use a global index so updated keys are found across partitions
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
    // key generator for partitioned tables
    .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
    // Hive partition column name
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
    // hive_sync.partition_extractor_class
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
} else {
  // note: partitioned and non-partitioned tables need different key generators
  data.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, classOf[NonpartitionedKeyGenerator].getName)
    // hive_sync.partition_extractor_class
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[NonPartitionedExtractor].getName)
}

// 2. Parallelism, table name, and the actual write
data
  .option("hoodie.insert.shuffle.parallelism", allHudiConfig("hoodie.insert.shuffle.parallelism"))
  .option("hoodie.upsert.shuffle.parallelism", allHudiConfig("hoodie.upsert.shuffle.parallelism"))
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .mode(if (exists) SaveMode.Append else SaveMode.Overwrite)
  .save(writePath)

The code above writes the Hudi table and syncs it to Hive in the same job.
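
To confirm the sync, read the table back through the metastore. A minimal sketch, assuming the SparkSession was created with .enableHiveSupport() and that databaseName and tableName hold the same values passed to the writer above; note that for MERGE_ON_READ tables the synced names may carry _ro/_rt suffixes depending on the Hudi version:

// sanity check: the synced table should be visible in the metastore
spark.sql(s"SHOW TABLES IN $databaseName").show(false)
// and queryable like any other Hive table
spark.sql(s"SELECT * FROM $databaseName.$tableName LIMIT 10").show(false)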

2. Syncing with the script

Download the Hudi source and build it:

git clone https://github.com/apache/hudi.git && cd hudi
mvn clean package -DskipTests -DskipITs

After the build finishes, locate the script:

cd hudi/hudi-hive-sync/
# the run_sync_tool.sh script lives here

My local Hadoop environment is CDH 6.2.0, and the stock run_sync_tool.sh does not work there as-is, so I changed a few things.

Add or modify the following:

  1. Add the HADOOP_HOME and HIVE_HOME environment variables:
export HADOOP_HOME=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hadoop
export HIVE_HOME=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hive
  2. Modify HADOOP_HIVE_JARS:
# around line 55 of the script; the stock assignment:
#HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*
HADOOP_HIVE_JARS=${HIVE_JARS}:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hadoop/client/*

With these changes alone, running the shell script fails with:

Exception in thread "main" java.lang.NoClassDefFoundError: com/facebook/fb303/FacebookService$Iface
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.hadoop.hive.metastore.MetaStoreUtils.getClass(MetaStoreUtils.java:1739)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:128)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:101)
	at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3815)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3867)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3847)
	at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:4101)
	at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:254)
	at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:237)
	at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:394)
	at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:338)
	at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:318)
	at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:294)
	at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:105)
	at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:65)
	at org.apache.hudi.hive.HiveSyncTool.main(HiveSyncTool.java:207)
Caused by: java.lang.ClassNotFoundException: com.facebook.fb303.FacebookService$Iface
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 30 more

The cause is a missing libfb303-0.9.3.jar.

Search the machine for it: find / -name libfb303-0.9.3.jar

If it turns up, copy it into a directory:

cp xxx/xxx/xx/libfb303-0.9.3.jar /opt/lib

# then update run_sync_tool.sh again to include that directory
HADOOP_HIVE_JARS=${HIVE_JARS}:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hadoop/client/*:/opt/lib/*
  3. Run the script

    1. Partitioned table sync

      # base-path: Hudi table path; database/table: target Hive database and table
      # partition-value-extractor: hive_sync.partition_extractor_class
      # partitioned-by: Hive partition column; user/pass: Hive credentials
      sh /opt/hudi/hudi-hive-sync/run_sync_tool.sh \
        --base-path /test/sys001/xh/partition \
        --database xh \
        --table partition \
        --jdbc-url 'jdbc:hive2://master:10000/xh;principal=hive/[email protected]' \
        --partition-value-extractor org.apache.hudi.hive.MultiPartKeysValueExtractor \
        --user hive \
        --pass hive \
        --partitioned-by dt
      
    2. Non-partitioned table sync

      # same as above, but with the NonPartitionedExtractor and no --partitioned-by
      sh /opt/hudi/hudi-hive-sync/run_sync_tool.sh \
        --base-path /test/sys001/xh/partition \
        --database xh \
        --table partition \
        --jdbc-url 'jdbc:hive2://master:10000/xh;principal=hive/[email protected]' \
        --partition-value-extractor org.apache.hudi.hive.NonPartitionedExtractor \
        --user hive \
        --pass hive
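
Either way, once the tool exits cleanly, the result can be verified from any Hive-enabled Spark session. A quick sketch, reusing the xh database and the partition table from the example commands above:

// the synced table should now be listed in the metastore
spark.sql("SHOW TABLES IN xh").show(false)
// partitioned case only: one row per registered dt partition
// (backquoted to be safe, since PARTITION is a SQL keyword)
spark.sql("SHOW PARTITIONS xh.`partition`").show(false)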
      