SparkSQL
一、什么是Spark SQL?
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。
我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduc的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!
特点:
1)易整合 2)统一的数据访问方式 3)兼容Hive 4)标准的数据连接
二、DataFrame
与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。
上图直观地体现了DataFrame和RDD的区别。左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待,DataFrame也是懒执行的。性能上比RDD要高。
1、创建案例
(一)Linux的spark-shell下:
连接spark-shell
bin/spark-shell --master spark://hdp-1:7077 --executor-memory 512m --total-executor-cores 2
创建了一个数据集,实现了并行化
val seq= Seq((“1”,“xiaoming”,15),(“2”,“xiaohong”,20),(“3”,“xiaobi”,10))
val rdd1 = sc.parallelize(seq)
转换为DataFrame对象,空参的情况下。默认 _1 _2 _3.....列名
val df1 = rdd.toDF
val df2 = rdd1.toDF("id","name","age")
#查看数据 show 算子来打印,show是一个action类型算子,此时执行在集群上了
df1.show df2.show
查询信息
(1)DSL 风格语法
df.select("name").show
df.select("name","age").show
#条件过滤
df.select("name","age").filter("age >10").show
df.select("name","age").filter(col("age") >10).show
#分组统计个数
df.groupBy("age").count().show()
#打印DataFrame结构信息
df2.printSchema
(2)Sql 风格语法:
将DataFrame注册成表(临时表),表会被存储
df.registerTempTable("t_person")
查询:
spark.sqlContext.sql("select name,age from t_person where age > 10").show
spark.sqlContext.sql("select name,age from t_person order by age desc limit 2").show
(二)在idea中
注意:依赖spark版本要和spark-sql版本对应,参照https://mvnrepository.com/,否则报错
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
方法一:DSL、SQL
import org.apache.spark.{SparkConf, SparkContext}
class SqlOperation {
}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object SqlOperation {
def main(args: Array[String]): Unit = {
val sc =new SparkContext(new SparkConf().setAppName("SparkSQL").setMaster("local[*]"))
//创建SQLContext对象
val sqlc = new SQLContext(sc)
val lineRDD: RDD[Array[String]] = sc.textFile("sql\\sql1.txt").map(_.split(" "))
//将获取数据 关联到样例类中 Person自定义的类
val personRDD: RDD[Person] = lineRDD.map(x => Person(x(0).toInt,x(1),x(2).toInt))
//toDF相当于反射,这里若要使用的话,需要导入包,且必须放在第一个toDF上面
import sqlc.implicits._
val personDF: DataFrame = personRDD.toDF()
// val personDF: DataFrame = personRDD.toDF("ID","NAME","AGE")
// 一、DSL风格
personDF.show()
personDF.select("id","name","age").filter("age>15").show()
// 二、使用Sql语法
//注册临时表,这个表相当于存储在 SQLContext中所创建对象中
personDF.registerTempTable("t_person")
val sql = "select * from t_person where age > 20 order by age"
//查询
val res = sqlc.sql(sql)
res.show() //默认打印是20行
// 固化数据,将数据写到文件中mode是以什么形式写 写成什么文件
// res.write.mode("append").json("out3")
//除了这两种还可以csv模式,json模式
res.write.mode("append").save("out4")
}
//case class,编译器自动为你创建class和它的伴生 object,并实现了apply方法让你不需要通过 new 来创建类实例
case class Person(id:Int,name:String,age:Int)
}
方法二:StructType对象创建DataFrame
class SqlOper2 {
}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
object SqlOper2 {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("SparkSQLStructTypeDemo").setMaster("local"))
val sqlcontext = new SQLContext(sc)
val lineRDD = sc.textFile("sql\\sql1.txt").map(_.split(" "))
//创建StructType对象 封装了数据结构(类似于表的结构)
val structType: StructType = StructType {
List(
//列名 数据类型 是否为空(false是非空,即not nullable)
StructField("id", IntegerType, false),
StructField("name", StringType, true),
StructField("age", IntegerType, false)
)
}
//Row是spark.sql自带的,封装数据
val rowRDD: RDD[Row] = lineRDD.map(arr => Row(arr(0).toInt,arr(1),arr(2).toInt))
//将RDD转换为DataFrame
val personDF: DataFrame = sqlcontext.createDataFrame(rowRDD,structType)
personDF.show()
println(personDF.orderBy("age").count())
}
}
三、把数据写到mysql中
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
object DataFormeInputJDBC {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("DataFormeInputJDBC").setMaster("local"))
val sqlContext = new SQLContext(sc)
val lines = sc.textFile("sql").map(_.split(" "))
// StructType 存的表结构
val structType: StructType = StructType(
Array(
StructField("id", IntegerType, false),
StructField("name", StringType, true),
StructField("age", IntegerType, true)
)
)
//开始映射
val rowRDD: RDD[Row] = lines.map(arr => Row(arr(0).toInt,arr(1),arr(2).toInt))
//将当前RDD转换为DataFrame
val personDF: DataFrame = sqlContext.createDataFrame(rowRDD,structType)
//创建一个用于写入mysql配置信息
val prop = new Properties()
prop.put("user","root")
prop.put("password","zxc123456")
prop.put("driver","com.mysql.cj.jdbc.Driver")
val jdbcurl = "jdbc:mysql://localhost/sparksql?characterEncoding=utf-8&serverTimezone=UTC"
val table = "person"
//propertities的实现是HashTable 表不存在会自动创建,存在会追加数据
personDF.write.mode("append").jdbc(jdbcurl,table,prop)
println("插入数据成功")
sc.stop()
}
}