
Spark SQL: Working with External Data Sources


Parquet files: Parquet is a popular columnar storage format. Files are stored in binary form and contain both the data and its metadata.

package nj.zb.kb09.sql

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object SparkSQL1 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("sparksql").getOrCreate()
    val sc = spark.sparkContext

    // Spark SQL: write a Parquet file
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("sex", StringType, true),
      StructField("age", IntegerType, true)
    ))
    val rowRDD = sc.parallelize(List(
      ("Alice", "F", 18), ("Bob", "M", 19), ("Rose", "F", 17)
    )).map(x => Row(x._1, x._2, x._3))

    val infoDF = spark.createDataFrame(rowRDD, schema)
    // Uncomment on the first run to generate the Parquet file read below
//    infoDF.write.parquet("out/info")

    // Spark SQL: read the Parquet file back
    val df = spark.read.parquet("out/info")
    df.show()
    df.printSchema()
  }
}
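
If the write line is re-enabled and run a second time, Spark fails because out/info already exists. A minimal sketch of the overwrite variant, using the standard SaveMode enum from the Spark SQL API:

import org.apache.spark.sql.SaveMode

// Overwrite the output directory instead of failing when it already exists
infoDF.write.mode(SaveMode.Overwrite).parquet("out/info")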

Hive tables
First, copy hive-site.xml into the spark/conf directory:
cp /opt/hive/conf/hive-site.xml /opt/spark/conf
Then copy the MySQL JDBC driver jar into spark/jars:
cp /opt/hive/lib/mysql-connector.jar /opt/spark/jars

Then start the Hive metastore service:
hive --service metastore &
Check with jps until a RunJar process appears, which means the metastore is up; then press Ctrl+C to get the prompt back (the trailing & keeps the service running in the background).

Enter the Hive CLI and create a test table:

hive> create table toronto(full_name string, ssn string, office_address string);
hive> insert into toronto(full_name, ssn, office_address) values('John S.', '111-222-333', '123 Yonge Street');

Once Hive is integrated, spark-shell can access Hive tables directly:

val df=spark.table("toronto")
df.printSchema
df.show
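
Writing back to Hive works the same way; a minimal sketch using the standard saveAsTable call (toronto_copy is a hypothetical table name used only for illustration):

// Save the DataFrame as a new Hive-managed table
df.write.mode("overwrite").saveAsTable("toronto_copy")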

The same result can be achieved from a Maven project built in IDEA.
Add the following dependencies to pom.xml:

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.36</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>

package nj.zb.kb09.sql

import org.apache.spark.sql.SparkSession

object SparkSQL2 {
  def main(args: Array[String]): Unit = {

    // Point Spark at the metastore service started earlier and enable Hive support
    val spark = SparkSession.builder().master("local[*]")
      .appName("sparksqlHive")
      .config("hive.metastore.uris", "thrift://192.168.153.10:9083")
      .enableHiveSupport().getOrCreate()

    spark.sql("select * from toronto").show()

  }
}
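
The same Hive table can also be queried through the DataFrame API rather than SQL strings; a minimal sketch (filter and startsWith are standard Column operations):

// Equivalent DataFrame-API query against the Hive table
val toronto = spark.table("toronto")
toronto.filter(toronto("full_name").startsWith("John")).show()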

RDBMS (relational database) tables
As before, copy the MySQL JDBC driver jar into spark/jars:
cp /opt/hive/lib/mysql-connector.jar /opt/spark/jars

package nj.zb.kb09.sql

import java.util.Properties

import org.apache.spark.sql.SparkSession

object SparkSQL3 { // renamed from SparkSQL2 to avoid clashing with the Hive example above
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sparksqlMysql")
      .master("local[*]").getOrCreate()

    val url = "jdbc:mysql://192.168.153.10:3306/hive"

    // Set the MySQL user, password, and JDBC driver class
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "ok")
    prop.setProperty("driver", "com.mysql.jdbc.Driver")

    // Read the TBLS table (the Hive metastore's registry of tables)
    val df = spark.read.jdbc(url, "TBLS", prop)

    df.printSchema()
    df.show()
  }
}
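
Writing in the other direction uses the same connection properties; a minimal sketch, where tbls_copy is a hypothetical target table that Spark creates if it does not exist:

// Write the DataFrame back to MySQL over JDBC (hypothetical table name)
df.write.mode("overwrite").jdbc(url, "tbls_copy", prop)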

Tags: spark