欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark Streaming实时流处理项目实战笔记——updateStateByKey算子的使用

程序员文章站 2022-06-15 14:16:24
...

进阶阶段实战目录

Spark Streaming实时流处理项目实战笔记——updateStateByKey算子的使用

 

 updateStateByKey算子的使用

import Spark.WordCount.ssc
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UpdateStateByKey extends App{

  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
  val ssc = new StreamingContext(sparkConf,Seconds(5))
  // 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制
  // 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份
  // 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在
  // 内存数据丢失的时候,可以从checkpoint中恢复数据
  // 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可
  ssc.checkpoint("E:/test")

  // 实现wordcount逻辑
  val lines = ssc.socketTextStream("hadoop2", 9999)
  //val lines = ssc.textFileStream("E:/test")
  val workds = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey((values:Seq[Int],state:Option[Int])=>{
    //更新函数两个参数Seq[V], Option[S],前者是每个key新增的值的集合,后者是当前保存的状态,
    //创建一个变量,用于记录单词出现次数
    var newValue=state.getOrElse(0) //getOrElse相当于if....else.....
    for(value <- values){
      newValue +=value //将单词出现次数累计相加
    }
    Option(newValue)
  })
  workds.print()

  ssc.start()
  ssc.awaitTermination()
}

 

相关标签: SparkStreaming