欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spark:sparksql:jdbc测试(mysql)

程序员文章站 2022-06-01 15:45:34
...
/**
    *  数据源:JDBC
    * @param spark
    */
  def testJDBC(spark: SparkSession): Unit = {

    //    从机器1的mysql读取数据

    println("========================第一种读取mysql方式================================")
    //默认partation为1
    val url1: String = "jdbc:mysql://127.0.0.1/test?useUnicode=true&characterEncoding=utf-8"
    val table1 = "tb_score"
    val properties1: Properties = new Properties()
    properties1.setProperty("user","root")
    properties1.setProperty("password","root")
    properties1.setProperty("driver","com.mysql.jdbc.Driver")
    val jdbcDF1: DataFrame = spark.read.jdbc(url1, table1, properties1)
    jdbcDF1.show()
    val jdbcDF1New: DataFrame = jdbcDF1.union(jdbcDF1) //将两个dataframe做合并
    jdbcDF1New.show()
    logger.info("第一种读取mysql完成!")
    println("jdbcDF1分区数:"+jdbcDF1.rdd.getNumPartitions) //查看并发度
    import org.apache.spark.TaskContext
    jdbcDF1.foreach(row => {
      println("partitionId:" + TaskContext.get.partitionId)
    })

    println("========================第二种读取mysql方式(分区)================================")
    //自定义partation数量
    val url2: String = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8"
    val table2 = "tb_score"
    val colName: String = "userid" //分区字段,需要是数值类的(partitionColumn must be a numeric column from the table in question.),经测试,除整型外,float、double、decimal都是可以的
    val lowerBound = 1 //下界,必须为整数
    val upperBound = 10 //上界,必须为整数
    val numPartions = 5 //最大分区数量,必须为整数,当为0或负整数时,实际的分区数为1;并不一定是最终的分区数量,例如“upperBound - lowerBound< numPartitions”时,实际的分区数量是“upperBound - lowerBound”;
    //注意:在分区结果中,分区是连续的,虽然查看每条记录的分区,不是顺序的,但是将rdd保存为文件后,可以看出是顺序的。
    val properties2: Properties = new Properties()
    properties2.setProperty("user","root")
    properties2.setProperty("password","root")
    properties2.setProperty("driver","com.mysql.jdbc.Driver")
    val jdbcDF2: DataFrame = spark.read.jdbc(url2, table2,colName,lowerBound,upperBound,numPartions,properties2)
    jdbcDF2.show()
    println("jdbcDF2分区数:"+jdbcDF2.rdd.getNumPartitions) //查看并发度
    import org.apache.spark.TaskContext
    jdbcDF2.foreach(row => {
      println("partitionId:" + TaskContext.get.partitionId)
    })

    println("========================第三种读取mysql方式================================")
    val jdbcDF3: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "tb_score")
//      .option("dbtable", "(select subject,score from tb_score) s")//也可以写查询字句的语句,但要括起来并给一个别名
      .load()
    jdbcDF3.show()
    println(jdbcDF3.rdd.getNumPartitions) //查看并发度
    println("jdbcDF3分区数:"+jdbcDF3.rdd.getNumPartitions) //查看并发度
    import org.apache.spark.TaskContext
    jdbcDF3.foreach(row => {
      println("partitionId:" + TaskContext.get.partitionId)
    })


    println("--------------------------------------------------------------------------")

    jdbcDF1.createTempView("tb_score")
    //    jdbcDF1.createOrReplaceTempView("tb_score")
    //    jdbcDF1.createGlobalTempView("tb_score") //注册成全局临时表,可在同一spark应用程序的多个session*享,引用全局临时表时需用global_temp 进行标识 如下示例:
    val newspark :SparkSession = spark.newSession()
    //    val res :DataFrame = newspark.sql("SELECT userid,score FROM global_temp.tb_score") // 引用全局临时表(使用一个新的SparkSession测试,验证了全局临时表是可以跨会话进行查询的)
    val results:DataFrame = spark.sql("SELECT userid,score FROM tb_score") //引用局部临时表
    var scoreflag =90
    val results1:DataFrame = spark.sql("SELECT userid,score FROM tb_score").where(s"score >${scoreflag}")
    results.show()
    results1.show()

    //将mysql表中的数据装入集合
    //collect()
    val mysqlRows:Array[Row] = results.collect()
    var lst = List[String]() //定义一个集合用于封装表中的数据
    for (row <- mysqlRows){
      val userid: String = row.getAs[String]("userid")
      val score: Double = row.getAs[Double]("score")
      lst = userid :: lst
    }
    //得到集合lst,就可以进行接下来的操作
    println(lst.mkString(","))

    //collectAsList()
    val ls:util.List[Row] = results.collectAsList()
    var lis = List[String]()
    for(i <- 0 to ls.size()-1){
      val exe_pro:String = ls.get(i).get(0)+"-"+ls.get(i).get(1) //如001-90.0
      println(exe_pro)
      lis = exe_pro :: lis
    }
    println(lis.mkString(","))

    //stripMargin和多行字符串的使用
    //
    //    val tmpfiled2 = spark.sql(
    //      """
    //        |select brand,regionname,branchname,workno,employeename,entrydate,historyoffercount from t_cal_tmp1
    //      """.stripMargin     //https://blog.csdn.net/weixin_38750084/article/details/99643789
    //    )

    println("========================写出到mysql第一种方式================================")
    //注意:mode(SaveMode.Overwrite)参数 这里使用Overwrite时会删掉表然后自动创建表,但Append模式不会删掉表,所以使用Append模式时可以自己定义表(自己定义字段类型提前在数据库里创建表)
    //推送到mysql second
    val connProperties = new Properties()
    connProperties.setProperty("user", ConfigUtils.mysqlUser)
    connProperties.setProperty("password", ConfigUtils.mysqlPw)
    connProperties.setProperty("driver", ConfigUtils.mysqlDriver)
    results.show()
    //将结果df保存到mysql表
    results.write.mode(SaveMode.Overwrite).jdbc(ConfigUtils.mysqlUrl, "sparksql_auo_cre_new_tb_score1", connProperties) //Append 追加 Overwrite 覆盖 注意:表自动创建


    println("========================写出到mysql第二种方式================================")
    results.write.mode(SaveMode.Overwrite).option("createTableColumnTypes","userid Char(64),score VARCHAR(1024)") //自定义字段类型不生效,指定的类型应该是有效的SparkSql数据类型
      .jdbc(ConfigUtils.mysqlUrl,"sparksql_auo_cre_new_tb_score2",connProperties)

    println("========================写出到mysql第三种方式================================")
    results.write.mode(SaveMode.Overwrite).format("jdbc")
      .option("url", ConfigUtils.mysqlUrl)
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "sparksql_auo_cre_new_tb_score3")
      .save()
  }

