Spark + Iceberg 注意事项
程序员文章站
2022-03-08 08:09:49
...
//切换 Spark 版本(以下两条命令在终端中执行,随后的 Scala 代码在 spark-shell 中执行)
source change_spark_version spark-2.4.5.0
spark-shell
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.spark.SparkConf
import org.apache.spark.sql.connector.catalog.TableProvider;
// spark-shell walkthrough: create an Iceberg table through the Hive catalog,
// append a small DataFrame to it, then read it back and count the rows.
// Relies on the `spark` session and the implicits that spark-shell provides.

// Hive-backed Iceberg catalog built from the shell's Hadoop configuration.
val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration)

// Sample rows; the Iceberg schema is converted from the DataFrame's Spark schema.
val data = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
val schema = SparkSchemaUtil.convert(data.schema)

// Database and table names are declared once so the catalog identifier and the
// path handed to the DataFrame reader/writer cannot drift apart.
val dbName = "testdb"
val tableName = "ice_table3"
val tablePath = s"$dbName.$tableName"

// First argument: database, second argument: table name.
val name = TableIdentifier.of(dbName, tableName)

// Reuse the table when it is already registered so the walkthrough can be
// re-run; an unconditional createTable is rejected once the table exists.
val table = if (catalog.tableExists(name)) {
  catalog.loadTable(name)
} else {
  catalog.createTable(name, schema)
}

// Write the dataset to the table.
data.write.format("iceberg").mode("append").save(tablePath)

// Read the table once and reuse the DataFrame for both the preview and the view.
val iceTable = spark.read.format("iceberg").load(tablePath)
iceTable.show()
iceTable.createOrReplaceTempView("ice_test")

// spark.sql only builds a lazy DataFrame; show() runs the query and prints the count.
spark.sql("""SELECT count(1) FROM ice_test""").show()