欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

大数据云计算——Spark实战(常用数据库的读写实战)

程序员文章站 2022-04-10 23:40:15
...

Spark常用数据库的数据的读写

大数据云计算——Spark实战(常用数据库的读写实战)

Spark可以从外部存储系统读取数据,比如RDBMs表中或者HBase表中读写数据,这也是企业中常常使用,如下两个场景:

1)、要分析的数据存储在HBase表中,需要从其中读取数据数据分析

           1日志数据:电商网站的商家操作日志

           2订单数据:保险行业订单数据

2)、使用Spark进行离线分析以后,往往将报表结果保存到MySQL

           1表中网站基本分析(pv、uVv。。。。。)

HBase数据源

Spark可以从HBase表中读写(Read/Write)数据,底层采用TablelnputFormat和TableOutputFormat方式,与MapReduce与HBase集成完全一样,使用输入格式InputFormat和输格式OutPutFormat

大数据云计算——Spark实战(常用数据库的读写实战)

大数据云计算——Spark实战(常用数据库的读写实战)

Spar数据的写入Hbase

package hbase

import java.util

import com.hankcs.hanlp.HanLP
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import com.hankcs.hanlp.seg.common.Term
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import search.SougouRecord


/**
 * Hbase的数据连接
 */
object HbaseConnection {
  def main(args: Array[String]): Unit = {
    def main(args: Array[String]): Unit = {
      //TODO 构建一个spark的对象
      //构建Spark Application 应用的入口实例
      val sc: SparkContext = {
        val sparkConf: SparkConf = new SparkConf()
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
          .setMaster("local[2]")
        SparkContext.getOrCreate(sparkConf)
      }
      //TODO:1 加载搜狗的数据的集合使用小数据集合
      val inputpath = "E:\\GItHub_project\\Big_Data\\Spark\\Sparkday02_2.11\\src\\main\\resources\\SogouQ.sample"
      val sougouRDD = sc.textFile(inputpath, minPartitions = 2)
      print(s"count=${sougouRDD.count()}")
      println(sougouRDD.first())

      //TODO 2:数据的ETL操作的
      val etlRDD: RDD[SougouRecord] = sougouRDD
        .filter(line => null != line && line.trim.split("\\s+").length == 6)
        .mapPartitions { iter =>
          iter.map { line =>
            val array = line.trim.split("\\s+")
            //构建一个对象
            SougouRecord(
              array(0), array(1),
              array(2).replace("\\[\\]", ""),
              array(3).toInt, array(4).toInt,
              array(5)
            )
          }
        }
      //由于数据使用多次 需要缓存数据
      etlRDD.persist(StorageLevel.MEMORY_AND_DISK)

      //TODO:搜索关键次统计
      val resultRDD: RDD[(String, Int)] = etlRDD
        .filter(recode => null != recode.queryWords && recode.queryWords.trim.length > 0)
        .flatMap { record =>
          //360安全卫士
          val words = record.queryWords.trim
          //使用的HanLP分词进行中文分词 360  安全  卫士
          val terms: util.List[Term] = HanLP.segment(words)
          //将java中的list转化为的scala中的list
          import scala.collection.JavaConverters._
          //封装到二元组的中的表示每一个搜索单词的出现的一次
          val result = terms.asScala.map {
            term => (term.word, 1)
          }
          //返回的结果
          result
        }
        //分组聚合
        .reduceByKey((tmp, item) => tmp + item)

      //TODO:将来结果的数据保存都hbase
      /**
       * 表名:htb_wordCount
       * rowkey  word
       * columFamily :info
       * columns:count
       *
       * 创建表的语句是
       */
      //第一步是将RDD转换为的RDD[(IMMutableByWriteTable,Put)]
      val putsRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.map { case (word, count) =>
        //创建的rowkey
        val rowkey = new ImmutableBytesWritable(Bytes.toBytes(word))
        //创建put对象
        val put: Put = new Put(rowkey.get())
        //添加column
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count.toString))
        //返回二元组
        (rowkey, put)
      }

      //TODO 保存数据的Hbase
      /**
       * def saveAsNewAPIHadoopFile(
       * path: String,
       * keyClass: Class[_],
       * valueClass: Class[_],
       * outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
       * conf: Configuration = self.context.hadoopConfiguration)
       */
      //连接hbase的客户端的信息
      val conf = HBaseConfiguration.create()
      //TOD0:连接HBase表Zookeeper相关信息
      conf.set("hbase.zookeeper. quorum", " ") //主机IP
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conf.set("zookeeper.znode.parent", "/hbase")
      //TOD0:表的名称
      conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")

