大数据云计算——Spark实战(常用数据库的读写实战)
程序员文章站
2022-04-10 23:40:15
...
Spark常用数据库的数据的读写
Spark可以从外部存储系统读取数据,比如RDBMs表中或者HBase表中读写数据,这也是企业中常常使用,如下两个场景:
1)、要分析的数据存储在HBase表中,需要从其中读取数据数据分析
1日志数据:电商网站的商家操作日志
2订单数据:保险行业订单数据
2)、使用Spark进行离线分析以后,往往将报表结果保存到MySQL
1表中网站基本分析(pv、uVv。。。。。)
HBase数据源
Spark可以从HBase表中读写(Read/Write)数据,底层采用TablelnputFormat和TableOutputFormat方式,与MapReduce与HBase集成完全一样,使用输入格式InputFormat和输格式OutPutFormat
Spar数据的写入Hbase
package hbase
import java.util
import com.hankcs.hanlp.HanLP
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import com.hankcs.hanlp.seg.common.Term
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import search.SougouRecord
/**
* Hbase的数据连接
*/
object HbaseConnection {
def main(args: Array[String]): Unit = {
def main(args: Array[String]): Unit = {
//TODO 构建一个spark的对象
//构建Spark Application 应用的入口实例
val sc: SparkContext = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
SparkContext.getOrCreate(sparkConf)
}
//TODO:1 加载搜狗的数据的集合使用小数据集合
val inputpath = "E:\\GItHub_project\\Big_Data\\Spark\\Sparkday02_2.11\\src\\main\\resources\\SogouQ.sample"
val sougouRDD = sc.textFile(inputpath, minPartitions = 2)
print(s"count=${sougouRDD.count()}")
println(sougouRDD.first())
//TODO 2:数据的ETL操作的
val etlRDD: RDD[SougouRecord] = sougouRDD
.filter(line => null != line && line.trim.split("\\s+").length == 6)
.mapPartitions { iter =>
iter.map { line =>
val array = line.trim.split("\\s+")
//构建一个对象
SougouRecord(
array(0), array(1),
array(2).replace("\\[\\]", ""),
array(3).toInt, array(4).toInt,
array(5)
)
}
}
//由于数据使用多次 需要缓存数据
etlRDD.persist(StorageLevel.MEMORY_AND_DISK)
//TODO:搜索关键次统计
val resultRDD: RDD[(String, Int)] = etlRDD
.filter(recode => null != recode.queryWords && recode.queryWords.trim.length > 0)
.flatMap { record =>
//360安全卫士
val words = record.queryWords.trim
//使用的HanLP分词进行中文分词 360 安全 卫士
val terms: util.List[Term] = HanLP.segment(words)
//将java中的list转化为的scala中的list
import scala.collection.JavaConverters._
//封装到二元组的中的表示每一个搜索单词的出现的一次
val result = terms.asScala.map {
term => (term.word, 1)
}
//返回的结果
result
}
//分组聚合
.reduceByKey((tmp, item) => tmp + item)
//TODO:将来结果的数据保存都hbase
/**
* 表名:htb_wordCount
* rowkey word
* columFamily :info
* columns:count
*
* 创建表的语句是
*/
//第一步是将RDD转换为的RDD[(IMMutableByWriteTable,Put)]
val putsRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.map { case (word, count) =>
//创建的rowkey
val rowkey = new ImmutableBytesWritable(Bytes.toBytes(word))
//创建put对象
val put: Put = new Put(rowkey.get())
//添加column
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count.toString))
//返回二元组
(rowkey, put)
}
//TODO 保存数据的Hbase
/**
* def saveAsNewAPIHadoopFile(
* path: String,
* keyClass: Class[_],
* valueClass: Class[_],
* outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
* conf: Configuration = self.context.hadoopConfiguration)
*/
//连接hbase的客户端的信息
val conf = HBaseConfiguration.create()
//TOD0:连接HBase表Zookeeper相关信息
conf.set("hbase.zookeeper. quorum", " ") //主机IP
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("zookeeper.znode.parent", "/hbase")
//TOD0:表的名称
conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")
//调用的底层的信息保存tableOutPUtForamt来数据
putsRDD.saveAsNewAPIHadoopFile(
path = " ",
classOf[ImmutableBytesWritable],
classOf[Put],
classOf[TableOutputFormat[ImmutableBytesWritable]],
conf
)
//TODO 关闭spark
sc.stop()
}
}
}
Spar数据的读取Hbase
此外,读取的数据封装到RDD中,Key和Value类型分别为: lmmutableBytesWritable和Result,不支持Java Serializable导致处理数据时报序列化异常。设置Spark Application使用Kryo序列化,性能要比Java 序列化要好,创建SparkConf对象设置相关属性,如下所示:
package hbase
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Hbase的数据连接 并重读取数据
*/
object HbaseReadTest {
def main(args: Array[String]): Unit = {
def main(args: Array[String]): Unit = {
//TODO 构建一个spark的对象
//构建Spark Application 应用的入口实例
val sc: SparkContext = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
//TOD0:设置使用Kryo序列化方式
.set("spark.serializer", "org.apache.spark.serializer. KryoSerializer")
.registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
SparkContext.getOrCreate(sparkConf)
}
//TODO 连接hbase的客户端的信息
val conf = HBaseConfiguration.create()
//TOD0:连接HBase表Zookeeper相关信息
conf.set("hbase.zookeeper. quorum", " ") //主机IP
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("zookeeper.znode.parent", "/hbase")
//TOD0:表的名称 注意导入的包文件
conf set(TableInputFormat.INPUT_TABLE, "#表名")
//TODO 读取HBase的数据
/**
* def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
* path: String,
* fClass: Class[F],
* kClass: Class[K],
* vClass: Class[V],
* conf: Configuration = hadoopConfiguration): RDD[(K, V)]
*/
val HbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
//TODO 关闭spark
sc.stop()
}
}
}
Spar数据的写入Mysql
开发中常常将分析结果RDD保存至MySQL表中,使用foreachPartition函数;此外Spark中提供JdbcRDD用于从MySQL表中读取数据。调用RDD#foreachPartition函数将每个分区数据保存至MySQL表中,保存时考虑降低RDD分区数目和批量插入,提升程序性能。
package mysql
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 将相关的数据的保存早的mysql中
*/
object MysqlWriteTest {
def main(args: Array[String]): Unit = {
//TODO 构建一个spark的对象
//构建Spark Application 应用的入口实例
val sc: SparkContext = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
SparkContext.getOrCreate(sparkConf)
}
//TODO spark 连接数据库
val inputpath = ""
//1 读取数据
val inputRDD: RDD[String] = sc.textFile(inputpath)
//2处理分析数据的 调用的RDD中的transformation函数
val resultRDD: RDD[(String, Int)] = inputRDD
//过滤空数据
.filter(line => null != line && line.trim.length != 0)
//分割单词
.flatMap(line => line.trim.split("\\s+"))
//转为二元组 表示的是每一个单词的出现的次数
.map(word => word -> 1)
//分组聚合
.reduceByKey((tmp, item) => tmp + item)
resultRDD
//降低分区数
.coalesce(numPartitions = 1)
.foreachPartition({ iter => saveToMYSQL(iter)
})
//TODO 关闭的spark资源
sc.stop()
}
/**
* 将RDD每一个分区数据的保存到MYSQL表汇总
*
* @param datas
*/
def saveToMYSQL(datas: Iterator[(String, Int)]) = {
//加载驱动
Class.forName("")
var conn: Connection = null
var pstmt: PreparedStatement = null;
try {
//获取连接
conn = DriverManager.getConnection(
"",
"",
""
)
//获取事务级别
val autoConmmit = conn.getAutoCommit
conn.setAutoCommit(false)
//插入数据
val insertsql = ""
pstmt = conn.prepareStatement(insertsql)
datas.foreach({ case (word, count) =>
pstmt.setString(1, word)
pstmt.setString(2, count.toString)
//加入批量操作
pstmt.addBatch()
})
//批次插入
pstmt.executeBatch()
//手动提交
conn.commit()
//还原数据库的原来的事务级别状态
conn.setAutoCommit(autoConmmit)
} catch {
case e: Exception => e.printStackTrace()
} finally {
//关闭连接
if (null != pstmt) pstmt.close()
if (null != conn) conn.close()
}
}
}