spark:sparksql:jdbc测试(mysql)

spark:sparksql:jdbc测试(mysql)

数据存入Mysql注意事项

A. 尽量先设置好存储模式

默认为SaveMode.ErrorIfExists模式,该模式下,如果数据库中已经存在该表,则会直接报异常,导致数据不能存入数据库.另外三种模式如下:

SaveMode.Append 如果表已经存在,则追加在该表中;若该表不存在,则会先创建表,再插入数据;

SaveMode.Overwrite 重写模式,其实质是先将已有的表及其数据全都删除,再重新创建该表,最后插入新的数据;

SaveMode.Ignore 若表不存在,则创建表,并存入数据;在表存在的情况下,直接跳过数据的存储,不会报错。

B. 设置存储模式的步骤为:

org.apache.spark.sql.SaveMode

......

df.write.mode(SaveMode.Append)

C. 若提前在数据库中手动创建表,需要注意列名称和数据类型,

下面的源码说明了,需要保证Spark SQL中schema中的field name与Mysql中的列名称一致!

spark:sparksql:jdbc测试(mysql)

 

若提前手动创建Mysql表,需要注意Spark SQL 中Schema中的数据类型与Mysql中的数据类型的对应关系,如下图所示:

 

第二种读取mysql方式(分区)解析:

Spark采用分区方式读取数据库时partitionColumn, lowerBound, upperBound, numPartitions的理解与验证

关键概念

  • partitionColumn:分区字段,需要是数值类的(partitionColumn must be a numeric column from the table in question.),经测试,除整型外,float、double、decimal都是可以的
  • lowerBound:下界,必须为整数
  • upperBound:上界,必须为整数
  • numPartitions:最大分区数量,必须为整数,当为0或负整数时,实际的分区数为1;并不一定是最终的分区数量,例如“upperBound - lowerBound< numPartitions”时,实际的分区数量是“upperBound - lowerBound”;
  • 在分区结果中,分区是连续的,虽然查看每条记录的分区,不是顺序的,但是将rdd保存为文件后,可以看出是顺序的。

验证

数据准备

在Mysql中创建表partition并插入数据,如下所示(两个字段都为int):

spark:sparksql:jdbc测试(mysql)

在spark-shell中进行相关概念验证

1.    启动spark-shell
2.    在spark-shell中读取准备的mysql中的数据

scala> val jdbcDF = spark.read.format("jdbc").options(Map("url" ->  "jdbc:mysql://172.20.20.11:3306/testdepdb?user=testdep&password=testzet2018","dbtable" -> "testdepdb.partition","fetchSize" -> "20","partitionColumn" -> "seq", "lowerBound" -> "0", "upperBound" -> "10", "numPartitions" -> "5")).load()

3.    查看分区数

scala> jdbcDF.rdd.getNumPartitions
res6: Int = 5

4.    查看数据

scala> jdbc.show
+---+---------+
|seq|clicksNum|
+---+---------+
|  1|        1|
|  2|        2|
|  3|        3|
|  4|        4|
|  5|        5|
|  6|        6|
|  7|        7|
|  8|        8|
|  9|        9|
| 10|       10|
| 11|       11|
| 12|       12|
+---+---------+

5.    引入所需的库

import org.apache.spark.TaskContext

6.    打印每一行的partitionID

jdbcDF.foreach(row => {
      println("partitionId:" + TaskContext.get.partitionId)
    })
 
partitionId:3
partitionId:3
partitionId:4
partitionId:4
partitionId:4
partitionId:4
partitionId:4
partitionId:2
partitionId:2
partitionId:0
partitionId:1
partitionId:1

7. 保存为文件

scala> jdbcDF.rdd.saveAsTextFile("file///tmp/test/partition")

8. 查看文件内容

[aaa@qq.com partition]# ls
part-00000  part-00001  part-00002  part-00003  part-00004  _SUCCESS
[aaa@qq.com partition]# cat part-00000
[1,1]
[aaa@qq.com partition]# cat part-00001
[2,2]
[3,3]
[aaa@qq.com partition]# cat part-00002
[4,4]
[5,5]
[aaa@qq.com partition]# cat part-00003
[6,6]
[7,7]
[aaa@qq.com partition]# cat part-00004
[8,8]
[9,9]
[10,10]
[11,11]
[12,12]

参考:https://blog.csdn.net/wiborgite/article/details/84944596