Computing UV with Spark Streaming and Storing It in a Redis Cluster
First, here is the data to be analyzed as I stored it in Kafka (fields separated by \t):
192.168.101.2 - - 200 3717 97 1594541195000 POST /ibikeSeries/findNearAll HTTP/1.1 https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html Mac OS X (iPhone) Mobile Safari
192.168.101.2 - - 200 3717 117 1594541196000 POST /ibikeSeries/findNearAll HTTP/1.1 https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html Mac OS X (iPhone) Mobile Safari
192.168.101.2 - - 200 3161 96 1594541386000 POST /ibikeSeries/findNearAll HTTP/1.1 https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html Mac OS X (iPhone) Mobile Safari
192.168.101.222 - - 200 19 127 1594541387000 POST /ibikeSeries/findNearAll HTTP/1.1 https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html Mac OS X (iPhone) Mobile Safari
192.168.101.22 - - 200 10 293 1594541399000 POST /ibikeSeries/recharge HTTP/1.1 https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html Mac OS X (iPhone) Mobile Safari
192.168.101.21 - - 200 10 116 1594541400000 POST /ibikeSeries/log/addPayLog HTTP/1.1 https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html Mac OS X (iPhone) Mobile Safari
192.168.101.32 - - 200 10 312 1594541401000 POST /ibikeSeries/log/savelog HTTP/1.1 https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html Mac OS X (iPhone) Mobile Safari
192.168.101.22 - - 200 3711 481 1594541401000 POST /ibikeSeries/findNearAll HTTP/1.1 https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html Mac OS X (iPhone) Mobile Safari
192.168.101.23 - - 200 3715 172 1594541594000 POST /ibikeSeries/findNearAll HTTP/1.1 https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html Mac OS X (iPhone) Mobile Safari
192.168.101.32 - - 200 3419 154 1594541594000 POST /ibikeSeries/findNearAll HTTP/1.1 https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html Mac OS X (iPhone) Mobile Safari
192.168.101.23 - - 200 3412 88 1594541596000 POST /ibikeSeries/findNearAll HTTP/1.1 https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html Mac OS X (iPhone) Mobile Safari
The requirement is to count accesses from distinct IPs without double-counting the same IP; in short, track the UV (unique visitor count). Split on \t, the first field of each record is the client IP, which is the only field the job below uses.
Kafka and Redis cluster configuration
application.conf
kafka.group.id = "ibike_streaming_analysis"
kafka.topic = "accesslog"
kafka.broker.list = "node1:9092,node2:9092,node3:9092"
# redis
# the node hostnames are mapped locally (e.g. in /etc/hosts)
redis.host="node1,node2,node3"
redis.db.index=1
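Typesafe Config's ConfigFactory.load() reads application.conf from the classpath, so in an sbt or Maven layout the file belongs under src/main/resources.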
Utility classes
ConfUtil.scala
package com.yc.ibike.analysis.util
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringDeserializer
object ConfUtil {
  // parse the configuration file (application.conf on the classpath)
  private lazy val config: Config = ConfigFactory.load()
  val topic: String = config.getString("kafka.topic")
  val groupId: String = config.getString("kafka.group.id")
  val redisHost: String = config.getString("redis.host")
  val selectDBIndex: Int = config.getInt("redis.db.index")
  val broker: String = config.getString("kafka.broker.list")
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> broker,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> "true"
  )
}
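Note that enable.auto.commit = "true" lets the Kafka client commit offsets on its own schedule, so some records can be replayed after a restart or rebalance. For this job that is harmless: PFADD is idempotent, so re-adding an IP the HyperLogLog has already seen never inflates the UV.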
RedisPoolUtil.scala
package com.yc.ibike.analysis.util
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPool}
object RedisPoolUtil {
  private val poolConfig = new GenericObjectPoolConfig()
  poolConfig.setMaxIdle(5) // maximum number of idle connections in the pool (default 8)
  poolConfig.setMaxTotal(2000) // maximum total number of connections in the pool (default 8)
  // the pool itself is private and not exposed to callers
  private lazy val jedisPool = new JedisPool(poolConfig, ConfUtil.redisHost) // for standalone Redis (expects a single host)
  // Redis cluster nodes
  val hosts = ConfUtil.redisHost.split(",")
  val jedisClusterNodes = new java.util.HashSet[HostAndPort]()
  for (host <- hosts) {
    jedisClusterNodes.add(new HostAndPort(host, 6379))
  }
  private lazy val jedisCluster = new JedisCluster(jedisClusterNodes)
  def getJedis() = {
    // JedisCluster cannot SELECT a database: Redis Cluster only supports database 0
    //jedisCluster.select(ConfUtil.selectDBIndex)
    jedisCluster
    // standalone Redis with a connection pool would look like this instead:
    // val jedis = jedisPool.getResource
    //jedis.select(ConfUtil.selectDBIndex)
    //jedis
  }
}
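A quick way to sanity-check the pool outside of Spark, assuming all three nodes are reachable on port 6379 (the object name and test key here are just for illustration):
import com.yc.ibike.analysis.util.RedisPoolUtil
object RedisSmokeTest {
  def main(args: Array[String]): Unit = {
    val jc = RedisPoolUtil.getJedis()
    jc.set("smoke_test", "ok") // the cluster client routes the key to the owning slot
    println(jc.get("smoke_test")) // prints "ok"
    jc.del("smoke_test")
  }
}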
Scala code (AccessLogAnalysis.scala)
import com.yc.ibike.analysis.util.{ConfUtil, RedisPoolUtil}
import com.yc.ibike.analysis.util.RedisPoolUtil.getJedis
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import redis.clients.jedis.JedisCluster
object AccessLogAnalysis {
  /**
   * String: the aggregation key
   * Seq[Int]: the counts of this key in each partition of the current batch
   * Option[Int]: the initial value, or the accumulated intermediate result
   */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => { // e.g. ("a", [1,1,1,1,1], 5)
    // Option 1: operate on the tuple elements directly
    //iter.map( t=>(t._1,t._2.sum+t._3.getOrElse(0)))
    iter.map { case (x, y, z) => (x, y.sum + z.getOrElse(0)) }
  }
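  // For reference, this is how updateFunc would plug into a stateful pipeline
  // (it is not used by the HyperLogLog approach below; `pairs` stands for a
  // hypothetical DStream[(String, Int)] built inside main):
  //   val totals = pairs.updateStateByKey(updateFunc,
  //     new HashPartitioner(ssc.sparkContext.defaultParallelism), true)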
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR) // quiet Spark's own logging
    val conf = new SparkConf().setAppName("AccessLogAnalysis").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // ssc.cache()
    // to update state across batches, the intermediate results must be checkpointed
    ssc.checkpoint("./chpoint") // an HDFS path also works
    // val kafkaParams = Map[String, Object](
    //   "bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",
    //   "key.deserializer" -> classOf[StringDeserializer],
    //   "value.deserializer" -> classOf[StringDeserializer],
    //   "group.id" -> "accesslogAnalysis",
    //   "auto.offset.reset" -> "latest",
    //   "enable.auto.commit" -> (true: java.lang.Boolean)
    // )
    val topics = Array(ConfUtil.topic)
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, ConfUtil.kafkaParams) // subscribe to a set of topics to receive messages
    )
    // Requirement 2: compute the total UV
    val lines: DStream[String] = stream.map(record => record.value())
    val ips: DStream[String] = lines.map(_.split("\t")(0))
    ips.foreachRDD { rdd =>
      // count distinct visitors
      rdd.foreachPartition { partition =>
        // obtain the shared cluster connection on the executor that owns this partition
        val jedisCluster1: JedisCluster = getJedis()
        partition.foreach { ip =>
          // add the current IP to the HyperLogLog
          // the same stats could also be kept per day (see the sketch after this listing):
          //jedisCluster1.pfadd(s"accesslog_analysis_total_uv:$date", ip)
          jedisCluster1.pfadd("accesslog_analysis_total_uv", ip)
        }
        // do not close the shared JedisCluster; it is reused across batches
        //jedisCluster1.close()
      }
    }
    // start the Spark Streaming job
    ssc.start()
    // block until the computation is stopped
    ssc.awaitTermination()
  }
}
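As the comment in the loop notes, the same approach extends to per-day UV: derive a date from each record's millisecond timestamp (the seventh tab-separated field in the sample data) and fold it into the Redis key, giving one HyperLogLog per day. A minimal sketch of the changed mapping and write, assuming the same stream and RedisPoolUtil as above:
import com.yc.ibike.analysis.util.RedisPoolUtil
import java.time.{Instant, ZoneId}

val datedIps = stream.map(_.value().split("\t")).map { fields =>
  // fields(6) is the epoch-millisecond timestamp, fields(0) the client IP
  val day = Instant.ofEpochMilli(fields(6).toLong)
    .atZone(ZoneId.systemDefault()).toLocalDate.toString // e.g. "2020-07-12"
  (day, fields(0))
}
datedIps.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val jedisCluster = RedisPoolUtil.getJedis()
    partition.foreach { case (day, ip) =>
      // one HyperLogLog per day, e.g. accesslog_analysis_total_uv:2020-07-12
      jedisCluster.pfadd(s"accesslog_analysis_total_uv:$day", ip)
    }
  }
}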
The key point here is that UV is computed with HyperLogLog rather than with a SQL count-distinct or a bitmap.
If you need UV for many pages, bitmaps get expensive: every page needs its own bitmap, so N pages cost N copies of roughly 500 MB each. In that situation the HyperLogLog structure is the better choice.
HyperLogLog is a cardinality-estimation algorithm, so its results are approximate, but about 12 KB of memory is enough to estimate the cardinality of up to 2^64 distinct elements. The trade-off is that HyperLogLog only uses each input element to update its estimate and never stores the element itself, so unlike a set it cannot return the individual elements that were added.
The data you end up with in Redis is then a single HyperLogLog key, accesslog_analysis_total_uv, whose count is the approximate UV.
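A minimal way to read the result back with the same Jedis cluster client, assuming the streaming job has already populated the key:
import com.yc.ibike.analysis.util.RedisPoolUtil

object UvQuery {
  def main(args: Array[String]): Unit = {
    val jc = RedisPoolUtil.getJedis()
    // PFCOUNT returns the approximate number of distinct IPs added so far
    val uv: Long = jc.pfcount("accesslog_analysis_total_uv")
    println(s"total UV ≈ $uv") // Redis's HyperLogLog has a standard error of about 0.81%
  }
}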