      //调用的底层的信息保存tableOutPUtForamt来数据
      putsRDD.saveAsNewAPIHadoopFile(
        path = " ",
        classOf[ImmutableBytesWritable],
        classOf[Put],
        classOf[TableOutputFormat[ImmutableBytesWritable]],
        conf
      )
      //TODO 关闭spark
      sc.stop()
    }
  }
}

Spar数据的读取Hbase

此外,读取的数据封装到RDD中,Key和Value类型分别为: lmmutableBytesWritable和Result,不支持Java Serializable导致处理数据时报序列化异常。设置Spark Application使用Kryo序列化,性能要比Java 序列化要好,创建SparkConf对象设置相关属性,如下所示:

package hbase


import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


/**
 * Hbase的数据连接 并重读取数据
 */
object HbaseReadTest {
  def main(args: Array[String]): Unit = {
    def main(args: Array[String]): Unit = {
      //TODO 构建一个spark的对象
      //构建Spark Application 应用的入口实例
      val sc: SparkContext = {
        val sparkConf: SparkConf = new SparkConf()
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
          .setMaster("local[2]")
          //TOD0:设置使用Kryo序列化方式  
          .set("spark.serializer", "org.apache.spark.serializer. KryoSerializer")
          .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))

        SparkContext.getOrCreate(sparkConf)
      }

      //TODO 连接hbase的客户端的信息
      val conf = HBaseConfiguration.create()
      //TOD0:连接HBase表Zookeeper相关信息
      conf.set("hbase.zookeeper. quorum", " ") //主机IP
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conf.set("zookeeper.znode.parent", "/hbase")
      //TOD0:表的名称  注意导入的包文件
      conf set(TableInputFormat.INPUT_TABLE, "#表名")

      //TODO 读取HBase的数据
      /**
       * def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
       * path: String,
       * fClass: Class[F],
       * kClass: Class[K],
       * vClass: Class[V],
       * conf: Configuration = hadoopConfiguration): RDD[(K, V)]
       */
      val HbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result]
      )
      //TODO 关闭spark
      sc.stop()
    }
  }
}

Spar数据的写入Mysql

开发中常常将分析结果RDD保存至MySQL表中,使用foreachPartition函数;此外Spark中提供JdbcRDD用于从MySQL表中读取数据。调用RDD#foreachPartition函数将每个分区数据保存至MySQL表中,保存时考虑降低RDD分区数目和批量插入,提升程序性能。

package mysql


import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 将相关的数据的保存早的mysql中
 */
object MysqlWriteTest {
  def main(args: Array[String]): Unit = {
    //TODO 构建一个spark的对象
    //构建Spark Application 应用的入口实例
    val sc: SparkContext = {
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      SparkContext.getOrCreate(sparkConf)
    }

    //TODO spark 连接数据库
    val inputpath = ""
    //1 读取数据
    val inputRDD: RDD[String] = sc.textFile(inputpath)
    //2处理分析数据的 调用的RDD中的transformation函数
    val resultRDD: RDD[(String, Int)] = inputRDD
      //过滤空数据
      .filter(line => null != line && line.trim.length != 0)
      //分割单词
      .flatMap(line => line.trim.split("\\s+"))
      //转为二元组 表示的是每一个单词的出现的次数
      .map(word => word -> 1)
      //分组聚合
      .reduceByKey((tmp, item) => tmp + item)

    resultRDD
      //降低分区数
      .coalesce(numPartitions = 1)
      .foreachPartition({ iter => saveToMYSQL(iter)
      })

    //TODO 关闭的spark资源
    sc.stop()
  }

  /**
   * 将RDD每一个分区数据的保存到MYSQL表汇总
   *
   * @param datas
   */
  def saveToMYSQL(datas: Iterator[(String, Int)]) = {
    //加载驱动
    Class.forName("")
    var conn: Connection = null
    var pstmt: PreparedStatement = null;
    try {
      //获取连接
      conn = DriverManager.getConnection(
        "",
        "",
        ""
      )

      //获取事务级别
      val autoConmmit = conn.getAutoCommit
      conn.setAutoCommit(false)
      //插入数据
      val insertsql = ""
      pstmt = conn.prepareStatement(insertsql)
      datas.foreach({ case (word, count) =>
        pstmt.setString(1, word)
        pstmt.setString(2, count.toString)
        //加入批量操作
        pstmt.addBatch()
      })
      //批次插入
      pstmt.executeBatch()
      //手动提交
      conn.commit()
      //还原数据库的原来的事务级别状态
      conn.setAutoCommit(autoConmmit)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      //关闭连接
      if (null != pstmt) pstmt.close()
      if (null != conn) conn.close()
    }
  }

}