欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flume+Kafka+SparkStream+Hbase+mysql+Hive和Hbase之间映射

程序员文章站 2022-07-13 17:54:41
...

前言:

我是结合自己所学,然后在网上搜资料最后写成的,中间遇到得多问题,不过最后都解决了,我把其中的细节提前说明一下:
1、从SparkStream往Hbase上传输数据的时候,Hbase里面的表和行键,都需要手动建
2、从SparkStream往mysql上传输数据的时候,先需要提前在mysql中建好相应的表并且设置好字段

这篇文章,如果有问题或者你们有更简单的办法,请留言,互相交流!!

一、环境

Hbase采用的是集群模式,我在之前文章里面讲过,链接:https://blog.csdn.net/qq_44472134/article/details/104143965

开发环境:
    系统:Win10
    开发工具:scala-eclipse-IDE
    项目管理工具:Maven 3.6.0
    JDK 1.8
    Scala 2.11.8
    Spark 2.1.1
    HBase 1.4.10
 
作业运行环境:
    系统:Linux CentOS15(三台机:主从节点,3核)
        master : 192.168.224.132  192.168.224.133  
        slave1 : 192.168.224.132 192.168.224.133 192.168.224.134
    JDK 1.8
    Scala 2.11.8
    Spark 2.1.1
    HBase 1.4.10
    Hadoop 2.7.1
    ZooKeeper 3.4.10

二、实现过程代码

Flume端,输出到kafka

a1.sources=r1
a1.channels=c1
a1.sinks=s1


a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /root/data/log.txt

a1.channels.c1.type=memory
a1.channles.c1.capacity=1000
a1.channles.c1.transactionCapacity=200

a1.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.s1.topic = 1705a
a1.sinks.s1.brokerList = node132:9092
a1.sinks.s1.requiredAcks = 1
a1.sinks.s1.batchSize = 20

a1.sources.r1.channels=c1
a1.sinks.s1.channel = c1

HbaseUtil工具类

package com.stream.com
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.Connection
/**
 * TODO
 *
 * @author 徐磊
 * @email [email protected]
 * @data2020/02/04 下午 07:45
 */
object HbaseUtil {
  //配置信息
  private val conf = HBaseConfiguration.create()
  conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
  conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.224.132,192.168.224.133,192.168.224.134") //写自己的hbase节点ip
  //HBase连接
  @volatile private var connection: Connection = _
  //请求的连接数计数器(为0时关闭)
  @volatile private var num = 0

  //获取HBase连接
  def getHBaseConn: Connection = {
    synchronized {
      if (connection == null || connection.isClosed() || num == 0) {
        connection = ConnectionFactory.createConnection(conf)
        println("conn is created! " + Thread.currentThread().getName())
      }
      //每请求一次连接,计数器加一
      num = num + 1
      println("request conn num: " + num + " " + Thread.currentThread().getName())
    }
    connection
  }

  //关闭HBase连接
  def closeHbaseConn(): Unit = {
    synchronized {
      if (num <= 0) {
        println("no conn to close!")
        return
      }
      //每请求一次关闭连接,计数器减一
      num = num - 1
      println("request close num: " + num + " " + Thread.currentThread().getName())
      //请求连接计数器为0时关闭连接
      if (num == 0 && connection != null && !connection.isClosed()) {
        connection.close()
        println("conn is closed! " + Thread.currentThread().getName())
      }
    }
  }
}

Kafka+SparkStream+Hbase/mysql(最后输出到hbase的时候需要再单独写一个HbaseUtil工具类)

package com.stream.com
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import scala.collection.Iterator
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

/**
 * TODO
 *
 * @author 徐磊
 * @email [email protected]
 * @data2020/01/21 下午 05:03
 */
object kafka2spark {   //从kafka发送消息到sparkstreaming,然后进行累加求和    并且把结果保存到数据库
  def main(args: Array[String]): Unit = {
    //累加的方法
    val lj=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
      iter.flatMap{case(x,y,z)=>Some(y.sum+z.getOrElse(0)).map(su=>(x,su))}
    }

