
Understanding Spark SQL (Part 3): Spark SQL Program Examples


As mentioned in the previous post, SQLContext and HiveContext are deprecated in Spark 2.x; instead, SQL statements are executed through the sql function of a SparkSession object. Before executing a SQL statement with this function, you must first call createOrReplaceTempView on a DataFrame to register a temporary view, so the key step is converting an RDD into a DataFrame. In fact, Spark declares

type DataFrame = Dataset[Row]

so DataFrame is simply an alias for Dataset[Row]. RDDs expose a low-level API, while DataFrame/Dataset provide a high-level API suited to SQL and other structured-data workloads.
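To make that relationship concrete, here is a minimal sketch (not from the original post) contrasting the untyped DataFrame view with the strongly typed Dataset view of the same data; the Office case class, the sample row, and the application name are assumptions made purely for illustration:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical case class used only for this sketch
case class Office(office: Int, city: String, region: String)

object DataFrameVsDataset {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("DataFrameVsDataset").getOrCreate()
    import spark.implicits._

    // DataFrame: a collection of untyped rows (it is literally Dataset[Row])
    val df: DataFrame = Seq(Office(11, "New York", "Eastern")).toDF()

    // Dataset[Office]: the same data, but with a compile-time checked element type
    val ds: Dataset[Office] = df.as[Office]

    ds.show()
    spark.stop()
  }
}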

Below are some example Spark SQL programs.

Example 1: SparkSQLExam.scala

package bruce.bigdata.spark.example

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object SparkSQLExam {

    case class Offices(office: Int, city: String, region: String, mgr: Int, target: Double, sales: Double)

    def main(args: Array[String]) {

        val spark = SparkSession
          .builder
          .appName("SparkSQLExam")
          .getOrCreate()

        runSparkSQLExam1(spark)
        runSparkSQLExam2(spark)

        spark.stop()

    }

    // Build a DataFrame from an RDD of case class instances (schema inferred by reflection)
    private def runSparkSQLExam1(spark: SparkSession): Unit = {

        import spark.implicits._

        val rddOffices = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt")
          .map(_.split("\t"))
          .map(p => Offices(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
        val officesDataFrame = spark.createDataFrame(rddOffices)

        officesDataFrame.createOrReplaceTempView("offices")
        spark.sql("select city from offices where region='eastern'").map(t => "city: " + t(0)).collect.foreach(println)

    }

    // Build a DataFrame from an RDD[Row] plus an explicitly declared schema
    private def runSparkSQLExam2(spark: SparkSession): Unit = {

        import spark.implicits._
        import org.apache.spark.sql._
        import org.apache.spark.sql.types._

        val schema = new StructType(Array(
          StructField("office", IntegerType, false),
          StructField("city", StringType, false),
          StructField("region", StringType, false),
          StructField("mgr", IntegerType, true),
          StructField("target", DoubleType, true),
          StructField("sales", DoubleType, false)))
        val rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt")
          .map(_.split("\t"))
          .map(p => Row(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt, p(4).trim.toDouble, p(5).trim.toDouble))
        val dataFrame = spark.createDataFrame(rowRDD, schema)

        dataFrame.createOrReplaceTempView("offices2")
        spark.sql("select city from offices2 where region='western'").map(t => "city: " + t(0)).collect.foreach(println)

    }

}
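Incidentally, because spark.implicits._ is imported in runSparkSQLExam1, the same DataFrame could also be obtained with the toDF helper instead of createDataFrame. A minimal sketch (not in the original program), assuming the rddOffices value defined above:

// Equivalent to spark.createDataFrame(rddOffices); the schema is still
// inferred from the Offices case class by reflection.
val officesDataFrame = rddOffices.toDF()
officesDataFrame.createOrReplaceTempView("offices")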

Compile with the following command:

[root@brucecentos4 scala]# scalac SparkSQLExam.scala

Before compiling, the following paths need to be added to the CLASSPATH:

export CLASSPATH=$CLASSPATH:$SPARK_HOME/jars/*:$(/opt/hadoop/bin/hadoop classpath)

Then package it into a jar file:

[root@brucecentos4 scala]# jar -cvf spark_exam_scala.jar bruce

Then submit the program to the YARN cluster with spark-submit. To make it easy to view the results from the client, it is run here in yarn client mode.

[root@brucecentos4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkSQLExam --master yarn --deploy-mode client spark_exam_scala.jar

Screenshots of the run results are omitted here.

Example 2: SparkHiveExam.scala (requires the Hive metastore to be running)

package bruce.bigdata.spark.example

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkHiveExam {

    def main(args: Array[String]) {

        val spark = SparkSession
          .builder()
          .appName("Spark Hive Exam")
          .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
          .enableHiveSupport()
          .getOrCreate()

        import spark.implicits._

        // Query Hive data with HQL
        spark.sql("show databases").collect.foreach(println)
        spark.sql("use orderdb")
        spark.sql("show tables").collect.foreach(println)
        spark.sql("select city from offices where region='eastern'").map(t => "city: " + t(0)).collect.foreach(println)

        // Save the result of an HQL query into a newly created Hive table:
        // find the products whose order amount exceeds $10,000
        spark.sql("""create table products_high_sales(mfr_id string,product_id string,description string)
                   row format delimited fields terminated by '\t' lines terminated by '\n' stored as textfile""")
        spark.sql("""select mfr_id,product_id,description
                   from products a inner join orders b
                   on a.mfr_id=b.mfr and a.product_id=b.product
                   where b.amount>10000""").write.mode(SaveMode.Overwrite).saveAsTable("products_high_sales")

        // Load HDFS file data into a Hive table
        spark.sql("""create table if not exists offices2 (office int,city string,region string,mgr int,target double,sales double)
                   row format delimited fields terminated by '\t' lines terminated by '\n' stored as textfile""")
        spark.sql("load data inpath '/user/hive/warehouse/orderdb.db/offices/offices.txt' into table offices2")

        spark.stop()
    }
}
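After this program has run, the new tables can also be read back through the same SparkSession without writing HQL strings. A minimal sketch (not part of the original program), assuming the tables were created in the orderdb database as above:

// Read the Hive table created above back into a DataFrame and display it
val highSales = spark.table("orderdb.products_high_sales")
highSales.show()

// The loaded offices2 table is visible to SQL as well
spark.sql("select count(*) from orderdb.offices2").show()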

Compile with the following command:

[root@brucecentos4 scala]# scalac SparkHiveExam.scala

Package it with the following command:

[root@brucecentos4 scala]# jar -cvf spark_exam_scala.jar bruce

Run it with the following command:

[root@brucecentos4 scala]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.SparkHiveExam --master yarn --deploy-mode client spark_exam_scala.jar

Screenshots of the program's run results are omitted here.

In addition, after the program runs, two new tables (products_high_sales and offices2) appear in Hive; the screenshots showing them are omitted here.

Example 3: spark_sql_exam.py

from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.sql.types import *


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Exam") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

    # Declare the schema explicitly
    schema = StructType([StructField("office", IntegerType(), False), StructField("city", StringType(), False),
        StructField("region", StringType(), False), StructField("mgr", IntegerType(), True),
        StructField("target", DoubleType(), True), StructField("sales", DoubleType(), False)])

    # Parse each line of the text file into a tuple matching the schema
    rowRDD = spark.sparkContext.textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt").map(lambda p: p.split("\t")) \
        .map(lambda p: (int(p[0].strip()), p[1], p[2], int(p[3].strip()), float(p[4].strip()), float(p[5].strip())))

    dataFrame = spark.createDataFrame(rowRDD, schema)
    dataFrame.createOrReplaceTempView("offices")
    spark.sql("select city from offices where region='eastern'").show()

    spark.stop()

Run the program with the following command:

[root@brucecentos4 spark]# $SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client spark_sql_exam.py

A screenshot of the program's run results is omitted here.

Example 4: JavaSparkSQLExam.java

package bruce.bigdata.spark.example;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.AnalysisException;


public class JavaSparkSQLExam {
    public static void main(String[] args) throws AnalysisException {
        SparkSession spark = SparkSession
          .builder()
          .appName("Java Spark SQL Exam")
          .config("spark.some.config.option", "some-value")
          .getOrCreate();

        // Declare the schema explicitly
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("office", DataTypes.IntegerType, false));
        fields.add(DataTypes.createStructField("city", DataTypes.StringType, false));
        fields.add(DataTypes.createStructField("region", DataTypes.StringType, false));
        fields.add(DataTypes.createStructField("mgr", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("target", DataTypes.DoubleType, true));
        fields.add(DataTypes.createStructField("sales", DataTypes.DoubleType, false));

        StructType schema = DataTypes.createStructType(fields);

        // Read the text file and convert each line into a Row
        JavaRDD<String> officesRDD = spark.sparkContext()
          .textFile("/user/hive/warehouse/orderdb.db/offices/offices.txt", 1)
          .toJavaRDD();

        JavaRDD<Row> rowRDD = officesRDD.map((Function<String, Row>) record -> {
          String[] attributes = record.split("\t");
          return RowFactory.create(Integer.valueOf(attributes[0].trim()), attributes[1], attributes[2], Integer.valueOf(attributes[3].trim()), Double.valueOf(attributes[4].trim()), Double.valueOf(attributes[5].trim()));
        });

        Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, schema);

        dataFrame.createOrReplaceTempView("offices");
        Dataset<Row> results = spark.sql("select city from offices where region='eastern'");
        results.collectAsList().forEach(r -> System.out.println(r));

        spark.stop();
    }
}

After compiling and packaging, run it with the following command:

[root@brucecentos4 spark]# $SPARK_HOME/bin/spark-submit --class bruce.bigdata.spark.example.JavaSparkSQLExam --master yarn --deploy-mode client spark_exam_java.jar

A screenshot of the run results is omitted here.

The above are a few example Spark SQL programs, written in Scala, Python, and Java respectively. Besides these three languages, Spark also supports writing programs in R, but since I am not familiar with it myself I will not give an example. Whichever language you use, the API is essentially the same: you work with the high-level DataFrame and Dataset APIs to build and execute SQL. With these APIs you can easily operate on structured data through SQL, and you can conveniently work with data stored in Hive as well.
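As a closing illustration (not from the original post), the same query used in Example 1 can also be expressed with DataFrame operations instead of an SQL string. A minimal Scala sketch, assuming the officesDataFrame built in Example 1:

import org.apache.spark.sql.functions.col

// Equivalent to: select city from offices where region='eastern'
officesDataFrame
  .filter(col("region") === "eastern")
  .select("city")
  .show()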