sparkstreaming的状态管理
程序员文章站
2022-07-06 14:14:58
...
今天我们主要来说一下sparkstreaming带状态的操作,updateStateByKey和mapWithState这两个方法,先看一下官网的介绍:
UpdateStateByKey操作
该updateStateByKey操作允许您在使用新信息持续更新时保持任意状态。要使用它,您必须执行两个步骤。
定义状态 - 状态可以是任意数据类型。
定义状态更新功能 - 使用函数指定如何使用先前状态和输入流中的新值更新状态。
在每个批处理中,Spark都会对所有现有**应用状态更新功能,无论它们是否在批处理中都有新数据。如果更新函数返回,None则将删除键值对。
mapWithState
mapWithState:也是用于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,有一点增量的感觉。效率更高,建议使用这个.
下面看下具体的代码:
package spark
import kafka.PropertiesScalaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
/**
* sparkstreaming的状态管理;
*/
object windowFunction {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)
Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName("Spark Streaming Jason")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.blockInterval","50ms")
@transient
val scc = new StreamingContext(conf, Seconds(1))
scc.checkpoint("/home/jason/test")
val topic = PropertiesScalaUtils.loadProperties("topic_combine")
val topicSet = Set(topic)
val kafkaParams = Map[String, Object](
"auto.offset.reset" -> "latest", //latest;earliest
"value.deserializer" -> classOf[StringDeserializer]
, "key.deserializer" -> classOf[StringDeserializer]
, "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker")
, "group.id" -> PropertiesScalaUtils.loadProperties("groupId")
, "enable.auto.commit" -> (true: java.lang.Boolean)
)
val kafkaStreams = KafkaUtils.createDirectStream[String, String](
scc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
val lines = kafkaStreams.map(_.value())
val word = lines.flatMap(_.split(" ")).map((_,1))
//--------------------------------updateStateByKey-----------------------------------------
val result1 = word.cache().updateStateByKey((currValues:Seq[Int],preValue:Option[Int])=>{
val sum = currValues.sum
Some(sum + preValue.getOrElse(0))
})
result1.foreachRDD(rdd=>{
if(!rdd.isEmpty()){
rdd.foreachPartition(partition=>{
partition.foreach(pair=>{
println(pair._1 + "----------" + pair._2)
})
})
}
})
//------------------------------mapWithState-----------------------------------------------
val initialRDD = scc.sparkContext.parallelize(List[(String, Int)]())
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val result2 = word.cache().mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))
result2.foreachRDD(rdd=>{
if(!rdd.isEmpty()){
rdd.foreachPartition(partition=>{
partition.foreach(pair=>{
println(pair._1 + "----------" + pair._2)
})
})
}
})
scc.start()
scc.awaitTermination()
}
}
注意:我们一般建议使用mapWithState,因为他不用全量的更新,只用更新增量的,所以效率要比updateStateByKey高很多,这两个方法都必须开启chckpoint,因为他们需要把状态保存在checkpoint里面.
今天我们就不分析源码了,等有时间,在接着更新.今天就先写到这里吧.
如果有写的不对的地方,欢迎大家指正,如果有什么疑问,可以加QQ群:340297350,谢谢