Spark Study Notes: Using Spark SQL

1. What is Spark SQL?

  • A Spark module for processing structured data
  • It provides a programming abstraction called DataFrame and acts as a distributed SQL query engine

2. Features of Spark SQL

  • Multi-language API support (Java, Python, Scala)
  • Unified data access (see the sketch after this list)
  • Full compatibility with Hive
  • Standard connectivity (JDBC/ODBC)
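
A minimal sketch of what "unified data access" means in practice, using the Spark 2.x API covered later in these notes: the same DataFrameReader reads different sources, so switching formats barely changes the surrounding code. The paths below are hypothetical placeholders.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("unified-access").master("local[*]").getOrCreate()
// The same read API, three different source formats
val jsonDF    = spark.read.json("hdfs://uplooking02:8020/data/people.json")
val parquetDF = spark.read.parquet("hdfs://uplooking02:8020/data/people.parquet")
val csvDF     = spark.read.option("header", "true").csv("hdfs://uplooking02:8020/data/people.csv")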

3. Why learn Spark SQL?

We have already learned Hive, which translates Hive SQL into MapReduce jobs and submits them to the cluster, greatly simplifying the work of writing MapReduce programs; however, the MapReduce computation model is relatively slow. Spark SQL arose as an alternative: it translates SQL into RDD operations and submits them to the cluster, so execution is much faster.

4. DataFrame

  • Like an RDD, a DataFrame is a distributed data container
  • However, a DataFrame is more like a two-dimensional table in a traditional database: besides the data itself, it also records the structure of the data, i.e. its schema
  • A DataFrame is essentially an RDD with schema information attached (as the sketch below illustrates)
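
To make the "RDD plus schema" point concrete, here is a minimal sketch (it uses the Spark 2.x SparkSession introduced later in these notes; the sample values are illustrative): the same data viewed as a DataFrame carries a schema, and df.rdd gives back the underlying RDD of Row objects.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().appName("df-vs-rdd").master("local[*]").getOrCreate()
import spark.implicits._
val df = Seq(("admin1", 14, "man"), ("admin2", 16, "man")).toDF("name", "age", "sex")
df.printSchema()              // the schema information that a plain RDD does not carry
val rows: RDD[Row] = df.rdd   // the same records, viewed as an RDD[Row]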

5. Programming with the Spark SQL 1.x API

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
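
If you build with sbt rather than Maven, a roughly equivalent dependency declaration would look like the line below (the version string is a placeholder; match it to your cluster's Spark version).

// build.sbt -- sbt equivalent of the Maven dependency above
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"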

5.1 Creating a DataFrame with SQLContext (for testing)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object Ops3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Build an RDD of case-class instances; the schema is inferred from the case class fields
    val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"), Person("admin2", 16, "man"), Person("admin3", 18, "man")))
    val df1: DataFrame = sqlContext.createDataFrame(rdd1)
    df1.show(1)
  }
}
case class Person(name: String, age: Int, sex: String)
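
With the three sample Person records above, df1.show(1) should print roughly the following (column order follows the case class fields):

+------+---+---+
|  name|age|sex|
+------+---+---+
|admin1| 14|man|
+------+---+---+
only showing top 1 row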

5.2 Using the implicit conversions provided by SQLContext (for testing)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"), Person("admin2", 16, "man"), Person("admin3", 18, "man")))
// Importing the implicits gives RDDs of case classes a toDF method
import sqlContext.implicits._
val df1: DataFrame = rdd1.toDF
df1.show()
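
The same implicits also cover RDDs of tuples, in which case column names can be supplied to toDF explicitly. A minimal sketch, assuming the sc, sqlContext and import from the snippet above:

val tupleRDD = sc.parallelize(List(("admin1", 14, "man"), ("admin2", 16, "man")))
val df2: DataFrame = tupleRDD.toDF("name", "age", "sex")
df2.printSchema()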

5.3 Creating a DataFrame with SQLContext and an explicit schema (commonly used)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// Define the schema explicitly as a StructType of named, typed fields
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
// Data cleansing: split each line and wrap the fields in a Row
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val lineSplit: Array[String] = line.split(",")
  Row(lineSplit(0), lineSplit(1).toInt, lineSplit(2))
})
val rowDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
rowDF.show()
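
This snippet assumes every line under /sparktest/ is a comma-separated name,age,sex record; a hypothetical input file would look something like:

admin1,14,man
admin2,16,man
admin3,18,woman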

6. Using the newer 2.x API

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// Data cleansing: split each line and wrap the fields in a Row
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()
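
In Spark 2.x the separate SparkConf is optional; the same session can be configured directly on the builder. A minimal sketch:

import org.apache.spark.sql.SparkSession

val sparkSession: SparkSession = SparkSession.builder()
  .appName("Ops5")
  .master("local[3]")
  .getOrCreate()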

7. Ways of working with Spark SQL

7.1 Operating on a DataFrame with SQL statements

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
// SparkSession is the new Spark 2.x entry point, equivalent to SQLContext in Spark 1.x
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// Data cleansing: split each line and wrap the fields in a Row
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

// New Spark 2.x API, equivalent to registerTempTable() in Spark 1.x
df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()
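
A view registered with createOrReplaceTempView is scoped to the SparkSession that created it. If a view needs to be shared across sessions in the same application, Spark 2.1+ also provides global temporary views, which live in the reserved global_temp database. A sketch, assuming the df from the snippet above:

df.createGlobalTempView("p1_global")
sparkSession.sql("select * from global_temp.p1_global").show()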

7.2 Operating on a DataFrame with DSL statements

DSL (Domain Specific Language): the DataFrame operations are expressed as method calls rather than SQL strings.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// Data cleansing: split each line and wrap the fields in a Row
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
// The implicits enable the $"colName" column syntax used below
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
df.show()
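
The DSL expresses the usual SQL clauses as method calls on the DataFrame. For example, grouping and aggregation could be written as follows (a sketch continuing from the rowDF and imports above):

import org.apache.spark.sql.functions.{avg, count}

rowDF.groupBy("sex")
  .agg(count("*").as("cnt"), avg($"age").as("avg_age"))
  .show()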

8. Spark SQL output

8.1 Writing out to JSON files

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
// Data cleansing: split each line and wrap the fields in a Row
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
// Write the result out as line-delimited JSON
df.write.json("hdfs://uplooking02:8020/sparktest1")
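
The output is line-delimited JSON (one object per line) and can be read back with the same session. Note that by default the write fails if the target directory already exists; passing a SaveMode changes that. A sketch, assuming the sparkSession and df from the snippet above:

import org.apache.spark.sql.SaveMode

val readBack = sparkSession.read.json("hdfs://uplooking02:8020/sparktest1")
readBack.show()
// Overwrite an existing output directory instead of failing
df.write.mode(SaveMode.Overwrite).json("hdfs://uplooking02:8020/sparktest1")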

8.2 Writing out to a relational database (MySQL)

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
// Data cleansing: split each line and wrap the fields in a Row
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)

val url = "jdbc:mysql://localhost:3306/test"
// The table will be created automatically if it does not exist
val tbName = "person1"
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")
// SaveMode defaults to ErrorIfExists; Append adds rows to an existing table
df.write.mode(SaveMode.Append).jdbc(url, tbName, prop)
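
Writing through JDBC requires the MySQL driver (mysql-connector-java) on the classpath. The same url, table name and properties can also be used to read the table back into a DataFrame, for example (a sketch assuming the variables from the snippet above):

val fromMysql = sparkSession.read.jdbc(url, tbName, prop)
fromMysql.show()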

That's all for this article; hopefully it helps with your study of Spark SQL.