Spark SQL常见4种数据源详解
通用load/write方法
手动指定选项
spark sql的dataframe接口支持多种数据源的操作。一个dataframe可以进行rdds方式的操作,也可以被注册为临时表。把dataframe注册为临时表之后,就可以对该dataframe执行sql查询。
spark sql的默认数据源为parquet格式。数据源为parquet文件时,spark sql可以方便的执行所有的操作。
修改配置项spark.sql.sources.default,可修改默认数据源格式。
scala> val df = spark.read.load("hdfs://hadoop001:9000/namesandages.parquet") df: org.apache.spark.sql.dataframe = [age: bigint, name: string] scala> df.select("name").write.save("names.parquet")
当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果数据源格式为内置格式,则只需要指定简称json, parquet, jdbc, orc, libsvm, csv, text来指定数据的格式。
可以通过sparksession提供的read.load方法用于通用加载数据,使用write和save保存数据。
scala> val peopledf = spark.read.format("json").load("hdfs://hadoop001:9000/people.json") peopledf: org.apache.spark.sql.dataframe = [age: bigint, name: string] scala> peopledf.write.format("parquet").save("hdfs://hadoop001:9000/namesandages.parquet") scala>
除此之外,可以直接运行sql在文件上:
val sqldf = spark.sql("select * from parquet.`hdfs://hadoop001:9000/namesandages.parquet`") sqldf.show()
文件保存选项
可以采用savemode执行存储操作,savemode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用overwrite方式执行时,在输出新数据之前原数据就已经被删除。savemode详细介绍如下表:
scala/java | any language | meaning |
---|---|---|
savemode.errorifexists(default) | “error”(default) | 如果文件存在,则报错 |
savemode.append | “append” | 追加 |
savemode.overwrite | “overwrite” | 覆写 |
savemode.ignore | “ignore” | 数据存在,则忽略 |
parquet文件
parquet读写
parquet格式经常在hadoop生态圈中被使用,它也支持spark sql的全部数据类型。spark sql 提供了直接读取和存储 parquet 格式文件的方法。
// encoders for most common types are automatically provided by importing spark.implicits._ import spark.implicits._ val peopledf = spark.read.json("examples/src/main/resources/people.json") // dataframes can be saved as parquet files, maintaining the schema information peopledf.write.parquet("hdfs://hadoop001:9000/people.parquet") // read in the parquet file created above // parquet files are self-describing so the schema is preserved // the result of loading a parquet file is also a dataframe val parquetfiledf = spark.read.parquet("hdfs://hadoop001:9000/people.parquet") // parquet files can also be used to create a temporary view and then used in sql statements parquetfiledf.createorreplacetempview("parquetfile") val namesdf = spark.sql("select name from parquetfile where age between 13 and 19") namesdf.map(attributes => "name: " + attributes(0)).show() // +------------+ // | value| // +------------+ // |name: justin| // +------------+
解析分区信息
对表进行分区是对数据进行优化的方式之一。在分区的表内,数据通过分区列将数据存储在不同的目录下。parquet数据源现在能够自动发现并解析分区信息。例如,对人口数据进行分区存储,分区列为gender和country,使用下面的目录结构:
path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=us │ │ └── data.parquet │ ├── country=cn │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=us │ └── data.parquet ├── country=cn │ └── data.parquet └── ...
通过传递path/to/table给 sqlcontext.read.parque
或sqlcontext.read.load,spark sql将自动解析分区信息。
返回的dataframe的schema如下:
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true)
需要注意的是,数据的分区列的数据类型是自动解析的。当前,支持数值类型和字符串类型。自动解析分区类型的参数为:
spark.sql.sources.partitioncolumntypeinference.enabled
默认值为true。
如果想关闭该功能,直接将该参数设置为disabled。此时,分区列数据格式将被默认设置为string类型,不再进行类型解析。
schema合并
像protocolbuffer、avro和thrift那样,parquet也支持schema evolution(schema演变)。用户可以先定义一个简单的schema,然后逐渐的向schema中增加列描述。通过这种方式,用户可以获取多个有不同schema但相互兼容的parquet文件。现在parquet数据源能自动检测这种情况,并合并这些文件的schemas。
因为schema合并是一个高消耗的操作,在大多数情况下并不需要,所以spark sql从1.5.0开始默认关闭了该功能。可以通过下面两种方式开启该功能:
当数据源为parquet文件时,将数据源选项mergeschema设置为true。
设置全局sql选项:
spark.sql.parquet.mergeschema为true。
// sqlcontext from the previous example is used in this example. // this is used to implicitly convert an rdd to a dataframe. import spark.implicits._ // create a simple dataframe, stored into a partition directory val df1 = sc.makerdd(1 to 5).map(i => (i, i * 2)).todf("single", "double") df1.write.parquet("hdfs://hadoop001:9000/data/test_table/key=1") // create another dataframe in a new partition directory, // adding a new column and dropping an existing column val df2 = sc.makerdd(6 to 10).map(i => (i, i * 3)).todf("single", "triple") df2.write.parquet("hdfs://hadoop001:9000/data/test_table/key=2") // read the partitioned table val df3 = spark.read.option("mergeschema", "true").parquet("hdfs://hadoop001:9000/data/test_table") df3.printschema() // the final schema consists of all 3 columns in the parquet files together // with the partitioning column appeared in the partition directory paths. // root // |-- single: int (nullable = true) // |-- double: int (nullable = true) // |-- triple: int (nullable = true) // |-- key : int (nullable = true)
hive数据源
apache hive是hadoop上的sql引擎,spark sql编译时可以包含hive支持,也可以不包含。包含hive支持的spark sql可以支持hive表访问、udf(用户自定义函数)以及 hive 查询语言(hiveql/hql)等。需要强调的 一点是,如果要在spark sql中包含hive的库,并不需要事先安装hive。一般来说,最好还是在编译spark sql时引入hive支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 spark,它应该已经在编译时添加了 hive 支持。
若要把spark sql连接到一个部署好的hive上,你必须把hive-site.xml复制到 spark的配置文件目录中($spark_home/conf)。即使没有部署好hive,spark sql也可以运行。
需要注意的是,如果你没有部署好hive,spark sql会在当前的工作目录中创建出自己的hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 hiveql 中的 create table (并非 create external table)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 hdfs,否则就是本地文件系统)。
import java.io.file import org.apache.spark.sql.row import org.apache.spark.sql.sparksession case class record(key: int, value: string) // warehouselocation points to the default location for managed databases and tables val warehouselocation = new file("spark-warehouse").getabsolutepath val spark = sparksession .builder() .appname("spark hive example") .config("spark.sql.warehouse.dir", warehouselocation) .enablehivesupport() .getorcreate() import spark.implicits._ import spark.sql sql("create table if not exists src (key int, value string)") sql("load data local inpath 'examples/src/main/resources/kv1.txt' into table src") // queries are expressed in hiveql sql("select * from src").show() // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... // aggregation queries are also supported. sql("select count(*) from src").show() // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // the results of sql queries are themselves dataframes and support all normal functions. val sqldf = sql("select key, value from src where key < 10 order by key") // the items in dataframes are of type row, which allows you to access each column by ordinal. val stringsds = sqldf.map { case row(key: int, value: string) => s"key: $key, value: $value" } stringsds.show() // +--------------------+ // | value| // +--------------------+ // |key: 0, value: val_0| // |key: 0, value: val_0| // |key: 0, value: val_0| // ... // you can also use dataframes to create temporary views within a sparksession. val recordsdf = spark.createdataframe((1 to 100).map(i => record(i, s"val_$i"))) recordsdf.createorreplacetempview("records") // queries can then join dataframe data with data stored in hive. sql("select * from records r join src s on r.key = s.key").show() // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // | 5| val_5| 5| val_5| // ...
内嵌hive应用
如果要使用内嵌的hive,什么都不用做,直接用就可以了。 –conf :
spark.sql.warehouse.dir=
注意:如果你使用的是内部的hive,在spark2.0之后,spark.sql.warehouse.dir用于指定数据仓库的地址,如果你需要是用hdfs作为路径,那么需要将core-site.xml和hdfs-site.xml 加入到spark conf目录,否则只会创建master节点上的warehouse目录,查询时会出现文件找不到的问题,这是需要向使用hdfs,则需要将metastore删除,重启集群。
外部hive应用
如果想连接外部已经部署好的hive,需要通过以下几个步骤。
a 将hive中的hive-site.xml拷贝或者软连接到spark安装目录下的conf目录下。
b 打开spark shell,注意带*问hive元数据库的jdbc客户端。
$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar
json数据集
spark sql 能够自动推测 json数据集的结构,并将它加载为一个dataset[row]. 可以通过sparksession.read.json()去加载一个 dataset[string]或者一个json 文件.注意,这个json文件不是一个传统的json文件,每一行都得是一个json串。
{"name":"michael"} {"name":"andy", "age":30} {"name":"justin", "age":19} // primitive types (int, string, etc) and product types (case classes) encoders are // supported by importing this when creating a dataset. import spark.implicits._ // a json dataset is pointed to by path. // the path can be either a single text file or a directory storing text files val path = "examples/src/main/resources/people.json" val peopledf = spark.read.json(path) // the inferred schema can be visualized using the printschema() method peopledf.printschema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // creates a temporary view using the dataframe peopledf.createorreplacetempview("people") // sql statements can be run by using the sql methods provided by spark val teenagernamesdf = spark.sql("select name from people where age between 13 and 19") teenagernamesdf.show() // +------+ // | name| // +------+ // |justin| // +------+ // alternatively, a dataframe can be created for a json dataset represented by // a dataset[string] storing one json object per string val otherpeopledataset = spark.createdataset( """{"name":"yin","address":{"city":"columbus","state":"ohio"}}""" :: nil) val otherpeople = spark.read.json(otherpeopledataset) otherpeople.show() // +---------------+----+ // | address|name| // +---------------+----+ // |[columbus,ohio]| yin| // +---------------+----+
jdbc
spark sql可以通过jdbc从关系型数据库中读取数据的方式创建dataframe,通过对dataframe一系列的计算后,还可以将数据再写回关系型数据库中。
注意,需要将相关的数据库驱动放到spark的类路径下。
$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar // note: jdbc loading and saving can be achieved via either the load/save or jdbc methods // loading data from a jdbc source val jdbcdf = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/rdd").option("dbtable", " rddtable").option("user", "root").option("password", "hive").load() val connectionproperties = new properties() connectionproperties.put("user", "root") connectionproperties.put("password", "hive") val jdbcdf2 = spark.read .jdbc("jdbc:mysql://hadoop001:3306/rdd", "rddtable", connectionproperties) // saving data to a jdbc source jdbcdf.write .format("jdbc") .option("url", "jdbc:mysql://hadoop001:3306/rdd") .option("dbtable", "rddtable2") .option("user", "root") .option("password", "hive") .save() jdbcdf2.write .jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", connectionproperties) // specifying create table column data types on write jdbcdf.write .option("createtablecolumntypes", "name char(64), comments varchar(1024)") .jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", connectionproperties)
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: 微信小程序image图片加载完成监听
下一篇: 讲一讲关键字分布原则
推荐阅读
-
SQL Server中row_number函数的常见用法示例详解
-
Spark SQL常见4种数据源详解
-
PHP开发中常见的安全问题详解和解决方法(如Sql注入、CSRF、Xss、CC等)
-
SQL Server比较常见数据类型详解
-
springboot集成spark并使用spark-sql的示例详解
-
PHP开发中常见的安全问题详解和解决方法(如Sql注入、CSRF、Xss、CC等)
-
Spark SQL常见4种数据源详解
-
springboot集成spark并使用spark-sql的示例详解
-
Microsoft Sql server2005的安装步骤图文详解及常见问题解决方案
-
SQL Server比较常见数据类型详解