SparkSql概念和API操作
程序员文章站
2022-06-01 15:50:16
...
目录
一、学习Spark SQL必要性
Hive虽然简化了编写mapreduce的流程,但是有一个致命的缺点 慢!
所以Spark SQL应运而生,它是将Spark SQL转换成RDD,然后提交到集群中去运行,执行速度大大提升
二、SparkSQL的几大特点
易整合
将spark代码和Sql完美融和
统一的访问形式
在代码中进行非常简单的编写就可以访问各种数据源
兼容Hive
兼容hive意味着我们可以使用Hive中所有的语法
标准的数据连接
三、API操作
3.1、从mysql数据库获取数据
val conf = SparkSession.builder().appName("sql").master("local[2]").getOrCreate()
#SparkSql跟SparkRDD的区别是它是 点sparkContext的,而RDD是 val sc = new SparkContext(conf)
val sc = conf.sparkContext
sc.setLogLevel("ERROR")
val pro = new Properties() # util包下的properties()
pro.setProperty("user","root")
pro.setProperty("password","root")
val df = conf.read.jdbc("jdbc:mysql://node132:3306/xss","sc",pro);
df.show()
3.2、 从HDFS/本地 上获取数据(因为获取的光是数据,所以需要额外创建schema)
object SparkSql extends App {
val conf = SparkSession.builder().appName("sql").master("local[2]").getOrCreate()
val sc = conf.sparkContext
sc.setLogLevel("ERROR")
//获取数据并切割
val rddstring = sc.textFile("hdfs://node132:9000/stu/stu.txt").map(_.split(","))
//从本地获取数据,需要转义字符双斜杠
//val rdd1 = sc.textFile("C:\\Users\\Undo\\Desktop\\test.txt").map(x=>(x.split(",")))
//**************************************************创建schema的第一种方式
//把对象装到RDD中,创建表结构schema
val rddstu = rddstring.map(x=>Stu(x(0).toInt,x(1)))
//用最上面刚开始创建的conf来点
import conf.implicits._
val df = rddstu.toDF()
df.show()
}
case class Stu(id:Int,sex:String)
// *************************************************创建schema的第二种方式
val conf = SparkSession.builder().appName("sql").master("local[2]").getOrCreate()
val sc = conf.sparkContext
sc.setLogLevel("ERROR")
//获取数据并切割
val rddstring = sc.textFile("hdfs://node132:9000/stu/stu.txt").map(_.split(","))
//先创建行结构 Row
val rdd = rddstring.map(x=>Row(x(0).toInt,x(1)))
//再创建schema
val scheam = StructType(
List(
StructField("id",IntegerType,false),
StructField("name",StringType,false)
)
)
//用最上面的conf拼接到一块
val df = conf.createDataFrame(rdd,scheam)
df.show()
//**************************************************将最后结果输出到mysql数据库
//创建数据库连接池
val pro = new Properties()
pro.setProperty("user","root")
pro.setProperty("password","root")
//输出将结果输出mysql中,不用手动建表,他会自己建表
df.write.mode(saveMode = "append").jdbc("jdbc:mysql://node132:3306/xss","sssss",pro);
3.3、用spark展示hive表
object SparkSql extends App { //展示hive的话需要在resources包下几个有关hdfs和hive的包
//这个conf在后期有重要作用
val conf = SparkSession.builder()
.appName("hive")
.master("local[2]")
.enableHiveSupport()
.getOrCreate()
val sc = conf.sparkContext
sc.setLogLevel("ERROR")
//用hql语句进行查询
val df = conf.sql("select * from xy")
df.show()
}
四、SparkRDD和SparkSql的区别
推荐阅读
-
详解Python中映射类型(字典)操作符的概念和使用
-
详解Python中映射类型(字典)操作符的概念和使用
-
JS使用百度地图API自动获取地址和经纬度操作示例
-
Word/Excel文档操作API哪家强?一张表带你了解Aspose和Spire系列全功能对比
-
DBA_Oracle Startup / Shutdown启动和关闭过程详解(概念)(对数据库进行各种维护操作)
-
HDFS常用API操作 和 HDFS的I/O流操作
-
ES基本概念与API操作
-
PDF文档操作API哪家强?一张表带你了解Aspose和Spire系列PDF控件全功能对比
-
操作系统:进程的概念和与程序的区别
-
操作系统学习01-操作系统的概念功能和目标