Spark Streaming实时流处理项目实战笔记——updateStateByKey算子的使用
程序员文章站
2022-06-15 14:16:24
...
进阶阶段实战目录
updateStateByKey算子的使用
import Spark.WordCount.ssc import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object UpdateStateByKey extends App{ val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount") val ssc = new StreamingContext(sparkConf,Seconds(5)) // 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制 // 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份 // 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在 // 内存数据丢失的时候,可以从checkpoint中恢复数据 // 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可 ssc.checkpoint("E:/test") // 实现wordcount逻辑 val lines = ssc.socketTextStream("hadoop2", 9999) //val lines = ssc.textFileStream("E:/test") val workds = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey((values:Seq[Int],state:Option[Int])=>{ //更新函数两个参数Seq[V], Option[S],前者是每个key新增的值的集合,后者是当前保存的状态, //创建一个变量,用于记录单词出现次数 var newValue=state.getOrElse(0) //getOrElse相当于if....else..... for(value <- values){ newValue +=value //将单词出现次数累计相加 } Option(newValue) }) workds.print() ssc.start() ssc.awaitTermination() }
推荐阅读
-
Spark Streaming实时流处理项目实战笔记——从词频统计功能着手入门Spark Streaming
-
Spark Streaming实时流处理项目实战笔记——updateStateByKey算子的使用
-
Spark Streaming实时流处理项目实战笔记——Push方式整合之本地环境联调
-
Spark Streaming实时流处理项目实战笔记——使用Flume采集Log4j产生的日志
-
Spark Streaming实时流处理项目实战笔记——Kafka实战之整合Flume和Kafka完成实时数据采集
-
Spark Streaming实时流处理项目实战笔记——实战之黑名单过滤
-
Spark Streaming实时流处理项目实战笔记—— Flume实战案例(一)