Flink Broadcast State Pattern
程序员文章站
2022-07-14 13:42:34
...
Broadcast State Pattern(广播状态)
广播状态(Broadcast State)是 Flink 支持的第三种 Operator State(另外两种是 List State 和 Union List State)。通常将一个吞吐量较小的流中的数据(例如规则、配置)作为状态广播给下游的所有并行任务,另一个流则以只读的方式读取该广播状态。
使用场景:
- DataStream 连接(connect)BroadcastStream
//仅输出满足规则的数据,不满足规则的数据通过侧输出(side output)输出
/** Non-keyed broadcast demo: lines of the main stream that contain the currently broadcast
  * rule are printed as matches; all other lines are routed to a side output and printed
  * separately.
  *
  * Main stream (high throughput): socket `train:9999`.
  * Rule stream (low throughput): socket `train:8888`, broadcast under a map-state descriptor.
  */
object FlinkBroadcastNonKeyStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // High-throughput event stream.
    val events = env.socketTextStream("train", 9999)

    // Descriptor of the broadcast map state that holds the rule (String -> String).
    val ruleDescriptor = new MapStateDescriptor[String, String](
      "bcsd",
      createTypeInformation[String],
      createTypeInformation[String]
    )
    // Low-throughput rule stream, broadcast under the descriptor above.
    val rules = env.socketTextStream("train", 8888).broadcast(ruleDescriptor)

    // Side-output tag for lines that do not match the rule.
    val unmatchedTag = new OutputTag[String]("notmatch")

    val matched = events
      .connect(rules)
      .process(new UserDefineBroadcastProcessFunction(unmatchedTag, ruleDescriptor))

    matched.print("满足条件")
    matched.getSideOutput(unmatchedTag).print("不满足")

    env.execute("Window")
  }
}
/** Routes each main-stream line using the broadcast rule stored under the key `"rule"`.
  *
  * A line that contains the rule is emitted on the main output as `rule + "\t" + line`.
  * A line that does not contain it is sent to the side output `tag`, as is any line that
  * arrives before a rule has been broadcast.
  *
  * @param tag side-output tag for non-matching lines
  * @param msd descriptor of the broadcast map state holding the rule
  */
class UserDefineBroadcastProcessFunction(tag: OutputTag[String], msd: MapStateDescriptor[String, String])
    extends BroadcastProcessFunction[String, String, String] {

  /** Main (high-throughput) stream: reads the broadcast state through a read-only context. */
  override def processElement(value: String,
                              ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    val state = ctx.getBroadcastState(msd)
    // Look the rule up only once one has been broadcast.
    val currentRule: Option[String] =
      if (state.contains("rule")) Some(state.get("rule")) else None

    currentRule.filter(rule => value.contains(rule)) match {
      case Some(rule) => out.collect(rule + "\t" + value)
      case None       => ctx.output(tag, value)
    }
  }

  /** Broadcast (low-throughput) stream: replaces the stored rule with the incoming line. */
  override def processBroadcastElement(value: String,
                                       ctx: BroadcastProcessFunction[String, String, String]#Context,
                                       collector: Collector[String]): Unit =
    ctx.getBroadcastState(msd).put("rule", value)
}
- KeyedStream 连接(connect)BroadcastStream
/** One order line: user `id` and `name`, product `category`, quantity `count`, unit `price`. */
final case class OrderItem(id: String, name: String, category: String, count: Int, price: Double)

/** Reward rule: a user whose accumulated spend in `category` reaches `threshold` is rewarded. */
final case class Rule(category: String, threshold: Double)

/** A rewarded user, emitted downstream. */
final case class User(id: String, name: String)
//根据用户在各品类下的消费总额执行奖励策略
/** Keyed broadcast demo: rewards a user once their accumulated spend in a category reaches
  * the threshold broadcast for that category.
  *
  * Order stream (socket `train:9999`, high throughput), one order item per line:
  * `id name category count price`, e.g. `1 zhangsan 水果 2 4.5`.
  * Rule stream (socket `train:8888`, low throughput), one rule per line:
  * `category threshold`, e.g. `水果 8.0`.
  *
  * Lines that cannot be parsed (too few fields, non-numeric numbers, blank lines) are
  * reported and skipped, so one bad line does not fail the whole job.
  */
object FlinkBroadcastKeyStream {
  import scala.util.Try

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // High-throughput order stream, keyed by category + user id so that spend is
    // accumulated per user per category.
    val inputs = env.socketTextStream("train", 9999)
      .flatMap(line => parseOrderItem(line).toList)
      .keyBy(orderItem => orderItem.category + ":" + orderItem.id)

