Learning WordCount with Spark Streaming


A basic Spark Streaming WordCount:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object WordCount {
  def main(args: Array[String]): Unit = {
    // local[2] starts two threads: one receives the socket data, the other prints to the console
    val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    /**
      * Data input: socket (host: hadoop, port: 9999)
      */
    val myDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop", 9999)

    /**
      * Data processing
      */
    val resultStream = myDStream.flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)

    resultStream.print()

    /**
      * Start the application
      */
    ssc.start()
    ssc.awaitTermination()
  }
}
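To feed the socket version, start a data source on the hadoop host first (for example nc -lk 9999) and type comma-separated words. If no socket is available, the same pipeline can also be exercised locally with queueStream, which consumes in-memory RDDs as micro-batches. A minimal sketch; the object name and sample input are made up for illustration:

import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountQueueTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCountQueueTest").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Each RDD pushed into this queue is consumed as one micro-batch
    val rddQueue = new Queue[RDD[String]]()
    val myDStream = ssc.queueStream(rddQueue)

    myDStream.flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    // Push one batch of test data: should print (you,2) and (word,1)
    rddQueue += ssc.sparkContext.makeRDD(Seq("you,word,you"))
    ssc.awaitTermination()
  }
}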

The WordCount above only counts the words within each batch; it has no memory of earlier data. To accumulate counts across all batches, use updateStateByKey. This operator requires a checkpoint directory to be set. Two ways to implement updateStateByKey are shown here.

UpdateStateByKeyTest:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object UpdateStateByKeyTest {
  def main(args: Array[String]): Unit = {

    // The user writing the checkpoint must have permission on HDFS
    System.setProperty("HADOOP_USER_NAME", "hadoop")

    /**
      * Create the entry point
      */
    val conf = new SparkConf().setMaster("local[2]").setAppName(s"${this.getClass.getSimpleName}")
    val ssc = new StreamingContext(conf, Seconds(1))

//    ssc.checkpoint("hdfs://hadoop:9000/sparkstreaming")
    ssc.checkpoint("D:\\tmp\\checkPoint")

    /**
      * Data input
      */
    val myDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop", 9999)

    /**
      * Data processing
      */
    val wordDStream: DStream[String] = myDStream.flatMap(_.split(","))
    val wordAndOneStream: DStream[(String, Int)] = wordDStream.map((_, 1))

    /**
      * updateStateByKey takes a function
      *   updateFunc: (Seq[V], Option[S]) => Option[S]
      * which is invoked once per key per batch:
      *   - Seq[V]:    the values for this key in the current batch
      *   - Option[S]: the state left over from the previous batch (None on the first batch)
      *   - returns Option[S]: the new state for this key
      *
      * For example, if the current batch contains
      *   you,1
      *   word,1
      *   you,1
      * then updateFunc is called with
      *   you  -> Seq(1, 1)
      *   word -> Seq(1)
      */
    val wordCount = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val lastCount = state.getOrElse(0)
      Some(currentCount + lastCount)
    }

    val resultDStream: DStream[(String, Int)] = wordAndOneStream.updateStateByKey(wordCount)

    /**
      * Data output
      */
    resultDStream.print()

    /**
      * Start the application
      */
    ssc.start()
    ssc.awaitTermination()
  }
}
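The update function can be checked in isolation before wiring it into the stream. A small worked example with hypothetical batch values for the key "you", reusing the same wordCount function as above:

val wordCount = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.sum
  val lastCount = state.getOrElse(0)
  Some(currentCount + lastCount)
}

// Batch 1: "you" appears twice, there is no previous state yet
val afterBatch1 = wordCount(Seq(1, 1), None)       // Some(2)
// Batch 2: "you" appears once more, previous state is Some(2)
val afterBatch2 = wordCount(Seq(1), afterBatch1)   // Some(3)
// A batch where "you" does not appear still carries the state forward
val afterBatch3 = wordCount(Seq(), afterBatch2)    // Some(3)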

There is another way, using StreamingContext.getOrCreate so that the context can be recovered from the checkpoint directory:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object UpdateStateByKeyTestX {
  def main(args: Array[String]): Unit = {
    val checkPointDirectory = "D:\\tmp\\checkPoint"

    def functionToCreateContext(): StreamingContext = {

      /**
        * Create the entry point
        */
      val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[2]")
      val ssc = new StreamingContext(conf, Seconds(1))

      ssc.checkpoint(checkPointDirectory)

      val myDStream = ssc.socketTextStream("hadoop", 9999)

      val wordAndOneDStream = myDStream.flatMap(_.split(",")).map((_, 1))

      val wordCount = (values: Seq[Int], state: Option[Int]) => {
        val currentCount = values.sum
        val lastCount = state.getOrElse(0)
        Some(currentCount + lastCount)
      }
      val resultDStream: DStream[(String, Int)] = wordAndOneDStream.updateStateByKey(wordCount)
      resultDStream.print()

      ssc
    }

    val ssc = StreamingContext.getOrCreate(checkPointDirectory, functionToCreateContext _)

    /**
      * Start the application
      */
    ssc.start()
    ssc.awaitTermination()
  }
}
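StreamingContext.getOrCreate first checks checkPointDirectory: if checkpoint data exists there, the context, including the DStream graph and the accumulated word-count state, is rebuilt from it and functionToCreateContext is not called; only when the directory is empty is the function invoked to build a fresh context. This is why the whole pipeline must be defined inside the creation function. Note that checkpoints are tied to the compiled application, so after changing the job logic the old checkpoint directory generally has to be cleared.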


Some tokens are not valid words, such as !, ?, and *, and should be filtered out during processing. The transform operator, which exposes the underlying RDD of each batch, can be used for this kind of blacklist filtering:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object TransformTest {
  def main(args: Array[String]): Unit = {
    /**
      * Create the entry point
      */
    val conf = new SparkConf().setMaster("local[2]").setAppName(s"${this.getClass.getSimpleName}")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    /**
      * Data input
      */
    val myDStream = ssc.socketTextStream("hadoop", 9999)

    /**
      * Blacklist filtering: ? ! * must not be counted.
      * The blacklist is mocked here (in practice it would be read from Redis or MySQL).
      */
    val wordblackRDD: RDD[(String, Boolean)] = sc.parallelize(List("?", "!", "*")).map((_, true))
    val blackList = sc.broadcast(wordblackRDD.collect())

    /**
      * Data processing
      */
    val wordAndOneDStream: DStream[(String, Int)] = myDStream.flatMap(line => line.split(","))
      .map((_, 1))

    val filterDStream: DStream[(String, Int)] = wordAndOneDStream.transform(rdd => {
      // Rebuild the blacklist RDD from the broadcast value for each batch
      val wordBlackList: RDD[(String, Boolean)] = rdd.sparkContext.parallelize(blackList.value)

      /**
        * (String, Int) leftOuterJoin (String, Boolean)
        *   => (String, (Int, Option[Boolean]))
        * String: the word; Int: the count;
        * Option[Boolean]: Some(true) if the word is blacklisted, None otherwise
        */
      val leftRDD: RDD[(String, (Int, Option[Boolean]))] = rdd.leftOuterJoin(wordBlackList)

      // Keep only the words that did not match the blacklist
      val filterRDD: RDD[(String, (Int, Option[Boolean]))] = leftRDD.filter(_._2._2.isEmpty)

      filterRDD.map(tuple => (tuple._1, tuple._2._1))
    })

    val wordCountDStream: DStream[(String, Int)] = filterDStream.reduceByKey((x: Int, y: Int) => x + y)

    /**
      * Data output
      */
    wordCountDStream.print()

    /**
      * Start the application
      */
    ssc.start()
    ssc.awaitTermination()
  }
}
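Since the blacklist is small, the join can be avoided entirely: broadcast a Set and filter directly. A simpler alternative sketch with the same behavior, assuming the same mocked blacklist and the wordAndOneDStream defined above:

val blackSet = sc.broadcast(Set("?", "!", "*"))

val filterDStream: DStream[(String, Int)] = wordAndOneDStream.transform(rdd =>
  // Drop any (word, count) pair whose word is in the broadcast blacklist
  rdd.filter(tuple => !blackSet.value.contains(tuple._1))
)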

You can also compute word counts every N seconds over the last M seconds (for example, every 10 seconds count the words from the last 30 seconds) using reduceByKeyAndWindow.

ReduceByKeyAndWindowWordCount:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object ReduceByKeyAndWindowWordCount {
  def main(args: Array[String]): Unit = {
    /**
      * Create the entry point
      */
    val conf = new SparkConf().setMaster("local[2]").setAppName(s"${this.getClass.getSimpleName}")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    /**
      * Data input
      */
    val myDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop", 9999)

    /**
      * Data processing
      */
    val wordOneDStream: DStream[(String, Int)] = myDStream.flatMap(_.split(","))
      .map((_, 1))

    /**
      * reduceByKeyAndWindow(
      *   reduceFunc: (V, V) => V,
      *   windowDuration: Duration,  // window length
      *   slideDuration: Duration    // slide interval
      * )
      * Business meaning: every 10 seconds, count the words seen in the last 30 seconds.
      * The same pattern applies to, e.g., tracking ad-click trends.
      */
    val resultDStream: DStream[(String, Int)] = wordOneDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(30), Seconds(10))

    /**
      * Data output
      */
    resultDStream.print()

    /**
      * Start the application
      */
    ssc.start()
    ssc.awaitTermination()
  }
}
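For a long window with a short slide, Spark Streaming also provides an overload of reduceByKeyAndWindow that takes an inverse reduce function, so each new window is computed incrementally from the previous one (add the data entering the window, subtract the data leaving it) instead of re-reducing all 30 seconds of data. This variant requires checkpointing to be enabled. A minimal sketch, reusing ssc and wordOneDStream from above:

ssc.checkpoint("D:\\tmp\\checkPoint")  // required by the incremental variant

val resultDStream: DStream[(String, Int)] = wordOneDStream.reduceByKeyAndWindow(
  (x: Int, y: Int) => x + y,  // add the values entering the window
  (x: Int, y: Int) => x - y,  // subtract the values leaving the window
  Seconds(30),                // window length
  Seconds(10)                 // slide interval
)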