
Computing UV with Spark Streaming and Storing It in a Redis Cluster


First, here is the data to be analyzed, as I stored it in Kafka (fields separated by \t):

192.168.101.2	-	-	200	3717	97	1594541195000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.2	-	-	200	3717	117	1594541196000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.2	-	-	200	3161	96	1594541386000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.222	-	-	200	19	127	1594541387000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.22	-	-	200	10	293	1594541399000	POST	/ibikeSeries/recharge	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.21	-	-	200	10	116	1594541400000	POST	/ibikeSeries/log/addPayLog	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.32	-	-	200	10	312	1594541401000	POST	/ibikeSeries/log/savelog	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.22	-	-	200	3711	481	1594541401000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.23	-	-	200	3715	172	1594541594000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.32	-	-	200	3419	154	1594541594000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.23	-	-	200	3412	88	1594541596000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari

The requirement is to count visits from distinct IPs, never counting the same IP twice; in short, we are tracking the unique-visitor count (UV).
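Each log line is a fixed sequence of tab-separated fields, and the field we deduplicate on, the client IP, comes first. As a minimal sketch (the helper name parseIp is made up here; the field positions are inferred from the sample data above):

// Hypothetical helper: extract the client IP from one access-log line.
// Per the sample data, field 0 is the IP and field 6 is the epoch-millisecond timestamp.
def parseIp(line: String): Option[String] = {
  val fields = line.split("\t")
  fields.headOption.filter(_.nonEmpty)
}

For example, parseIp applied to the first sample line above returns Some("192.168.101.2").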

Configuration for Kafka and the Redis cluster
application.conf

kafka.group.id = "ibike_streaming_analysis"
kafka.topic = "accesslog"
kafka.broker.list = "node1:9092,node2:9092,node3:9092"

# Redis
# these hostnames are mapped to IPs via local hosts-file entries
redis.host="node1,node2,node3"
redis.db.index=1

Utility classes

package com.yc.ibike.analysis.util

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringDeserializer

object ConfUtil {
  // Parse the configuration file (loads application.conf from the classpath)
  private lazy val config: Config = ConfigFactory.load()
  val topic = config.getString("kafka.topic")
  val groupId: String = config.getString("kafka.group.id")
  val redisHost: String = config.getString("redis.host")
  val selectDBIndex = config.getInt("redis.db.index")
  val broker: String = config.getString("kafka.broker.list")

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> broker,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> "true"
  )
}

package com.yc.ibike.analysis.util


import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPool}

object RedisPoolUtil {

  private val poolConfig = new GenericObjectPoolConfig()
  poolConfig.setMaxIdle(5) // maximum number of idle connections in the pool (default 8)
  poolConfig.setMaxTotal(2000) // maximum total number of connections in the pool (default 8)

  // The pool is private, so it is not exposed outside this object
  private lazy val jedisPool = new JedisPool(poolConfig, ConfUtil.redisHost) // for single-node Redis only (expects a single host)

  // Redis cluster: build the node set from the configured hosts (port 6379 assumed)
  val hosts = ConfUtil.redisHost.split(",")
  val jedisClusterNodes = new java.util.HashSet[HostAndPort]()
  for (host <- hosts) {
    jedisClusterNodes.add(new HostAndPort(host, 6379))
  }
  private lazy val jedisCluster = new JedisCluster(jedisClusterNodes)

  def getJedis() = {
    // JedisCluster has no SELECT: a Redis cluster only exposes database 0,
    // so ConfUtil.selectDBIndex is unused in cluster mode
    //jedisCluster.select(ConfUtil.selectDBIndex)
    jedisCluster

    // For single-node Redis with a connection pool, use this instead:
    //val jedis = jedisPool.getResource
    //jedis.select(ConfUtil.selectDBIndex)
    //jedis
  }
}
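As a quick sanity check of the pool utility, here is a minimal sketch of calling RedisPoolUtil directly (the object name and the key are made up for this example):

import com.yc.ibike.analysis.util.RedisPoolUtil

object RedisSmokeTest {
  def main(args: Array[String]): Unit = {
    val cluster = RedisPoolUtil.getJedis()
    // A throwaway key, just to verify cluster connectivity
    cluster.set("connectivity_check", "ok")
    println(cluster.get("connectivity_check")) // prints "ok"
  }
}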

Scala code

import com.yc.ibike.analysis.util.{ConfUtil, RedisPoolUtil}
import com.yc.ibike.analysis.util.RedisPoolUtil.getJedis
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import redis.clients.jedis.JedisCluster

object AccessLogAnalysis {
  /**
   * String: the aggregation key
   * Seq[Int]: the values accumulated for this key in the current batch
   * Option[Int]: the initial value, or the accumulated result carried over from previous batches
   *
   * Note: this function is not used by the HyperLogLog job below; it is the update
   * function you would pass to updateStateByKey (see the sketch after this listing).
   */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => { // e.g. ("a", Seq(1,1,1,1,1), Some(5))
    // Option 1: operate on the tuple fields directly
    //iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
    iter.map { case (x, y, z) => (x, y.sum + z.getOrElse(0)) }
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR) //配置日志
    val conf = new SparkConf().setAppName("AccessLogAnalysis").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // To update state across batches, the intermediate results must be persisted,
    // so set a checkpoint directory (an HDFS path also works)
    ssc.checkpoint("./chpoint")

    val topics = Array(ConfUtil.topic)
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, ConfUtil.kafkaParams) // subscribe to a set of topics to consume messages
    )
    // Requirement 2: compute the total UV
    val lines: DStream[String] = stream.map(record => record.value())
    val ips: DStream[String] = lines.map(_.split("\t")(0)) // field 0 is the client IP

    ips.foreachRDD { rdd =>
      // count distinct visitors
      rdd.foreachPartition { partition =>
        // obtain the shared JedisCluster instance on the executor that owns this partition
        val jedisCluster1: JedisCluster = getJedis()
        partition.foreach { ip =>
          // add the current IP to the HyperLogLog; re-adding the same IP does not grow the count
          // UV could also be tracked per day by including the date in the key, e.g.:
          //jedisCluster1.pfadd(s"accesslog_analysis_total_uv:$date", ip)
          jedisCluster1.pfadd("accesslog_analysis_total_uv", ip)
        }
        // Do not close the JedisCluster here: it manages its own connection pool
        // and is shared across partitions and batches
        //jedisCluster1.close()
      }
    }

    // start the Spark Streaming application
    ssc.start()
    // block until the streaming job terminates (or is stopped gracefully)
    ssc.awaitTermination()
  }
}
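For completeness, here is a sketch of how the updateFunc defined above could be wired in with updateStateByKey to keep a cumulative per-IP visit count, as an alternative to the HyperLogLog approach. This is not part of the original job; it assumes the code is placed inside main() after the ips DStream is defined, and it relies on the checkpoint directory already configured there (HashPartitioner is already imported at the top of the file):

    // Stateful alternative: cumulative visit counts per IP across batches
    val uvState: DStream[(String, Int)] = ips
      .map(ip => (ip, 1))
      .updateStateByKey(
        updateFunc, // the Iterator-based update function defined above
        new HashPartitioner(ssc.sparkContext.defaultParallelism),
        true // remember the partitioner across batches
      )
    // The number of distinct IPs so far equals the number of keys held in state
    uvState.foreachRDD(rdd => println(s"UV so far: ${rdd.count()}"))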

The key point here is that I compute UV with HyperLogLog rather than with SQL queries or a bitmap.
If you need UV for many pages, bitmaps get expensive on space: N pages would need N bitmaps of up to ~500 MB each. In that case the HyperLogLog structure is the better choice.
HyperLogLog is a cardinality-estimation algorithm, so its results are approximate; a single structure needs only 12 KB of memory to estimate the cardinality of up to 2^64 distinct elements. However, because HyperLogLog computes the cardinality from the input elements without storing the elements themselves, it cannot, unlike a set, return the individual elements that were added.
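A small sketch of the HyperLogLog commands involved, here against a single test node with made-up keys; PFMERGE is also how per-day keys such as accesslog_analysis_total_uv:$date could later be combined into an overall total:

import redis.clients.jedis.Jedis

object HllDemo {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("node1", 6379) // assumes a reachable Redis node
    // Two days of visitor IPs, overlapping between the days
    jedis.pfadd("uv:2020-07-12", "192.168.101.2", "192.168.101.22")
    jedis.pfadd("uv:2020-07-13", "192.168.101.2", "192.168.101.23")
    // PFMERGE unions the source HyperLogLogs into the destination key
    jedis.pfmerge("uv:total", "uv:2020-07-12", "uv:2020-07-13")
    println(jedis.pfcount("uv:total")) // approximately 3 distinct IPs
    jedis.close()
  }
}

Note that on a Redis cluster, PFMERGE requires all of its keys to hash to the same slot, so in cluster mode the keys would need hash tags such as {uv}:2020-07-12.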

In the end, the data queried back from Redis looks like this:
[screenshot: the accesslog_analysis_total_uv HyperLogLog key queried in Redis]
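Programmatically, the same figure can be read back with PFCOUNT, reusing the utility class from above:

val totalUv = RedisPoolUtil.getJedis().pfcount("accesslog_analysis_total_uv")
println(s"total UV so far: $totalUv")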


Original post: https://blog.csdn.net/weixin_43761767/article/details/107395520