    // Low-throughput rule stream, broadcast as category -> threshold.
    val bcsd = new MapStateDescriptor[String, Double]("bcsd", createTypeInformation[String], createTypeInformation[Double])
    val broadcaststream = env.socketTextStream("train", 8888)
      .flatMap(line => parseRule(line).toList)
      .broadcast(bcsd)

    inputs.connect(broadcaststream)
      .process(new UserDefineKeyBroadcastProcessFunction(bcsd))
      .print("奖励")

    env.execute("Window")
  }

  /** Splits a line into whitespace-separated fields, tolerating repeated/edge whitespace. */
  private def fields(line: String): Array[String] = line.trim.split("\\s+")

  /** Parses `id name category count price`; returns `None` (and reports) if malformed. */
  private def parseOrderItem(line: String): Option[OrderItem] = {
    val parsed = Try {
      val ts = fields(line)
      OrderItem(ts(0), ts(1), ts(2), ts(3).toInt, ts(4).toDouble)
    }.toOption
    if (parsed.isEmpty) println("无法解析的订单项,已跳过:" + line)
    parsed
  }

  /** Parses `category threshold`; returns `None` (and reports) if malformed. */
  private def parseRule(line: String): Option[Rule] = {
    val parsed = Try {
      val ts = fields(line)
      Rule(ts(0), ts(1).toDouble)
    }.toOption
    if (parsed.isEmpty) println("无法解析的规则,已跳过:" + line)
    parsed
  }
}
/** Accumulates spend per key in keyed state. When the running total reaches the broadcast
  * threshold for the order's category, it emits a [[User]] and resets the total.
  *
  * NOTE(review): the key itself is chosen by the caller's `keyBy`. The demo job keys by
  * category + ":" + user id.
  *
  * @param msd descriptor of the broadcast map state: category -> reward threshold
  */
class UserDefineKeyBroadcastProcessFunction(msd: MapStateDescriptor[String, Double])
    extends KeyedBroadcastProcessFunction[String, OrderItem, Rule, User] {

  /** Per-key running total of spend (count * price), summed by a ReducingState. */
  var userTotalcCost: ReducingState[Double] = _

  override def open(parameters: Configuration): Unit = {
    val rsd = new ReducingStateDescriptor[Double]("userTotalcCost", new ReduceFunction[Double] {
      override def reduce(t: Double, t1: Double): Double = t + t1
    }, createTypeInformation[Double])
    userTotalcCost = getRuntimeContext.getReducingState(rsd)
  }

  /** Adds the order's spend to the key's total. If a rule exists for the category and the
    * total reaches its threshold, emits the user and clears the total.
    */
  override def processElement(value: OrderItem,
                              ctx: KeyedBroadcastProcessFunction[String, OrderItem, Rule, User]#ReadOnlyContext,
                              out: Collector[User]): Unit = {
    // Spend is accumulated even while no rule exists for this category, so it counts
    // toward the threshold once a rule is broadcast.
    userTotalcCost.add(value.count * value.price)
    val ruleState = ctx.getBroadcastState(msd)
    val u = User(value.id, value.name)
    if (ruleState.contains(value.category)) {
      val total = userTotalcCost.get()
      val threshold = ruleState.get(value.category)
      if (total >= threshold) { // reward threshold reached
        out.collect(u)
        userTotalcCost.clear()
      } else {
        // Report the actual threshold value (previously printed contains(...) = true).
        println("不满足条件:" + u + " 当前总消费:" + total + " 阈值:" + threshold)
      }
    } else {
      println("该类别不执行奖励策略")
    }
  }

  /** Stores or replaces the threshold for the rule's category in the broadcast state. */
  override def processBroadcastElement(value: Rule,
                                       ctx: KeyedBroadcastProcessFunction[String, OrderItem, Rule, User]#Context,
                                       collector: Collector[User]): Unit = {
    val broadcastState = ctx.getBroadcastState(msd)
    broadcastState.put(value.category, value.threshold)
  }
}
推荐阅读
-
状态模式-State Pattern(Java实现)
-
Flink State 有可能代替数据库吗? hbase游戏数据结构
-
[flink]#31_扩展库:State Processor API
-
[flink]Flink State
-
Flink Broadcast State Pattern
-
flink的broadcast
-
Flink Kafka BroadCast
-
Flink Broadcast
-
1.13.、1.14.Flink 支持的DataType和序列化、Flink Broadcast & Accumulators & Counters &Distributed Cache
-
设计模式之策略模式和状态模式(strategy pattern & state pattern)