欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SparkSql概念和API操作

程序员文章站 2022-06-01 15:50:16
...

 

目录

一、学习Spark SQL必要性

二、SparkSQL的几大特点

易整合

统一的访问形式

兼容Hive

标准的数据连接


一、学习Spark SQL必要性

Hive虽然简化了编写mapreduce的流程,但是有一个致命的缺点 慢!

所以Spark SQL应运而生,它是将Spark SQL转换成RDD,然后提交到集群中去运行,执行速度大大提升

二、SparkSQL的几大特点

易整合

将spark代码和Sql完美融和

 

SparkSql概念和API操作

统一的访问形式

SparkSql概念和API操作在代码中进行非常简单的编写就可以访问各种数据源

兼容Hive

 

SparkSql概念和API操作

兼容hive意味着我们可以使用Hive中所有的语法

 

标准的数据连接

SparkSql概念和API操作

三、API操作

3.1、从mysql数据库获取数据

val conf = SparkSession.builder().appName("sql").master("local[2]").getOrCreate()
#SparkSql跟SparkRDD的区别是它是 点sparkContext的,而RDD是 val sc = new SparkContext(conf)
 val sc = conf.sparkContext   
  sc.setLogLevel("ERROR")
  val pro = new Properties()  # util包下的properties()
  pro.setProperty("user","root")
  pro.setProperty("password","root")

  val df = conf.read.jdbc("jdbc:mysql://node132:3306/xss","sc",pro);
  df.show()

3.2、 从HDFS/本地 上获取数据(因为获取的光是数据,所以需要额外创建schema)

object SparkSql extends App {
val conf = SparkSession.builder().appName("sql").master("local[2]").getOrCreate()
  val sc = conf.sparkContext
  sc.setLogLevel("ERROR")
  //获取数据并切割
val rddstring = sc.textFile("hdfs://node132:9000/stu/stu.txt").map(_.split(","))
//从本地获取数据,需要转义字符双斜杠
//val rdd1 = sc.textFile("C:\\Users\\Undo\\Desktop\\test.txt").map(x=>(x.split(",")))
//**************************************************创建schema的第一种方式
  //把对象装到RDD中,创建表结构schema
  val rddstu = rddstring.map(x=>Stu(x(0).toInt,x(1)))
  //用最上面刚开始创建的conf来点
  import conf.implicits._
  val df = rddstu.toDF()
  df.show()

}
case class Stu(id:Int,sex:String)


// *************************************************创建schema的第二种方式
val conf = SparkSession.builder().appName("sql").master("local[2]").getOrCreate()
  val sc = conf.sparkContext
  sc.setLogLevel("ERROR")
  //获取数据并切割
val rddstring = sc.textFile("hdfs://node132:9000/stu/stu.txt").map(_.split(","))
//先创建行结构  Row
val rdd = rddstring.map(x=>Row(x(0).toInt,x(1)))
//再创建schema
  val scheam = StructType(
    List(
      StructField("id",IntegerType,false),
      StructField("name",StringType,false)
    )
  )
//用最上面的conf拼接到一块
  val df = conf.createDataFrame(rdd,scheam)
  df.show()
//**************************************************将最后结果输出到mysql数据库
  //创建数据库连接池
val pro = new Properties()
  pro.setProperty("user","root")
  pro.setProperty("password","root")
  //输出将结果输出mysql中,不用手动建表,他会自己建表
  df.write.mode(saveMode = "append").jdbc("jdbc:mysql://node132:3306/xss","sssss",pro);

3.3、用spark展示hive表

object SparkSql extends App {   //展示hive的话需要在resources包下几个有关hdfs和hive的包
  //这个conf在后期有重要作用
  val conf = SparkSession.builder()
    .appName("hive")
    .master("local[2]")
    .enableHiveSupport()
    .getOrCreate()
val sc = conf.sparkContext
  sc.setLogLevel("ERROR")
  //用hql语句进行查询
  val df = conf.sql("select * from xy")
  df.show()
}

 

四、SparkRDD和SparkSql的区别

SparkSql概念和API操作

 

相关标签: Spark SparkSql