Spark Study Notes: Using Spark SQL
1. What is Spark SQL?
- A Spark module for processing structured data
- It provides a programming abstraction called DataFrame and acts as a distributed SQL query engine
2. Features of Spark SQL
- Multi-language API support (Java, Python, Scala)
- Unified data access (see the sketch after this list)
- Full compatibility with Hive
- Standard connectivity (JDBC/ODBC)
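To illustrate the "unified data access" point, here is a minimal sketch (assuming Spark 2.x and hypothetical HDFS paths) that reads three different formats through the same DataFrameReader API:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("UnifiedAccess").master("local[3]").getOrCreate()

// The same read API handles different storage formats; only the format method changes.
val jsonDF    = spark.read.json("hdfs://uplooking02:8020/data/people.json")
val parquetDF = spark.read.parquet("hdfs://uplooking02:8020/data/people.parquet")
val csvDF     = spark.read.option("header", "true").csv("hdfs://uplooking02:8020/data/people.csv")
// All three return a DataFrame, so the same SQL / DSL operations apply to each.
```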
3. Why learn Spark SQL?
We have already learned Hive, which translates HiveQL into MapReduce jobs and submits them to the cluster, greatly simplifying the work of writing MapReduce programs. However, the MapReduce computation model is relatively slow to execute. That is why Spark SQL came into being: it translates SQL into RDD operations and submits them to the cluster, and execution is much faster.
4. DataFrame
- Like an RDD, a DataFrame is a distributed data container
- However, a DataFrame is more like a two-dimensional table in a traditional database: besides the data itself, it also records the structure of the data, i.e. its schema
- In essence, a DataFrame is an RDD with schema information attached, as the sketch after this list illustrates
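A minimal sketch of the RDD-vs-DataFrame distinction (assuming Spark 2.x; the Person case class and sample rows here are illustrative only):

```scala
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int, sex: String)

val spark = SparkSession.builder().appName("DataFrameSchema").master("local[3]").getOrCreate()
import spark.implicits._

// An RDD only knows it holds Person objects; it carries no relational structure.
val rdd = spark.sparkContext.parallelize(Seq(Person("admin1", 14, "man"), Person("admin2", 16, "man")))

// A DataFrame built from the same data additionally carries a schema.
val df = rdd.toDF()
df.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = false)
//  |-- sex: string (nullable = true)
```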
5. Programming with the Spark SQL 1.x API
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
```
5.1 Creating a DataFrame with SQLContext (for testing)
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object Ops3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"), Person("admin2", 16, "man"), Person("admin3", 18, "man")))
    // Build the DataFrame by reflection on the Person case class
    val df1: DataFrame = sqlContext.createDataFrame(rdd1)
    df1.show(1)
  }
}

// The case class is defined outside main so Spark can infer the schema from it
case class Person(name: String, age: Int, sex: String)
```
5.2 Using the implicit conversions provided by SQLContext (for testing)
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"), Person("admin2", 16, "man"), Person("admin3", 18, "man")))
// Bring toDF and the other implicit conversions into scope
import sqlContext.implicits._
val df1: DataFrame = rdd1.toDF
df1.show()
```

5.3 Creating a DataFrame with SQLContext and an explicit schema (commonly used)

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// Define the schema explicitly
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("sex", StringType)))
// Parse each line into a Row matching the schema
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val lineSplit: Array[String] = line.split(",")
  Row(lineSplit(0), lineSplit(1).toInt, lineSplit(2))
})
val rowDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
rowDF.show()
```
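For reference, the code above assumes the files under /sparktest contain one comma-separated record per line in name,age,sex order; a hypothetical sample:

```
admin1,14,man
admin2,16,man
admin3,18,man
```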
6. Using the new 2.x API
```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// Data cleaning: parse each line into a Row
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()
```
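As an aside, in 2.x the same DataFrame can also be built by reflection on a case class instead of an explicit StructType; a minimal sketch (reusing the Person case class from section 5.1):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Note: the case class should be defined at top level (outside any method)
// so Spark can derive an Encoder for it.
case class Person(name: String, age: Int, sex: String)

val spark = SparkSession.builder().appName("Ops5Reflect").master("local[3]").getOrCreate()
import spark.implicits._

val df: DataFrame = spark.sparkContext
  .textFile("hdfs://uplooking02:8020/sparktest/")
  .map(_.split(","))
  .map(arr => Person(arr(0), arr(1).toInt, arr(2)))
  .toDF()
df.show()
```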
7. Ways of working with Spark SQL
7.1 Operating on a DataFrame with SQL statements
```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
// SparkSession is the new 2.x entry point, equivalent to SQLContext in Spark 1.x
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// Data cleaning
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
// createOrReplaceTempView is the new 2.x API, equivalent to registerTempTable() in Spark 1.x
df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()
```
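Once the temp view exists, any SQL supported by Spark can be run against it; a hedged example of a filter plus aggregation over the same "p1" view and columns:

```scala
// Average age per sex for people older than 10, run against the "p1" temp view
val statsDF = sparkSession.sql(
  "select sex, count(*) as cnt, avg(age) as avg_age from p1 where age > 10 group by sex")
statsDF.show()
```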
7.2 Operating on a DataFrame with the DSL
DSL: Domain Specific Language.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// Data cleaning
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
// The $"col" column syntax needs the session's implicits in scope
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age > 10").orderBy($"age".desc)
df.show()
```
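For comparison with 7.1, the same filter-and-aggregate query expressed in the DSL might look like this (a sketch; rowDF and the implicits import are those defined above):

```scala
import org.apache.spark.sql.functions.{avg, count}

val statsDF = rowDF
  .where($"age" > 10)
  .groupBy("sex")
  .agg(count("*").as("cnt"), avg("age").as("avg_age"))
statsDF.show()
```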
8. Spark SQL output
8.1 Writing out to JSON files
```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
// Data cleaning
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age > 10").orderBy($"age".desc)
// Write the result out as JSON; the target directory must not already exist
df.write.json("hdfs://uplooking02:8020/sparktest1")
```
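To verify the output, the directory can be read straight back into a DataFrame (a sketch reusing the sparkSession defined above):

```scala
val readBack: DataFrame = sparkSession.read.json("hdfs://uplooking02:8020/sparktest1")
readBack.show()
```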
8.2 Writing out to a relational database (MySQL)
```scala
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
// Data cleaning
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age > 10").orderBy($"age".desc)

val url = "jdbc:mysql://localhost:3306/test"
// The table is created automatically if it does not exist
val tbName = "person1"
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")
// SaveMode defaults to ErrorIfExists
df.write.mode(SaveMode.Append).jdbc(url, tbName, prop)
```
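Note that writing over JDBC also requires the MySQL driver on the classpath; a dependency along these lines (the version shown is only an example) would typically sit alongside spark-sql in the POM:

```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
```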
That is all for this article. I hope it helps with your learning, and thank you for your continued support.