Spark SQL操作外部数据源(Parquet、Hive)
前言:在进行操作前需要把jdbc的jar包放到spark的jars文件夹下
我用的版本是
Parquet文件
Parquet文件是一种流行的列式存储格式,以二进制存储,文件中包含数据与元数据。
代码示例:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
val spark = SparkSession.builder().master("local[1]").appName("parquet").getOrCreate()
val sc = spark.sparkContext
//定义列名以及其类型
val schama = StructType(Array(
StructField("name", StringType),
StructField("color", StringType),
StructField("number", ArrayType(IntegerType))
))
val list = List(("hulk", "red", Array(12, 34, 1)),
("steven", null, Array(6, 2)),
("john", "yello", null),
("mary", "bulue", Array(66, 1314)),
("john", "gree", Array(11))
)
val rdd = sc.makeRDD(list)
//把rdd转换成Row
val rowRdd = rdd.map(x=>Row(x._1,x._2,x._3))
val df = spark.createDataFrame(rowRdd,schama)
df.show()
我们可以把生成的parquet存入到本地文件中:
//写入
df.write.parquet("data/parquet")
生成文件:
数据内容已被二进制化:
当然,我们也可以读取parquet文件:
val par = spark.read.parquet("data/parquet")
par.show()
返回的是dataFrame类型
Hive操作
1、首先将Hive/conf下的hive-site.xml拷贝至${SPARK_HOME}/conf下;
2、nohup hive --service metastore &
启动元数据服务
3、启动Hive操作界面
4、创建一个新表:
create table toronto(
full_name string,
ssn string,
office_address string);
insert into toronto(full_name, ssn, office_address) values('John S. ', '111-222-333 ', '123 Yonge Street ');
5、然后启动spark;
6、可以直接在spark shell界面访问我们刚创建的表:
spark是在Hive中的库名。(连接默认的是default库)
scala>val df=spark.table("spark.toronto")
scala>df.show
7、接下来的操作我们到idea中进行。
7.1、首先导入新的依赖包:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.36</version>
</dependency>
7.2、首先测试连接:
val spark = SparkSession.builder().master("local[1]").appName("sparkHive")
.config("hive.metastore.uris","thrift://192.168.136.10:9083")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
spark.sql("show databases").collect().foreach(println)
注意这里定义的spark对象和以往有所不同,新加了config和enableHiveSupport用作与Hive的连接。成功会即会显示Hive中存在的表格。
补充:hive.metastore.uris
还可以自己配置,感兴趣的伙伴可以参考下这篇文章:
Hive metastore三种配置方式
配置完成后可以在linux中进行操作,如果Windows中没有配置spark环境则无法生效。
7.3、spark sql来进行简单的查询。(连接默认的是default库)
val df=spark.sql("select * from spark.toronto")
df.show()
我们可以使用dataFrame提供的方法来做操作,下面演示查找ssn中以111开头的数据:
df.filter(df("ssn").startsWith("111"))
df.where(df("ssn").startsWith("158"))
进阶操作
对CSV文件的操作。
样例:
1、Hive中创建表格,加载CSV文件:
create table emp(
id int,
name string,
job_titile string,
company string,
sdate string,
edate string)
row format serde
'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with
SERDEPROPERTIES
("separatorChar"=",")
STORED as TEXTFILE;
//从本地加载文件
load data local inpath '/data/emp_basic.csv' overwrite into table emp;
2、spark查询表格数据:
val df = spark.sql("select * from test.emp")
df.show()
此时我们查看表格会发现表格中除了我们自定义的列名外还有数据自带的列名,我们要做的就是去除这些列名。
where
和filter
df.where(df("id").startsWith("emp_id").===(false)).show()
df.filter(df("id").startsWith("emp_id").===(false)).show()
删选出标题,生成新的dataFrame.
需求:按时间分组,统计每个时间段的顾客数量:
val emp = df.filter(df("id").startsWith("emp_id").===(false))
emp.groupBy("sdate").count().show()
RDBMS表(MySQL)
1、地址连接
val url="jdbc:mysql://192.168.136.10:3306/hive"
2、定义表名
val tbname="TBLS"
3、设置连接用户、密码、数据库驱动类
val prop=new java.util.Properties
prop.setProperty("user","root")
prop.setProperty("password","ok")
prop.setProperty("driver","com.mysql.jdbc.Driver")
4、取得表数据
val jdbcDF=spark.read.jdbc(url,tbname,prop)
jdbcDF.show()
5、我们也可以把spark中的表存入到mysql中
t1为表名。
jdbcDF.write.mode("append").jdbc(url,"t1",prop)
下一篇: virtualbox虚拟机网络设置