Understanding Spark SQL (Part 3): Spark SQL Program Examples
As mentioned in the previous post, in Spark 2.x SQLContext and HiveContext are effectively obsolete; instead, SQL statements are executed through the sql method of a SparkSession object. Before a SQL statement can be run this way, a temporary view has to be registered by calling createOrReplaceTempView on a DataFrame, so the key step is converting an RDD into a DataFrame. In fact, Spark declares

type DataFrame = Dataset[Row]

so DataFrame is simply an alias for Dataset[Row]. The RDD API is the low-level interface, while DataFrame/Dataset form the high-level API (well suited to SQL and other structured-data workloads).
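To make the relationship concrete, here is a minimal sketch of moving between the low-level RDD API and the high-level DataFrame/Dataset API. The Person case class and the sample data are invented purely for illustration:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object RddVsDatasetSketch {
  // Hypothetical record type, only for illustration
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("RddVsDatasetSketch").getOrCreate()
    import spark.implicits._                 // brings in toDF/toDS and the encoders

    val rdd = spark.sparkContext.parallelize(Seq(Person("alice", 30), Person("bob", 25)))

    val ds: Dataset[Person] = rdd.toDS()     // typed, high-level API
    val df: DataFrame = rdd.toDF()           // DataFrame is just Dataset[Row]
    val rows: Dataset[Row] = df              // compiles because of the type alias

    df.createOrReplaceTempView("people")     // register a temp view before using spark.sql
    spark.sql("select name from people where age > 28").show()

    val backToRdd = df.rdd                   // drop back down to the low-level RDD[Row] API
    println(backToRdd.count())

    spark.stop()
  }
}
```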
Below are some Spark SQL program examples.

Example 1: SparkSQLExam.scala
```scala
package bruce.bigdata.spark.example

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object SparkSQLExam {

  case class Offices(office: Int, city: String, region: String, mgr: Int, target: Double, sales: Double)

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder
      .appName("SparkSQLExam")
      .getOrCreate()

    runSparkSQLExam1(spark)
    runSparkSQLExam2(spark)

    spark.stop()

  }

  private def runSparkSQLExam1(spark: SparkSession): Unit = {

    import spark.implicits._

    val rddOffices = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt")
      .map(_.split("\t"))
      .map(p => Offices(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
    val officesDataFrame = spark.createDataFrame(rddOffices)

    officesDataFrame.createOrReplaceTempView("offices")
    spark.sql("select city from offices where region='Eastern'").map(t => "City: " + t(0)).collect.foreach(println)

  }

  private def runSparkSQLExam2(spark: SparkSession): Unit = {

    import spark.implicits._
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._

    val schema = new StructType(Array(StructField("office", IntegerType, false), StructField("city", StringType, false), StructField("region", StringType, false), StructField("mgr", IntegerType, true), StructField("target", DoubleType, true), StructField("sales", DoubleType, false)))
    val rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt")
      .map(_.split("\t"))
      .map(p => Row(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
    val dataFrame = spark.createDataFrame(rowRDD, schema)

    dataFrame.createOrReplaceTempView("offices2")
    spark.sql("select city from offices2 where region='Western'").map(t => "City: " + t(0)).collect.foreach(println)

  }

}
```
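The example above drives everything through SQL text on a temporary view. Since officesDataFrame is a Dataset, the same query can also be expressed with the DataFrame/Dataset DSL directly. The following is only a sketch of equivalent calls, assuming the Offices case class and the officesDataFrame built in runSparkSQLExam1 above:

```scala
// Sketch only: inside runSparkSQLExam1, after building officesDataFrame
import spark.implicits._

// Untyped DataFrame DSL, equivalent to:
//   select city from offices where region='Eastern'
officesDataFrame
  .filter($"region" === "Eastern")
  .select($"city")
  .collect()
  .foreach(println)

// Typed Dataset API: view the Dataset[Row] as Dataset[Offices] first
val officesDS = officesDataFrame.as[Offices]
officesDS
  .filter(_.region == "Eastern")
  .map(o => "City: " + o.city)
  .collect()
  .foreach(println)
```

Both forms go through the same Catalyst optimizer, so the choice between SQL text and the DSL is mostly a matter of style.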
Compile SparkSQLExam.scala with the following command:

[root@brucecentos4 scala]# scalac SparkSQLExam.scala

Before compiling, the Spark and Hadoop jars need to be added to the classpath:

export CLASSPATH=$CLASSPATH:$SPARK_HOME/jars/*:$(/opt/hadoop/bin/hadoop classpath)
Then package it into a jar file:
[root@brucecentos4 scala]# jar -cvf spark_exam_scala.jar bruce
Then submit the program to the YARN cluster with spark-submit. To make it easy to view the results from the client, it is run here in yarn client mode.

[root@brucecentos4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkSQLExam --master yarn --deploy-mode client spark_exam_scala.jar

Screenshot of the run results:
Example 2: SparkHiveExam.scala (requires the Hive metastore to be running)
```scala
package bruce.bigdata.spark.example

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkHiveExam {

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder()
      .appName("Spark Hive Exam")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // Query Hive data with HQL
    spark.sql("show databases").collect.foreach(println)
    spark.sql("use orderdb")
    spark.sql("show tables").collect.foreach(println)
    spark.sql("select city from offices where region='Eastern'").map(t => "City: " + t(0)).collect.foreach(println)

    // Save the result of an HQL query into a newly created Hive table:
    // find the products with order amounts over 10,000 dollars
    spark.sql("""create table products_high_sales(mfr_id string,product_id string,description string)
                 row format delimited fields terminated by '\t' lines terminated by '\n' stored as textfile""")
    spark.sql("""select mfr_id,product_id,description
                 from products a inner join orders b
                 on a.mfr_id=b.mfr and a.product_id=b.product
                 where b.amount>10000""").write.mode(SaveMode.Overwrite).saveAsTable("products_high_sales")

    // Load an HDFS file into a Hive table
    spark.sql("""create table if not exists offices2 (office int,city string,region string,mgr int,target double,sales double)
                 row format delimited fields terminated by '\t' lines terminated by '\n' stored as textfile""")
    spark.sql("load data inpath '/user/hive/warehouse/orderdb.db/offices/offices.txt' into table offices2")

    spark.stop()
  }
}
```
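For comparison, the Hive tables used here can also be read and written through the DataFrame API rather than SQL strings. The following is only a sketch, assuming the same orderdb tables and a SparkSession named spark built with enableHiveSupport() as in the example:

```scala
// Sketch only: assumes the same SparkSession `spark` with Hive support as above
import org.apache.spark.sql.SaveMode
import spark.implicits._

// Read a Hive table directly as a DataFrame instead of spark.sql("select ...")
val offices = spark.table("orderdb.offices")
offices.filter($"region" === "Eastern").select($"city").show()

// Build the high-sales result with the DSL and write it into the table
// created earlier; insertInto keeps the existing table definition,
// whereas saveAsTable (used in the example) also rewrites the table metadata
val highSales = spark.table("orderdb.products").as("a")
  .join(spark.table("orderdb.orders").as("b"),
        $"a.mfr_id" === $"b.mfr" && $"a.product_id" === $"b.product")
  .filter($"b.amount" > 10000)
  .select($"a.mfr_id", $"a.product_id", $"a.description")

highSales.write.mode(SaveMode.Overwrite).insertInto("orderdb.products_high_sales")
```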
Compile SparkHiveExam.scala with the following command:

[root@brucecentos4 scala]# scalac SparkHiveExam.scala

Package it with the following command:
[root@brucecentos4 scala]# jar -cvf spark_exam_scala.jar bruce
Run it with the following command:

[root@brucecentos4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkHiveExam --master yarn --deploy-mode client spark_exam_scala.jar

Program run results:

In addition, after the program has run, two new tables appear in Hive:
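A quick way to confirm this, for instance from spark-shell against the same Hive metastore, is to list the tables and look at the new ones:

```scala
// In spark-shell with Hive support (sketch only)
spark.sql("use orderdb")
spark.sql("show tables").show()              // should now list products_high_sales and offices2
spark.table("products_high_sales").show()
println(spark.table("offices2").count())
```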
Example 3: spark_sql_exam.py
```python
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.sql.types import *


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL exam") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

    schema = StructType([StructField("office", IntegerType(), False), StructField("city", StringType(), False),
                         StructField("region", StringType(), False), StructField("mgr", IntegerType(), True),
                         StructField("target", DoubleType(), True), StructField("sales", DoubleType(), False)])

    rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(lambda p: p.split("\t")) \
        .map(lambda p: (int(p[0].strip()), p[1], p[2], int(p[3].strip()), float(p[4].strip()), float(p[5].strip())))

    dataFrame = spark.createDataFrame(rowRDD, schema)
    dataFrame.createOrReplaceTempView("offices")
    spark.sql("select city from offices where region='Eastern'").show()

    spark.stop()
```
Run the program with the following command:

[root@brucecentos4 spark]# $SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client spark_sql_exam.py

Program run results:
Example 4: JavaSparkSQLExam.java
```java
package bruce.bigdata.spark.example;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.AnalysisException;


public class JavaSparkSQLExam {
    public static void main(String[] args) throws AnalysisException {
        SparkSession spark = SparkSession
            .builder()
            .appName("Java Spark SQL exam")
            .config("spark.some.config.option", "some-value")
            .getOrCreate();

        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("office", DataTypes.IntegerType, false));
        fields.add(DataTypes.createStructField("city", DataTypes.StringType, false));
        fields.add(DataTypes.createStructField("region", DataTypes.StringType, false));
        fields.add(DataTypes.createStructField("mgr", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("target", DataTypes.DoubleType, true));
        fields.add(DataTypes.createStructField("sales", DataTypes.DoubleType, false));

        StructType schema = DataTypes.createStructType(fields);


        JavaRDD<String> officesRDD = spark.sparkContext()
            .textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt", 1)
            .toJavaRDD();

        JavaRDD<Row> rowRDD = officesRDD.map((Function<String, Row>) record -> {
            String[] attributes = record.split("\t");
            return RowFactory.create(Integer.valueOf(attributes[0].trim()), attributes[1], attributes[2], Integer.valueOf(attributes[3].trim()), Double.valueOf(attributes[4].trim()), Double.valueOf(attributes[5].trim()));
        });

        Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, schema);

        dataFrame.createOrReplaceTempView("offices");
        Dataset<Row> results = spark.sql("select city from offices where region='Eastern'");
        results.collectAsList().forEach(r -> System.out.println(r));

        spark.stop();
    }
}
```
After compiling and packaging, run it with the following command:

[root@brucecentos4 spark]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.JavaSparkSQLExam --master yarn --deploy-mode client spark_exam_java.jar

Run results:
The above are some examples of Spark SQL programs, written in Scala, Python, and Java respectively. Besides these three languages, Spark also supports writing programs in R, but since I am not familiar with it myself, I will not give an example. Whichever language you use, the APIs are essentially the same: you work through the high-level DataFrame and Dataset APIs to build and run SQL. With these APIs, structured data can easily be queried with SQL, and data in Hive can be manipulated conveniently as well.
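One last note: the examples above all build a DataFrame by splitting an RDD of text lines by hand. For a delimited file such as offices.txt, the DataFrameReader can load it directly. The following is a small sketch, assuming the same tab-separated file and the schema from Example 1:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object OfficesCsvSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("OfficesCsvSketch").getOrCreate()

    val schema = StructType(Array(
      StructField("office", IntegerType, false),
      StructField("city", StringType, false),
      StructField("region", StringType, false),
      StructField("mgr", IntegerType, true),
      StructField("target", DoubleType, true),
      StructField("sales", DoubleType, false)))

    // Read the tab-delimited file directly as a DataFrame, no manual RDD parsing
    val offices = spark.read
      .schema(schema)
      .option("sep", "\t")
      .csv("/user/hive/warehouse/orderdb.db/offices/offices.txt")

    offices.createOrReplaceTempView("offices")
    spark.sql("select city from offices where region='Eastern'").show()

    spark.stop()
  }
}
```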