欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flume+Kafka+SparkStreaming实时统计网站日志 -- 4、SparkStreaming实时统计

程序员文章站 2024-02-21 21:27:46
...

      在上一篇 Flume采集数据到Kafka 中,日志采集到了Kafka,接下来要使用Spark Streaming,根据不同需求来统计数据。
      开发时先在windows的idea上测试,那如何在windows上构建spark开发的环境,可以参考我的这篇文章 IDEA首次配置maven开发spark详细步骤。项目中需要的依赖可参考我的github中的pom.xml文件。

一、使用Kafka消费者API

      这里要注意Kafka的版本,使用老版本的API和新版本的API不相同。这里使用的是新版本的API,在新版本(kafka-0-10)中,取消了Receiver模式,只有Direct模式了。
      Spark使用Kafka消费者API的核心代码如下

val topic = config.getString("kafka.topic").split(",")
val kafkaParams = Map[String,Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false:java.lang.Boolean)
  )
val messages = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](topic,kafkaParams))

    // x.value获得数据里的值
    val logs = messages.map(x => x.value())

二、清洗数据

原来的数据格式是这样的

23.198.48.147	2019-11-26 17:28:01	"GET /class/145.html HTTP/1.1"	200	https://www.baidu.com/s?wd=spark

为了统计的时候方便,要把数据转换一下,后续统计时直接拿数据。
要转换的格式如下:

  • 时间格式改为 yyyyMMddHHmmss
  • 第3位的url剔除不是/class开头的
  • 提取出/class开头中间的course id,如145
    Spark Streaming核心代码如下:
## logs是spark Streaming消费Kafka日志数据得到的RDD
val cleanData = logs.map(line => {

      val infos = line.split("\t")
     
      val courseHtml = infos(2).split(" ")(1)
      var courseID = 0

      // 筛选课程是class的,不要learn、course等其他的
      if (courseHtml.contains("class")) {
        val classCou = courseHtml.split("/")(2)
        courseID = classCou.substring(0,classCou.lastIndexOf(".")).toInt
      }
    ClickLogCase(infos(0),DateUtil.parseToMinute(infos(1)),courseID,infos(3).toInt,infos(4))
    }).filter(clicklog => clicklog.courseID != 0)

三、统计每个课程每天的访问量

  1. 由于统计后的结果数据要存进HBase中,所以先设计好hbase的表
  • hbase表设计:表名 weblog_course_click,列族 info
    创表语句 create ‘weblog_course_click’,'info’
  • rowkey设计:rowkey是由两个字段组成 , day_courseID
  • 列只有一个,click_count 存放点击量
  1. Spark Streaming代码开发
## 核心代码如下
## cleanData是清洗之后的数据的RDD
cleanData.map( x => {
      (x.time.substring(0,8)+"_"+x.courseID,1)  // 在所有数据中拿到日期和课程id拼接成的结构
    }).reduceByKey(_+_)   // wordcount操作,计算当天的count值
      .foreachRDD(rdd => {
      rdd.foreachPartition( partitionRecodes => {
        val list = new ListBuffer[CourseClickCountCase]
        partitionRecodes.foreach(pair => {
          list.append(CourseClickCountCase(pair._1,pair._2))
        })
        CourseClickCountDAO.save(list)
      })
    })

四、统计每天从不同搜索引擎引流的课程的点击量

  1. HBase表设计
  • 表名: weblog_course_search ,列族: info
    创表语句 create ‘weblog_course_search’,'info’
  • rowkey设计: rowkey由三个字段组成, day_search_courseID ,search是referer字段中的搜索引擎,如www.baidu.com
  • 列只有一个,click_count 存放点击量
  1. Spark Streaming核心代码如下
// 下面cleanData重复用到了,可以持久化这个RDD以提高效率
cleanData.map( x => {
      //  https://www.baidu.com/s?wd=Storm实战 中拿到 www.baidu.com
      val url = x.referer.replaceAll("//","/")
      val splits = url.split("/")
      var host = ""

      // 过滤掉不符合的搜索引擎地址
      if (splits.length > 2) {
        host = splits(1)
      }
      (host,x.courseID,x.time.substring(0,8))
    }).filter(_._1 != "") // 过滤掉host为空的数据
      .map( x => {        // 把数据合并成 pairRDD
      (x._3+"_"+x._1+"_"+x._2 , 1)
    }).reduceByKey(_+_)   // wordcount操作,计算当天的count值
      .foreachRDD(rdd => {
      rdd.foreachPartition( partitionRecodes => {
        val list = new ListBuffer[CourseSearchCountCase]
        partitionRecodes.foreach(pair => {
          list.append(CourseSearchCountCase(pair._1,pair._2))
        })
        CourseSearchCountDAO.save(list)
      })
    })

五、测试

      上述的代码均在一个类中,可以在我的github spark2.11-bigdata项目中查看完整代码。还有其他工具类,和访问HBase 的DAO层。
      测试开始前,要保证自动生成代码的任务在运行,flume agent正常运行,Kafka正常运行。
      idea中,运行LogStreamingApp 类,查看控制台,没有报错即可,然后等待一分钟(因为自动生成日志的任务是一分钟一次,spark Streaming的批次也是一分钟),一分钟后控制台输出日志。最终结果还是要在HBase中查看。
Flume+Kafka+SparkStreaming实时统计网站日志 -- 4、SparkStreaming实时统计
看到value值会变化之后,说明成功了。
最后,可以把Spark Streaming打包在Spark集群上运行,但要注意两点

  • 打包时要注意Kafka和HBase client依赖要带上
  • 代码中sparkConf的setMaster参数删掉
spark-submit --master yarn --class project.weblog.ylqdh.bigdata.streaming.LogStreamingApp ./Weblog-SparkStreaming-Count-jar-with-dependencies.jar

项目到此就完成了,后续的步骤就是写一个Java Web项目,读取HBase中的数据来进行展示了。

六、后续

按上述,项目已经完成,但是还有几点思考:

  1. 项目需求过于简单,可以增加需求,如 统计改为按月统计、按年统计,或者统计访问网站时状态码是 404 的数量等
  2. Kafka的offset,如何管理
相关标签: BigData