欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark SQL操作外部数据源(Parquet、Hive)

程序员文章站 2022-04-11 10:22:16
...

前言:在进行操作前需要把jdbc的jar包放到spark的jars文件夹下
我用的版本是

Parquet文件

Parquet文件是一种流行的列式存储格式,以二进制存储,文件中包含数据与元数据。
代码示例:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
 val spark = SparkSession.builder().master("local[1]").appName("parquet").getOrCreate()
    val sc = spark.sparkContext
//定义列名以及其类型
val schama = StructType(Array(
      StructField("name", StringType),
      StructField("color", StringType),
      StructField("number", ArrayType(IntegerType))
    ))
    
    val list = List(("hulk", "red", Array(12, 34, 1)),
      ("steven", null, Array(6, 2)),
      ("john", "yello", null),
      ("mary", "bulue", Array(66, 1314)),
      ("john", "gree", Array(11))
    )
    val rdd = sc.makeRDD(list)
    //把rdd转换成Row
    val rowRdd = rdd.map(x=>Row(x._1,x._2,x._3))
    val df = spark.createDataFrame(rowRdd,schama)
    df.show()

Spark SQL操作外部数据源(Parquet、Hive)
我们可以把生成的parquet存入到本地文件中:

//写入
df.write.parquet("data/parquet")

生成文件:
Spark SQL操作外部数据源(Parquet、Hive)
数据内容已被二进制化:
Spark SQL操作外部数据源(Parquet、Hive)
当然,我们也可以读取parquet文件:

val par = spark.read.parquet("data/parquet")
    par.show()

返回的是dataFrame类型
Spark SQL操作外部数据源(Parquet、Hive)

Hive操作

1、首先将Hive/conf下的hive-site.xml拷贝至${SPARK_HOME}/conf下;
2、nohup hive --service metastore &启动元数据服务
3、启动Hive操作界面
4、创建一个新表:

create table toronto(
full_name string, 
ssn string, 
office_address string);
insert into toronto(full_name, ssn, office_address) values('John S. ', '111-222-333 ', '123 Yonge Street ');

5、然后启动spark;
6、可以直接在spark shell界面访问我们刚创建的表:
spark是在Hive中的库名。(连接默认的是default库)

scala>val df=spark.table("spark.toronto")
scala>df.show

Spark SQL操作外部数据源(Parquet、Hive)
7、接下来的操作我们到idea中进行。
7.1、首先导入新的依赖包:

	<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.36</version>
    </dependency>

7.2、首先测试连接:

val spark = SparkSession.builder().master("local[1]").appName("sparkHive")
      .config("hive.metastore.uris","thrift://192.168.136.10:9083")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    spark.sql("show databases").collect().foreach(println)

注意这里定义的spark对象和以往有所不同,新加了config和enableHiveSupport用作与Hive的连接。成功会即会显示Hive中存在的表格。
补充:hive.metastore.uris还可以自己配置,感兴趣的伙伴可以参考下这篇文章:
Hive metastore三种配置方式
配置完成后可以在linux中进行操作,如果Windows中没有配置spark环境则无法生效。
Spark SQL操作外部数据源(Parquet、Hive)
7.3、spark sql来进行简单的查询。(连接默认的是default库)

val df=spark.sql("select * from spark.toronto")
    df.show()

Spark SQL操作外部数据源(Parquet、Hive)
我们可以使用dataFrame提供的方法来做操作,下面演示查找ssn中以111开头的数据:

 df.filter(df("ssn").startsWith("111"))
df.where(df("ssn").startsWith("158"))

进阶操作

对CSV文件的操作。
样例:

Spark SQL操作外部数据源(Parquet、Hive)

1、Hive中创建表格,加载CSV文件:

create table emp(
id int,
name string,
job_titile string,
company string,
sdate string,
edate string)
row format serde
'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with
SERDEPROPERTIES
("separatorChar"=",")
STORED as TEXTFILE;
//从本地加载文件
load data local inpath '/data/emp_basic.csv' overwrite into table emp;

2、spark查询表格数据:

val df = spark.sql("select * from test.emp")
df.show()

此时我们查看表格会发现表格中除了我们自定义的列名外还有数据自带的列名,我们要做的就是去除这些列名。
Spark SQL操作外部数据源(Parquet、Hive)

wherefilter

df.where(df("id").startsWith("emp_id").===(false)).show()
df.filter(df("id").startsWith("emp_id").===(false)).show()

删选出标题,生成新的dataFrame.
Spark SQL操作外部数据源(Parquet、Hive)
需求:按时间分组,统计每个时间段的顾客数量:

val emp = df.filter(df("id").startsWith("emp_id").===(false))
    emp.groupBy("sdate").count().show()

RDBMS表(MySQL)

1、地址连接

val url="jdbc:mysql://192.168.136.10:3306/hive"

2、定义表名

val tbname="TBLS"

3、设置连接用户、密码、数据库驱动类

val prop=new java.util.Properties
 prop.setProperty("user","root")
 prop.setProperty("password","ok")
  prop.setProperty("driver","com.mysql.jdbc.Driver")

4、取得表数据

val jdbcDF=spark.read.jdbc(url,tbname,prop)
jdbcDF.show()

5、我们也可以把spark中的表存入到mysql中
t1为表名。

jdbcDF.write.mode("append").jdbc(url,"t1",prop)