Flume+Kafka+SparkStream+Hbase+mysql+Hive和Hbase之间映射
前言:
我是结合自己所学,然后在网上搜资料最后写成的,中间遇到得多问题,不过最后都解决了,我把其中的细节提前说明一下:
1、从SparkStream往Hbase上传输数据的时候,Hbase里面的表和行键,都需要手动建
2、从SparkStream往mysql上传输数据的时候,先需要提前在mysql中建好相应的表并且设置好字段
这篇文章,如果有问题或者你们有更简单的办法,请留言,互相交流!!
文章目录
一、环境
Hbase采用的是集群模式,我在之前文章里面讲过,链接:https://blog.csdn.net/qq_44472134/article/details/104143965
开发环境:
系统:Win10
开发工具:scala-eclipse-IDE
项目管理工具:Maven 3.6.0
JDK 1.8
Scala 2.11.8
Spark 2.1.1
HBase 1.4.10
作业运行环境:
系统:Linux CentOS15(三台机:主从节点,3核)
master : 192.168.224.132 192.168.224.133
slave1 : 192.168.224.132 192.168.224.133 192.168.224.134
JDK 1.8
Scala 2.11.8
Spark 2.1.1
HBase 1.4.10
Hadoop 2.7.1
ZooKeeper 3.4.10
二、实现过程代码
Flume端,输出到kafka
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /root/data/log.txt
a1.channels.c1.type=memory
a1.channles.c1.capacity=1000
a1.channles.c1.transactionCapacity=200
a1.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.s1.topic = 1705a
a1.sinks.s1.brokerList = node132:9092
a1.sinks.s1.requiredAcks = 1
a1.sinks.s1.batchSize = 20
a1.sources.r1.channels=c1
a1.sinks.s1.channel = c1
HbaseUtil工具类
package com.stream.com
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.Connection
/**
* TODO
*
* @author 徐磊
* @email [email protected]
* @data2020/02/04 下午 07:45
*/
object HbaseUtil {
//配置信息
private val conf = HBaseConfiguration.create()
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.224.132,192.168.224.133,192.168.224.134") //写自己的hbase节点ip
//HBase连接
@volatile private var connection: Connection = _
//请求的连接数计数器(为0时关闭)
@volatile private var num = 0
//获取HBase连接
def getHBaseConn: Connection = {
synchronized {
if (connection == null || connection.isClosed() || num == 0) {
connection = ConnectionFactory.createConnection(conf)
println("conn is created! " + Thread.currentThread().getName())
}
//每请求一次连接,计数器加一
num = num + 1
println("request conn num: " + num + " " + Thread.currentThread().getName())
}
connection
}
//关闭HBase连接
def closeHbaseConn(): Unit = {
synchronized {
if (num <= 0) {
println("no conn to close!")
return
}
//每请求一次关闭连接,计数器减一
num = num - 1
println("request close num: " + num + " " + Thread.currentThread().getName())
//请求连接计数器为0时关闭连接
if (num == 0 && connection != null && !connection.isClosed()) {
connection.close()
println("conn is closed! " + Thread.currentThread().getName())
}
}
}
}
Kafka+SparkStream+Hbase/mysql(最后输出到hbase的时候需要再单独写一个HbaseUtil工具类)
package com.stream.com
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import scala.collection.Iterator
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
/**
* TODO
*
* @author 徐磊
* @email [email protected]
* @data2020/01/21 下午 05:03
*/
object kafka2spark { //从kafka发送消息到sparkstreaming,然后进行累加求和 并且把结果保存到数据库
def main(args: Array[String]): Unit = {
//累加的方法
val lj=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
iter.flatMap{case(x,y,z)=>Some(y.sum+z.getOrElse(0)).map(su=>(x,su))}
}
//**************************************************8ssc(SparkStream配置)
val conf = new SparkConf()
.setAppName("kafka")
.setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setCheckpointDir("d://out1705a")
val ssc = new StreamingContext(sc,Seconds(5))
//******************************************************kafka的基本配置
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node132:9092",
//反序列化消费数据
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
//kafka中组的名字
"group.id" -> "g1",
//kafka之前的数据都不进行计算
"auto.offset.reset" -> "latest",
//自动提交offset的值
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//topic名字
val topics = Array("1705a")
//从kafka接收的数据
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val value =stream.map(_.value())
val res = value.flatMap(_.split(" ")).map((_,1)).updateStateByKey(lj,new HashPartitioner(sc.defaultParallelism),true)
//***************************************************************将数据保存在mysql数据库
/*res.foreachRDD(cs => {
var conn: Connection = null;
var ps: PreparedStatement = null;
try {
Class.forName("com.mysql.jdbc.Driver").newInstance();
cs.foreachPartition(f => {
conn = DriverManager.getConnection("jdbc:mysql://192.168.224.132:3306/kafka?useUnicode=true&characterEncoding=utf8", "root", "root");
ps = conn.prepareStatement("insert into xss values(?,?)");
f.foreach(s => {
ps.setString(1, s._1);
ps.setInt(2, s._2);
ps.executeUpdate();
})
})
} catch {
case t: Throwable => t.printStackTrace() // TODO: handle error
} finally {
if (ps != null) {
ps.close()
}
if (conn != null) {
conn.close();
}
}
})*/
//*********************************************************************将数据保存到hbase(有一个单独的工具类HbaseUtil)
//在reduce聚合之后,输出结果至HBase(输出操作)
res.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
//RDD为空时,无需再向下执行,否则在分区中还需要获取数据库连接(无用操作)
if (!rdd.isEmpty()) {
//一个分区执行一批SQL
rdd.foreachPartition((partition: Iterator[(String, Int)]) => {
//每个分区都会创建一个task任务线程,分区多,资源利用率高
//可通过参数配置分区数:"--conf spark.default.parallelism=20"
if (!partition.isEmpty) {
//partition和record共同位于本地计算节点Worker,故无需序列化发送conn和statement
//如果多个分区位于一个Worker中,则共享连接(位于同一内存资源中)
//获取HBase连接
val conn = HbaseUtil.getHBaseConn
if (conn == null) {
println("conn is null.") //在Worker节点的Executor中打印
} else {
println("conn is not null." + Thread.currentThread().getName())
partition.foreach((record: (String, Int)) => {
//每个分区中的记录在同一线程中处理
println("record : " + Thread.currentThread().getName())
//设置表名
val tableName = TableName.valueOf("stohbase")
//获取表的连接
val table = conn.getTable(tableName)
try {
//设定行键(单词)
val put = new Put(Bytes.toBytes(record._1))
//添加列值(单词个数)
//三个参数:列族、列、列值
put.addColumn(Bytes.toBytes("statistics"),
Bytes.toBytes("cnt"),
Bytes.toBytes(record._2))
//执行插入
table.put(put)
println("insert (" + record._1 + "," + record._2 + ") into hbase success.")
} catch {
case e: Exception => e.printStackTrace()
} finally {
table.close()
}
})
//关闭HBase连接(此处每个partition任务结束都会执行,会频繁开关连接,耗费资源)
// HbaseUtil.closeHbaseConn()
}
}
})
//关闭HBase连接(此处只在Driver节点执行,故无效)
// HbaseUtil.closeHbaseConn()
}
})
//打印从DStream中生成的RDD的前10个元素到控制台中
res.print() //print() 是输出操作,默认前10条数据
//开始和等待
ssc.start()
ssc.awaitTermination()
}
}
总结
flume+kafka+spark stream+hbase是目前比较常用的组合,相信对这种组合存疑的有不少,下面稍微总结下:
为什么不用kafka直接接收源数据,而用flume作为Kafka的源?
从配置方面讲,flume提供了多种源接收方式,且只需做简单的配置即可,灵活的多种源配置也方便后续的收集扩展,kafka作为源会比flume稍微麻烦点,需在前面写一层生产者,实际上cloudera官方也建议,当存在多给消费者时,用kafka会更好,当存在多个多种生产者时,用flume会更加方便,同时,如果并发很高,可以采用kafka做flume的channel。
为什么用spark stream作为kafka的消费者而不是其他?
就目前spark stream的性能来看,spark stream还不能完全称之为实时流处理,更合适的叫法应该是准实时批处理,但是由于其最低延迟可以达到秒级,基本满足了大部分系统需要,对于对实时性要求不高的可以胜任,同时Spark stream内部封装的仍是Spark RDD结构,对于熟悉spark家族的开发者会更友好,且相应的处理解决方案会更多更成熟。另外Storm也是目前spark stream外比较流行的流处理,其实时性比spark stream更高,但属于spark体系外,要求相关开发者具备的能力会更高,所以可以根据不同场景和技术体系,做相应选择。
为什么是入到hbase而不是其他Nosql?
无他,HBase是目前Hadoop家族里BigTable最完善的,列式存储结构最成熟的方案。
最后总结这段话摘抄自 杨铖 作者,原链接:https://blog.csdn.net/yc_1993/article/details/80865009
上一篇: 解决github下载慢的问题
下一篇: 解决Github访问慢问题。