欢迎您访问程序员文章站，本站旨在为大家分享程序员计算机编程知识！
您现在的位置是: 首页

Spark + iceberg 注意事项

程序员文章站 2022-03-08 08:09:49
...
# Switch to Spark 2.4.5 before starting the shell.
# NOTE(review): change_spark_version is an environment-specific helper not shown here;
# presumably it repoints SPARK_HOME/PATH at spark-2.4.5.0 — confirm.
source change_spark_version spark-2.4.5.0

# NOTE(review): the Iceberg Spark runtime jar must be on the classpath for the
# "iceberg" data source used in the session (e.g. via --jars or --packages),
# unless the environment above already provides it — confirm.
spark-shell

import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.spark.SparkConf
import org.apache.spark.sql.connector.catalog.TableProvider;
// Demo: create an Iceberg table through HiveCatalog, append a few rows, and read them back.
// Meant to be typed into spark-shell, which pre-defines `spark` and imports
// `spark.implicits._` (required by `toDF` below).

// Catalog built from the SparkContext's Hadoop configuration.
val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration)
val data = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
// Derive the Iceberg schema from the DataFrame so the table matches the rows written below.
val schema = SparkSchemaUtil.convert(data.schema)

// Database and table are defined once and reused, so the catalog identifier and the
// "db.table" string handed to the iceberg source cannot drift apart.
val dbName = "testdb"
val tableName = "ice_table3"
val qualifiedName = s"$dbName.$tableName"
// First argument: database; second argument: table name.
val name = TableIdentifier.of(dbName, tableName)
// createTable throws if the table already exists, so reuse it on re-runs.
// Kept on one line: the REPL would otherwise evaluate the statement before reaching `else`.
val table = if (catalog.tableExists(name)) catalog.loadTable(name) else catalog.createTable(name, schema)

// write the dataset to the table
data.write.format("iceberg").mode("append").save(qualifiedName)

// read the table; DataFrames are lazy, so show() is the action that actually runs the scan
spark.read.format("iceberg").load(qualifiedName).show()

spark.read.format("iceberg").load(qualifiedName).createOrReplaceTempView("ice_test")
// spark.sql is lazy as well; show() executes the query and prints the row count.
spark.sql("""SELECT count(1) FROM ice_test""").show()
相关标签: iceberg-apache