欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SparkSQL

程序员文章站 2022-06-01 15:45:40
...

一、什么是Spark SQL?

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrameDataSet,并且作为分布式SQL查询引擎的作用。

我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduc的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

特点:

1)易整合       2)统一的数据访问方式        3)兼容Hive      4)标准的数据连接

二、DataFrame

与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。

SparkSQL

 上图直观地体现了DataFrame和RDD的区别。左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待,DataFrame也是懒执行的。性能上比RDD要高。

1、创建案例

(一)Linux的spark-shell下:

  连接spark-shell

bin/spark-shell --master spark://hdp-1:7077 --executor-memory 512m --total-executor-cores 2

  创建了一个数据集,实现了并行化

val seq= Seq((“1”,“xiaoming”,15),(“2”,“xiaohong”,20),(“3”,“xiaobi”,10))
val rdd1 = sc.parallelize(seq)

  转换为DataFrame对象,空参的情况下。默认 _1 _2 _3.....列名

val df1 = rdd.toDF
val df2 = rdd1.toDF("id","name","age")

#查看数据 show 算子来打印,show是一个action类型算子,此时执行在集群上了
df1.show     df2.show

查询信息

(1)DSL 风格语法

df.select("name").show
df.select("name","age").show

#条件过滤
df.select("name","age").filter("age >10").show
df.select("name","age").filter(col("age") >10).show

#分组统计个数
df.groupBy("age").count().show()

#打印DataFrame结构信息
df2.printSchema

(2)Sql 风格语法:

将DataFrame注册成表(临时表),表会被存储

df.registerTempTable("t_person")

查询:

spark.sqlContext.sql("select name,age from t_person where age > 10").show
spark.sqlContext.sql("select name,age from t_person order by age desc limit 2").show

(二)在idea中

注意:依赖spark版本要和spark-sql版本对应,参照https://mvnrepository.com/,否则报错

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

方法一:DSL、SQL

import org.apache.spark.{SparkConf, SparkContext}

class SqlOperation {
}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object SqlOperation {
  def main(args: Array[String]): Unit = {
    val sc  =new SparkContext(new SparkConf().setAppName("SparkSQL").setMaster("local[*]"))

    //创建SQLContext对象
    val sqlc = new SQLContext(sc)
    val lineRDD: RDD[Array[String]] = sc.textFile("sql\\sql1.txt").map(_.split(" "))
    //将获取数据 关联到样例类中   Person自定义的类
    val personRDD: RDD[Person] = lineRDD.map(x => Person(x(0).toInt,x(1),x(2).toInt))

    //toDF相当于反射,这里若要使用的话,需要导入包,且必须放在第一个toDF上面
    import sqlc.implicits._
    val personDF: DataFrame = personRDD.toDF()
//    val personDF: DataFrame = personRDD.toDF("ID","NAME","AGE")
//    一、DSL风格
    personDF.show()
    personDF.select("id","name","age").filter("age>15").show()

//    二、使用Sql语法
    //注册临时表,这个表相当于存储在 SQLContext中所创建对象中
    personDF.registerTempTable("t_person")
    val sql = "select * from t_person where age > 20 order by age"
    //查询
    val res = sqlc.sql(sql)
    res.show()  //默认打印是20行
    // 固化数据,将数据写到文件中mode是以什么形式写  写成什么文件
    //    res.write.mode("append").json("out3")
    //除了这两种还可以csv模式,json模式
    res.write.mode("append").save("out4")
  }

  //case class,编译器自动为你创建class和它的伴生 object,并实现了apply方法让你不需要通过 new 来创建类实例
  case class Person(id:Int,name:String,age:Int)
}


方法二:StructType对象创建DataFrame

class SqlOper2 {
}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object SqlOper2 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkSQLStructTypeDemo").setMaster("local"))
    val sqlcontext = new SQLContext(sc)

    val lineRDD =  sc.textFile("sql\\sql1.txt").map(_.split(" "))
    //创建StructType对象  封装了数据结构(类似于表的结构)
    val structType: StructType = StructType {
      List(
        //列名   数据类型 是否为空(false是非空,即not nullable)
        StructField("id", IntegerType, false),
        StructField("name", StringType, true),
        StructField("age", IntegerType, false)
      )
    }
    //Row是spark.sql自带的,封装数据
    val rowRDD: RDD[Row] = lineRDD.map(arr => Row(arr(0).toInt,arr(1),arr(2).toInt))
    //将RDD转换为DataFrame
    val personDF: DataFrame = sqlcontext.createDataFrame(rowRDD,structType)
    personDF.show()
    println(personDF.orderBy("age").count())
  }
}

三、把数据写到mysql中

import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object DataFormeInputJDBC {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DataFormeInputJDBC").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    val lines = sc.textFile("sql").map(_.split(" "))
    // StructType 存的表结构
    val structType: StructType = StructType(
      Array(
        StructField("id", IntegerType, false),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    //开始映射
    val rowRDD: RDD[Row] = lines.map(arr => Row(arr(0).toInt,arr(1),arr(2).toInt))
    //将当前RDD转换为DataFrame
    val personDF: DataFrame = sqlContext.createDataFrame(rowRDD,structType)

    //创建一个用于写入mysql配置信息
    val prop = new Properties()
    prop.put("user","root")
    prop.put("password","zxc123456")
    prop.put("driver","com.mysql.cj.jdbc.Driver")
    val jdbcurl = "jdbc:mysql://localhost/sparksql?characterEncoding=utf-8&serverTimezone=UTC"
    val table = "person"
    //propertities的实现是HashTable    表不存在会自动创建,存在会追加数据
    personDF.write.mode("append").jdbc(jdbcurl,table,prop)
    println("插入数据成功")
    sc.stop()
  }
}

 

 

 

相关标签: Sparksql