Spark SQL Study Notes
Spark SQL Overview
MapReduce has Hive as a query interface: instead of hand-writing MapReduce jobs every time, you let Hive generate and execute the MapReduce code automatically.
The Spark framework had a similar product, Shark (which copied Hive's design wholesale).
Shark had two drawbacks:
- Because it copied Hive wholesale, and Hive was designed for MapReduce, it was hard to add Spark-specific optimization strategies to Shark.
- Spark runs tasks at the thread level while MapReduce runs at the process level, which introduced thread-safety problems that had to be patched.
Spark SQL then appeared as Shark's replacement.
With it came a new data abstraction, the DataFrame, which supports SQL queries; previously Spark operated directly on RDDs.
Differences between RDD and DataFrame
A DataFrame is essentially an RDD of rows plus a schema: Spark knows each column's name and type, so it can validate and optimize SQL queries, whereas an RDD is just a distributed collection of opaque objects whose structure only our own code understands.
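A minimal sketch of the contrast, assuming the people.txt and people.json sample files that ship with the Spark distribution (the paths follow the session below):

// Run inside spark-shell, where sc and spark are already defined.
// RDD: Spark sees only opaque lines of text; parsing is entirely up to us.
val rdd = sc.textFile("file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.txt")
rdd.map(_.split(",")).map(a => (a(0), a(1).trim.toInt)).collect()

// DataFrame: Spark knows the columns (age: bigint, name: string),
// so it can optimize the query and we can use SQL directly.
val df = spark.read.format("json").load("file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.json")
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 20").show()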
Creating a DataFrame
spark-shell automatically creates the sc (SparkContext) object and the spark (SparkSession) object for you.
As the Spark SQL read/write session below also shows, the following two lines are printed when spark-shell starts:
Spark context available as 'sc' (master = local[*], app id = local-1575509509014).
Spark session available as 'spark'.
## Terminal 1
scala> val peopleDF = spark.read.format("json").load("file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.select("name","age").write.format("csv").save("file:///root/newpeople.csv")
## Terminal 2
[aaa@qq.com ~]# cd newpeople.csv/
[aaa@qq.com newpeople.csv]# ls
part-00000-3eb8b2ce-61c2-4dc8-9faf-302d0ee00fb0.csv _SUCCESS
[aaa@qq.com newpeople.csv]# cat *.csv
Michael,
Andy,30
Justin,19
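The same read/write API handles other formats by swapping the format string. A brief sketch, assuming a writable newpeople.parquet output path:

// Write the selected columns as Parquet instead of CSV...
peopleDF.select("name", "age").write.format("parquet").save("file:///root/newpeople.parquet")
// ...and read the result back into a new DataFrame.
val parquetDF = spark.read.format("parquet").load("file:///root/newpeople.parquet")
parquetDF.show()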
Converting an RDD to a DataFrame
There are two methods: reflection-based schema inference with a case class, and programmatic schema construction; a sketch of both follows.
For details, see the textbook chapter:
Spark2.1.0入门:从RDD转换得到DataFrame (Spark 2.1.0 Tutorial: Obtaining a DataFrame from an RDD)
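A minimal sketch of both approaches, assuming the people.txt sample file from the Spark examples directory (each line is "name, age"):

// Method 1: reflection -- define a case class and let Spark infer the schema.
case class Person(name: String, age: Int)
import spark.implicits._ // enables the toDF() conversion (pre-imported in spark-shell)
val peopleDF1 = sc.textFile("file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.txt").map(_.split(",")).map(a => Person(a(0), a(1).trim.toInt)).toDF()

// Method 2: programmatic -- build a StructType explicitly and pair it with an RDD[Row].
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val schema = StructType(List(StructField("name", StringType, true), StructField("age", IntegerType, true)))
val rowRDD = sc.textFile("file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.txt").map(_.split(",")).map(a => Row(a(0), a(1).trim.toInt))
val peopleDF2 = spark.createDataFrame(rowRDD, schema)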
Reading and Writing Data with Spark SQL
Download mysql-connector-java-5.1.40.tar.gz.
I already had a copy, so I skipped the download.
########## Terminal 1 ############
[aaa@qq.com jars]# spark-shell --jars /usr/local/soft/spark-2.1.0-bin-without-hadoop/jars/mysql-connector-java-5.1.40-bin.jar --driver-class-path /usr/local/soft/spark-2.1.0-bin-without-hadoop/jars/mysql-connector-java-5.1.40-bin.jar
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/12/04 20:31:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.0.11:4040
Spark context available as 'sc' (master = local[*], app id = local-1575509509014).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "root").load()
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
scala> jdbcDF.show()
+---+--------+------+---+
| id| name|gender|age|
+---+--------+------+---+
| 1| Xueqian| F| 23|
| 2|Weiliang| M| 24|
+---+--------+------+---+
scala> import java.util.Properties
import java.util.Properties
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(gender,StringType,true), StructField(age,IntegerType,true))
scala> val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
studentRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:28
scala> val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[5] at map at <console>:30
scala> val studentDF = spark.createDataFrame(rowRDD, schema)
studentDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
scala> val prop = new Properties()
prop: java.util.Properties = {}
scala> prop.put("user", "root") //表示用户名是root
res1: Object = null
scala> prop.put("password", "root") //表示密码是hadooroot
res2: Object = null
scala> prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序是com.mysql.jdbc.Driver
res3: Object = null
scala> studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)
scala> jdbcDF.show()
+---+---------+------+---+
| id| name|gender|age|
+---+---------+------+---+
| 1| Xueqian| F| 23|
| 2| Weiliang| M| 24|
| 3|Rongcheng| M| 26|
| 4| Guanhua| M| 27|
+---+---------+------+---+
Note that jdbcDF.show() now returns the appended rows as well: a DataFrame is evaluated lazily, so each action re-runs the JDBC query against MySQL.
######### Terminal 2 ##########
mysql> select * from student;
+------+-----------+--------+------+
| id | name | gender | age |
+------+-----------+--------+------+
| 1 | Xueqian | F | 23 |
| 2 | Weiliang | M | 24 |
| 3 | Rongcheng | M | 26 |
| 4 | Guanhua | M | 27 |
+------+-----------+--------+------+
4 rows in set (0.00 sec)
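For larger tables, the JDBC source can also read in parallel. A minimal sketch against the same spark.student table, using Spark's standard JDBC partitioning options (partitionColumn, lowerBound, upperBound, numPartitions):

// Split the scan into 2 tasks over the numeric id column: rows are
// partitioned by ranges of id between lowerBound and upperBound.
val partitionedDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "root").option("partitionColumn", "id").option("lowerBound", "1").option("upperBound", "4").option("numPartitions", "2").load()
partitionedDF.rdd.getNumPartitions // returns 2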