Flume+Kafka+SparkStreaming实时统计网站日志 -- 4、SparkStreaming实时统计
在上一篇 Flume采集数据到Kafka 中,日志采集到了Kafka,接下来要使用Spark Streaming,根据不同需求来统计数据。
开发时先在windows的idea上测试,那如何在windows上构建spark开发的环境,可以参考我的这篇文章 IDEA首次配置maven开发spark详细步骤。项目中需要的依赖可参考我的github中的pom.xml文件。
一、使用Kafka消费者API
这里要注意Kafka的版本,使用老版本的API和新版本的API不相同。这里使用的是新版本的API,在新版本(kafka-0-10)中,取消了Receiver模式,只有Direct模式了。
Spark使用Kafka消费者API的核心代码如下
val topic = config.getString("kafka.topic").split(",")
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false:java.lang.Boolean)
)
val messages = KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String,String](topic,kafkaParams))
// x.value获得数据里的值
val logs = messages.map(x => x.value())
二、清洗数据
原来的数据格式是这样的
23.198.48.147 2019-11-26 17:28:01 "GET /class/145.html HTTP/1.1" 200 https://www.baidu.com/s?wd=spark
为了统计的时候方便,要把数据转换一下,后续统计时直接拿数据。
要转换的格式如下:
- 时间格式改为 yyyyMMddHHmmss
- 第3位的url剔除不是/class开头的
- 提取出/class开头中间的course id,如145
Spark Streaming核心代码如下:
## logs是spark Streaming消费Kafka日志数据得到的RDD
val cleanData = logs.map(line => {
val infos = line.split("\t")
val courseHtml = infos(2).split(" ")(1)
var courseID = 0
// 筛选课程是class的,不要learn、course等其他的
if (courseHtml.contains("class")) {
val classCou = courseHtml.split("/")(2)
courseID = classCou.substring(0,classCou.lastIndexOf(".")).toInt
}
ClickLogCase(infos(0),DateUtil.parseToMinute(infos(1)),courseID,infos(3).toInt,infos(4))
}).filter(clicklog => clicklog.courseID != 0)
三、统计每个课程每天的访问量
- 由于统计后的结果数据要存进HBase中,所以先设计好hbase的表
- hbase表设计:表名 weblog_course_click,列族 info
创表语句 create ‘weblog_course_click’,'info’ - rowkey设计:rowkey是由两个字段组成 , day_courseID
- 列只有一个,click_count 存放点击量
- Spark Streaming代码开发
## 核心代码如下
## cleanData是清洗之后的数据的RDD
cleanData.map( x => {
(x.time.substring(0,8)+"_"+x.courseID,1) // 在所有数据中拿到日期和课程id拼接成的结构
}).reduceByKey(_+_) // wordcount操作,计算当天的count值
.foreachRDD(rdd => {
rdd.foreachPartition( partitionRecodes => {
val list = new ListBuffer[CourseClickCountCase]
partitionRecodes.foreach(pair => {
list.append(CourseClickCountCase(pair._1,pair._2))
})
CourseClickCountDAO.save(list)
})
})
四、统计每天从不同搜索引擎引流的课程的点击量
- HBase表设计
- 表名: weblog_course_search ,列族: info
创表语句 create ‘weblog_course_search’,'info’ - rowkey设计: rowkey由三个字段组成, day_search_courseID ,search是referer字段中的搜索引擎,如www.baidu.com
- 列只有一个,click_count 存放点击量
- Spark Streaming核心代码如下
// 下面cleanData重复用到了,可以持久化这个RDD以提高效率
cleanData.map( x => {
// https://www.baidu.com/s?wd=Storm实战 中拿到 www.baidu.com
val url = x.referer.replaceAll("//","/")
val splits = url.split("/")
var host = ""
// 过滤掉不符合的搜索引擎地址
if (splits.length > 2) {
host = splits(1)
}
(host,x.courseID,x.time.substring(0,8))
}).filter(_._1 != "") // 过滤掉host为空的数据
.map( x => { // 把数据合并成 pairRDD
(x._3+"_"+x._1+"_"+x._2 , 1)
}).reduceByKey(_+_) // wordcount操作,计算当天的count值
.foreachRDD(rdd => {
rdd.foreachPartition( partitionRecodes => {
val list = new ListBuffer[CourseSearchCountCase]
partitionRecodes.foreach(pair => {
list.append(CourseSearchCountCase(pair._1,pair._2))
})
CourseSearchCountDAO.save(list)
})
})
五、测试
上述的代码均在一个类中,可以在我的github spark2.11-bigdata项目中查看完整代码。还有其他工具类,和访问HBase 的DAO层。
测试开始前,要保证自动生成代码的任务在运行,flume agent正常运行,Kafka正常运行。
idea中,运行LogStreamingApp 类,查看控制台,没有报错即可,然后等待一分钟(因为自动生成日志的任务是一分钟一次,spark Streaming的批次也是一分钟),一分钟后控制台输出日志。最终结果还是要在HBase中查看。
看到value值会变化之后,说明成功了。
最后,可以把Spark Streaming打包在Spark集群上运行,但要注意两点
- 打包时要注意Kafka和HBase client依赖要带上
- 代码中sparkConf的setMaster参数删掉
spark-submit --master yarn --class project.weblog.ylqdh.bigdata.streaming.LogStreamingApp ./Weblog-SparkStreaming-Count-jar-with-dependencies.jar
项目到此就完成了,后续的步骤就是写一个Java Web项目,读取HBase中的数据来进行展示了。
六、后续
按上述,项目已经完成,但是还有几点思考:
- 项目需求过于简单,可以增加需求,如 统计改为按月统计、按年统计,或者统计访问网站时状态码是 404 的数量等
- Kafka的offset,如何管理
上一篇: SBT安装入坑之默认镜像编译慢
推荐阅读