Spark SQL: Working with External Data Sources
Parquet files: Parquet is a popular columnar storage format. Files are stored in binary form and contain both the data and its metadata (the schema), so Spark can read them back without any schema definition.
package nj.zb.kb09.sql

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SparkSQL1 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("sparksql").getOrCreate()
    val sc = spark.sparkContext

    // Spark SQL: write a Parquet file
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("sex", StringType, true),
      StructField("age", IntegerType, true)
    ))
    val rowRDD = sc.parallelize(List(
      ("Alice", "F", 18), ("Bob", "M", 19), ("Rose", "F", 17)
    )).map(x => Row(x._1, x._2, x._3))
    val infoDF = spark.createDataFrame(rowRDD, schema)
    // Run once to generate the data, then comment out;
    // otherwise the next run fails because the path already exists
    // infoDF.write.parquet("out/info")

    // Spark SQL: read the Parquet file back
    val df = spark.read.parquet("out/info")
    df.show()
    df.printSchema()

    spark.stop()
  }
}
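If you would rather leave the write enabled on every run, DataFrameWriter's save mode can replace existing output instead of failing. A minimal sketch against the same out/info path as above:

import org.apache.spark.sql.SaveMode

// Overwrite any existing output instead of failing with "path already exists"
infoDF.write.mode(SaveMode.Overwrite).parquet("out/info")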
Hive tables
First, copy hive-site.xml into spark/conf:
cp /opt/hive/conf/hive-site.xml /opt/spark/conf
Then copy the MySQL driver jar into spark/jars:
cp /opt/hive/lib/mysql-connector.jar /opt/spark/jars
Then start the Hive metastore service in the background:
hive --service metastore &
Once jps shows a RunJar process, the metastore is up; press Ctrl+C to get the shell prompt back (the service keeps running in the background).
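For reference, the one setting Spark really needs from hive-site.xml is the metastore address. A minimal sketch, assuming the same metastore host and port used later in this article:

<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://192.168.153.10:9083</value>
  </property>
</configuration>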
Enter the Hive CLI and create a table:
hive> create table toronto(full_name string, ssn string, office_address string);
hive> insert into toronto(full_name, ssn, office_address) values('John S.', '111-222-333', '123 Yonge Street');
With Hive integration in place, spark-shell can query Hive tables directly:
val df = spark.table("toronto")
df.printSchema
df.show
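Writing works the same way from the shell. A minimal sketch that persists the query result as a new managed Hive table (the table name toronto_backup is made up for illustration):

// Save the DataFrame as a managed Hive table (hypothetical table name)
df.write.saveAsTable("toronto_backup")
spark.table("toronto_backup").show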
The same result can be achieved from a Maven project in IDEA. Add the following dependencies to pom.xml:
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.36</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
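Note: spark-hive_2.11 transitively pulls in spark-sql and spark-core, so the two dependencies above should be enough for all three examples in this article; if your build resolves differently, you may need to add spark-sql_2.11 (2.1.1) explicitly.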
package nj.zb.kb09.sql

import org.apache.spark.sql.SparkSession

object SparkSQL2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("sparksqlHive")
      // Point at the Hive metastore service started earlier
      .config("hive.metastore.uris", "thrift://192.168.153.10:9083")
      .enableHiveSupport().getOrCreate()

    spark.sql("select * from toronto").show()
    spark.stop()
  }
}
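If hive-site.xml is also placed on the application classpath (for example under src/main/resources), Spark picks up the metastore address from it and the explicit hive.metastore.uris config call can be omitted.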
RDBMS (relational database) tables
As before, the MySQL driver jar must be on Spark's classpath, so copy it into spark/jars:
cp /opt/hive/lib/mysql-connector.jar /opt/spark/jars
package nj.zb.kb09.sql

import java.util.Properties

import org.apache.spark.sql.SparkSession

object SparkSQL3 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sparksqlMysql")
      .master("local[*]").getOrCreate()

    val url = "jdbc:mysql://192.168.153.10:3306/hive"
    // MySQL user, password, and JDBC driver class
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "ok")
    prop.setProperty("driver", "com.mysql.jdbc.Driver")

    // Read the TBLS table (the Hive metastore's table listing) into a DataFrame
    val df = spark.read.jdbc(url, "TBLS", prop)
    df.printSchema()
    df.show()
    spark.stop()
  }
}
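Writing back over JDBC is the mirror image of the read. A minimal sketch, assuming a writable target database and a hypothetical table name tbls_copy:

import org.apache.spark.sql.SaveMode

// Write the DataFrame back to MySQL; Overwrite drops and recreates the table
// (tbls_copy is a hypothetical target table)
df.write.mode(SaveMode.Overwrite).jdbc(url, "tbls_copy", prop)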