  //**************************************************8ssc(SparkStream配置)
    val conf = new SparkConf()
      .setAppName("kafka")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)

    sc.setCheckpointDir("d://out1705a")

    val ssc = new StreamingContext(sc,Seconds(5))

    //******************************************************kafka的基本配置
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node132:9092",
      //反序列化消费数据
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      //kafka中组的名字
      "group.id" -> "g1",
      //kafka之前的数据都不进行计算
      "auto.offset.reset" -> "latest",
      //自动提交offset的值
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    //topic名字
    val topics = Array("1705a")
    //从kafka接收的数据
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val value =stream.map(_.value())
    val res = value.flatMap(_.split(" ")).map((_,1)).updateStateByKey(lj,new HashPartitioner(sc.defaultParallelism),true)

    //***************************************************************将数据保存在mysql数据库
    /*res.foreachRDD(cs => {
      var conn: Connection = null;
      var ps: PreparedStatement = null;
      try {
        Class.forName("com.mysql.jdbc.Driver").newInstance();
        cs.foreachPartition(f => {
          conn = DriverManager.getConnection("jdbc:mysql://192.168.224.132:3306/kafka?useUnicode=true&characterEncoding=utf8", "root", "root");
          ps = conn.prepareStatement("insert into xss values(?,?)");
          f.foreach(s => {
            ps.setString(1, s._1);
            ps.setInt(2, s._2);
            ps.executeUpdate();
          })
        })
      } catch {
        case t: Throwable => t.printStackTrace() // TODO: handle error
      } finally {
        if (ps != null) {
          ps.close()
        }
        if (conn != null) {
          conn.close();
        }
      }
    })*/

    //*********************************************************************将数据保存到hbase(有一个单独的工具类HbaseUtil)
    //在reduce聚合之后,输出结果至HBase(输出操作)
    res.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
      //RDD为空时,无需再向下执行,否则在分区中还需要获取数据库连接(无用操作)
      if (!rdd.isEmpty()) {
        //一个分区执行一批SQL
        rdd.foreachPartition((partition: Iterator[(String, Int)]) => {
          //每个分区都会创建一个task任务线程,分区多,资源利用率高
          //可通过参数配置分区数:"--conf spark.default.parallelism=20"
          if (!partition.isEmpty) {
            //partition和record共同位于本地计算节点Worker,故无需序列化发送conn和statement
            //如果多个分区位于一个Worker中,则共享连接(位于同一内存资源中)
            //获取HBase连接
            val conn = HbaseUtil.getHBaseConn
            if (conn == null) {
              println("conn is null.")  //在Worker节点的Executor中打印
            } else {
              println("conn is not null." + Thread.currentThread().getName())
              partition.foreach((record: (String, Int)) => {
                //每个分区中的记录在同一线程中处理
                println("record : " + Thread.currentThread().getName())
                //设置表名
                val tableName = TableName.valueOf("stohbase")
                //获取表的连接
                val table = conn.getTable(tableName)
                try {
                  //设定行键(单词)
                  val put = new Put(Bytes.toBytes(record._1))
                  //添加列值(单词个数)
                  //三个参数:列族、列、列值
                  put.addColumn(Bytes.toBytes("statistics"),
                    Bytes.toBytes("cnt"),
                    Bytes.toBytes(record._2))
                  //执行插入
                  table.put(put)
                  println("insert (" + record._1 + "," + record._2 + ") into hbase success.")
                } catch {
                  case e: Exception => e.printStackTrace()
                } finally {
                  table.close()
                }
              })
              //关闭HBase连接(此处每个partition任务结束都会执行,会频繁开关连接,耗费资源)
              //              HbaseUtil.closeHbaseConn()
            }
          }
        })
        //关闭HBase连接(此处只在Driver节点执行,故无效)
        //        HbaseUtil.closeHbaseConn()
      }
    })
    //打印从DStream中生成的RDD的前10个元素到控制台中
    res.print()  //print() 是输出操作,默认前10条数据

    //开始和等待
    ssc.start()
    ssc.awaitTermination()

  }
}

总结

flume+kafka+spark stream+hbase是目前比较常用的组合,相信对这种组合存疑的有不少,下面稍微总结下:
为什么不用kafka直接接收源数据,而用flume作为Kafka的源?
从配置方面讲,flume提供了多种源接收方式,且只需做简单的配置即可,灵活的多种源配置也方便后续的收集扩展,kafka作为源会比flume稍微麻烦点,需在前面写一层生产者,实际上cloudera官方也建议,当存在多给消费者时,用kafka会更好,当存在多个多种生产者时,用flume会更加方便,同时,如果并发很高,可以采用kafka做flume的channel。
为什么用spark stream作为kafka的消费者而不是其他?
就目前spark stream的性能来看,spark stream还不能完全称之为实时流处理,更合适的叫法应该是准实时批处理,但是由于其最低延迟可以达到秒级,基本满足了大部分系统需要,对于对实时性要求不高的可以胜任,同时Spark stream内部封装的仍是Spark RDD结构,对于熟悉spark家族的开发者会更友好,且相应的处理解决方案会更多更成熟。另外Storm也是目前spark stream外比较流行的流处理,其实时性比spark stream更高,但属于spark体系外,要求相关开发者具备的能力会更高,所以可以根据不同场景和技术体系,做相应选择。
为什么是入到hbase而不是其他Nosql?
无他,HBase是目前Hadoop家族里BigTable最完善的,列式存储结构最成熟的方案。
最后总结这段话摘抄自 杨铖 作者,原链接:https://blog.csdn.net/yc_1993/article/details/80865009

相关标签: 大保健 大数据