理解Spark SQL(二)—— SQLContext和HiveContext
使用spark sql,除了使用之前介绍的方法,实际上还可以使用sqlcontext或者hivecontext通过编程的方式实现。前者支持sql语法解析器(sql-92语法),后者支持sql语法解析器和hivesql语法解析器,默认为hivesql语法解析器,用户可以通过配置切换成sql语法解析器来运行hiveql不支持的语法,如:select 1。实际上hivecontext是sqlcontext的子类,因此在hivecontext运行过程中除了override的函数和变量,可以使用和sqlcontext一样的函数和变量。
因为spark-shell工具实际就是运行的scala程序片段,为了方便,下面采用spark-shell进行演示。
首先来看sqlcontext,因为是标准sql,可以不依赖于hive的metastore,比如下面的例子(没有启动hive metastore):
[root@brucecentos4 ~]# $spark_home/bin/spark-shell --master yarn --conf spark.sql.catalogimplementation=in-memory
scala> case class offices(office:int,city:string,region:string,mgr:int,target:double,sales:double)
defined class offices
scala> val rddoffices=sc.textfile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p=>offices(p(0).trim.toint,p(1),p(2),p(3).trim.toint,p(4).trim.todouble,p(5).trim.todouble))
rddoffices: org.apache.spark.rdd.rdd[offices] = mappartitionsrdd[3] at map at <console>:26
scala> val officesdataframe = spark.createdataframe(rddoffices)
officesdataframe: org.apache.spark.sql.dataframe = [office: int, city: string ... 4 more fields]
scala> officesdataframe.createorreplacetempview("offices")
scala> spark.sql("select city from offices where region='eastern'").map(t=>"city: " + t(0)).collect.foreach(println)
city: newyork
city: chicago
city: atlanta
scala>
执行上面的命令后,实际上在yarn集群中启动了一个yarn client模式的spark application,然后在scala>提示符后输入的语句会生成rdd的transformation,最后一条命令中的collect会生成rdd的action,即会触发job的提交和程序的执行。
命令行中之所以加上--conf spark.sql.catalogimplementation=in-memory选项,是因为spark-shell中的默认启动的sparksession对象spark是默认支持hive的,不带这个选项启动的话,程序就会去连接hive metastore,因为这里并没有启动hive metastore,因此程序在执行createdataframe函数时会报错。
程序中的第一行是1个case class语句,这里是定义后面的数据文件的模式的(定义模式除了这个方法,其实还有另外一种方法,后面再介绍)。第二行从hdfs中读取一个文本文件,并工通过map映射到了模式上面。第三行基于第二行的rdd生成dataframe,第四行基于第三行的dataframe注册了一个逻辑上的临时表,最后一行就可以通过sparksession的sql函数来执行sql语句了。
实际上,sqlcontext是spark 1.x中的sql入口,在spark 2.x中,使用sparksession作为sql的入口,但是为了向后兼容,spark 2.x仍然支持sqlcontext来操作sql,不过会提示deprecated,所以上面的例子是采用spark 2.x中的写法。
实际上还有另外一种方法来操作sql,针对同样的数据,例如:
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val schema = new structtype(array(structfield("office", integertype, false), structfield("city", stringtype, false), structfield("region", stringtype, false), structfield("mgr", integertype, true), structfield("target", doubletype, true), structfield("sales", doubletype, false)))
schema: org.apache.spark.sql.types.structtype = structtype(structfield(office,integertype,false), structfield(city,stringtype,false), structfield(region,stringtype,false), structfield(mgr,integertype,true), structfield(target,doubletype,true), structfield(sales,doubletype,false))
scala> val rowrdd = sc.textfile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(_.split("\t")).map(p => row(p(0).trim.toint,p(1),p(2),p(3).trim.toint,p(4).trim.todouble,p(5).trim.todouble))
rowrdd: org.apache.spark.rdd.rdd[org.apache.spark.sql.row] = mappartitionsrdd[3] at map at <console>:30
scala> val dataframe = spark.createdataframe(rowrdd, schema)
dataframe: org.apache.spark.sql.dataframe = [office: int, city: string ... 4 more fields]
scala> dataframe.createorreplacetempview("offices")
scala> spark.sql("select city from offices where region='eastern'").map(t=>"city: " + t(0)).collect.foreach(println)
city: newyork
city: chicago
city: atlanta
这个例子与之前的例子有一些不同,主要的地方有3个:
1. 之前的例子是采用case class定义模式,spark采用反射来推断schema;而这个例子采用structtype类型的对象来定义模式,它接收一个数组,数组成员是structfield对象,代表一个字段的定义,每个字段的定义由字段名称、字段类型和是否允许为空组成;
2. 对于代表数据的rdd,之前的例子是直接用case class定义的类型来分割字段,而这个例子是用的row类型;
3. 在使用createdataframe函数生成dataframe时,该函数的参数不一样,之前的例子只要传入rdd对象即可(对象中隐含了模式),而这个例子需要同时传入rdd和定义的schema;
实际编程中建议采用第二种方法,因为其更加灵活,schema信息可以不必是写死的,而是可以在程序运行的过程中生成。
下面接着来看hivecontext的用法,使用hivecontext之前需要确保:
- 使用的spark是支持hive的;
- hive的配置文件hive-site.xml已经在spark的conf目录下;
- hive metastore已经启动;
举例说明:
首先启动hive metastore:
[root@brucecentos ~]# nohup hive --service metastore &
然后仍然通过spark-shell来举例说明,启动spark-shell,如下所示:
[root@brucecentos4 ~]# $spark_home/bin/spark-shell --master yarn
scala> spark.sql("show databases").collect.foreach(println)
[default]
[orderdb]
scala> spark.sql("use orderdb")
res2: org.apache.spark.sql.dataframe = []
scala> spark.sql("show tables").collect.foreach(println)
[orderdb,customers,false]
[orderdb,offices,false]
[orderdb,orders,false]
[orderdb,products,false]
[orderdb,salesreps,false]
scala> spark.sql("select city from offices where region='eastern'").map(t=>"city: " + t(0)).collect.foreach(println)
city: newyork
city: chicago
city: atlanta
scala>
可以看到这次启动spark-shell没有带上最后那个选项,这是因为这里我们打算用hivecontext来操作hive中的数据,需要支持hive。前面说过spark-shell是默认开启了hive支持的。同sqlcontext类似,spark 2.x中也不需要再用hivecontext对象来操作sql了,直接用sparksession对象来操作就好了。可以看到这里可以直接操作表,不用再定义schema,这是因为schema是由外部的hive metastore定义的,spark通过连接到hive metastore来读取表的schema信息,因此这里能直接操作sql。
另外,除了上面的使用sqlcontext操作普通文件(需要额外定义模式)和使用hivecontext操作hive表数据(需要开启hive metastore)之外,sqlcontext还能操作json、parquet等文件,由于这两种数据文件自己带了模式信息,因此可以直接基于文件创建dataframe,例如:
scala> val df = spark.read.json("file:///opt/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.dataframe = [age: bigint, name: string]
scala> df.createorreplacetempview("people")
scala> spark.sql("select name,age from people where age>19").map(t=>"name :" + t(0) + ", age: " + t(1)).collect.foreach(println)
name :andy, age: 30
最后来看下dataframe的另一种叫做dsl(domain specific language)的用法。
scala> val df = spark.read.json("file:///opt/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.dataframe = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|michael|
| 30| andy|
| 19| justin|
+----+-------+
scala> df.select("name").show()
+-------+
| name|
+-------+
|michael|
| andy|
| justin|
+-------+
scala> df.select(df("name"), df("age") + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|michael| null|
| andy| 31|
| justin| 20|
+-------+---------+
scala> df.filter(df("age") > 21).show()
+---+----+
|age|name|
+---+----+
| 30|andy|
+---+----+
scala> df.groupby("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
scala>
以上是对spark sql的sqlcontext和hivecontext基本用法的一些总结,都是采用spark-shell工具举的例子。实际上由于spark-shell是运行scala程序片段的工具,上述例子完全可以改成独立的应用程序。我将在下一篇博文当中尝试使用scala、java和python来编写独立的程序来操作上面的示例hive数据库orderdb,可以适当使用一些较为复杂的sql来统计分析数据。