欢迎您访问程序员文章站！本站旨在为大家提供、分享程序员计算机编程知识。
您现在的位置是: 首页

Flink Broadcast State Pattern

程序员文章站 2022-07-14 13:42:34
...

Broadcast State Pattern(广播状态)

广播状态（Broadcast State）是 Flink 支持的第三种算子状态（Operator State）类型，用于在并行任务之间共享状态。通常将一个吞吐量较小的流（例如规则流）中的数据作为状态广播给下游算子的所有并行任务；另一个吞吐量较大的流只能以只读方式读取该广播状态，只有处理广播流的一侧才能修改它。

使用场景:

  • 非 Keyed 的 DataStream 连接（connect）BroadcastStream，使用 BroadcastProcessFunction 处理
// Emits only the records that match the currently broadcast rule; all other records go to a side output.
object FlinkBroadcastNonKeyStream {

  /**
   * Entry point: connects a high-throughput data socket (port 9999) with a
   * low-throughput rule socket (port 8888) whose lines are broadcast as state.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // High-throughput data stream.
    val events = env.socketTextStream("train", 9999)

    // Broadcast state descriptor; the rule is stored under a single fixed key.
    val ruleStateDescriptor = new MapStateDescriptor[String, String](
      "bcsd",
      createTypeInformation[String],
      createTypeInformation[String])

    // Low-throughput rule stream, broadcast to every parallel instance downstream.
    val rules = env
      .socketTextStream("train", 8888)
      .broadcast(ruleStateDescriptor)

    // Side output for records that do not match (or arrive before any rule).
    val unmatchedTag = new OutputTag[String]("notmatch")

    val matched = events
      .connect(rules)
      .process(new UserDefineBroadcastProcessFunction(unmatchedTag, ruleStateDescriptor))

    matched.print("满足条件")
    matched.getSideOutput(unmatchedTag).print("不满足")

    env.execute("Window")
  }
}

/**
 * Non-keyed broadcast function that routes each data record using the single
 * rule held in broadcast state under the key "rule".
 *
 * @param tag side-output tag that receives records which do not contain the rule,
 *            or which arrive before any rule has been broadcast
 * @param msd descriptor of the broadcast map state holding the rule
 */
class UserDefineBroadcastProcessFunction(tag: OutputTag[String], msd: MapStateDescriptor[String, String])
  extends BroadcastProcessFunction[String, String, String] {

  /**
   * Data side (high throughput). Reads the broadcast state read-only.
   * A record that contains the rule as a substring is emitted as `rule<TAB>record`;
   * every other record is sent to the side output.
   */
  override def processElement(value: String,
                              ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext,
                              out: Collector[String]): Unit = {
    val rules = ctx.getBroadcastState(msd)

    // Some(rule) only when a rule exists AND the record contains it.
    val hit: Option[String] =
      if (rules.contains("rule")) Some(rules.get("rule")).filter(r => value.contains(r))
      else None

    hit match {
      case Some(r) => out.collect(s"$r\t$value")
      case None    => ctx.output(tag, value)
    }
  }

  /** Broadcast side (low throughput): the latest line replaces the stored rule. */
  override def processBroadcastElement(value: String,
                                       ctx: BroadcastProcessFunction[String, String, String]#Context,
                                       collector: Collector[String]): Unit =
    ctx.getBroadcastState(msd).put("rule", value)
}
  • KeyedStream 连接（connect）BroadcastStream，使用 KeyedBroadcastProcessFunction 处理
case class OrderItem(id:String,name:String,category:String,count:Int,price:Double)
case class Rule(category:String,threshold:Double)
case class User(id:String,name:String)
// Rewards users once their accumulated spend in a category reaches that category's broadcast threshold.
object FlinkBroadcastKeyStream {

  /**
   * Entry point.
   *
   * Order socket (port 9999), high throughput, one item per line:
   * `id name category count unitPrice`, e.g. `1 zhangsan 水果 2 4.5`.
   *
   * Rule socket (port 8888), low throughput, one rule per line:
   * `category threshold`, e.g. `水果 8.0`.
   *
   * Lines that cannot be parsed (blank, too few fields, non-numeric count/price/threshold)
   * are skipped instead of throwing inside the operator and failing the whole job.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Keyed per (category, user id), so spend is accumulated per user per category.
    val inputs = env.socketTextStream("train", 9999)
      .flatMap(line => parseOrderItem(line).toList)
      .keyBy(orderItem => orderItem.category + ":" + orderItem.id)

    // Broadcast state: category -> reward threshold.
    val bcsd = new MapStateDescriptor[String, Double]("bcsd", createTypeInformation[String], createTypeInformation[Double])

    // Rule stream, broadcast to all parallel instances of the process function.
    val broadcaststream = env.socketTextStream("train", 8888)
      .flatMap(line => parseRule(line).toList)
      .broadcast(bcsd)

    inputs.connect(broadcaststream)
      .process(new UserDefineKeyBroadcastProcessFunction(bcsd))
      .print("奖励")

    env.execute("Window")
  }

  /** Parses `id name category count unitPrice` (whitespace separated); None if malformed. */
  private def parseOrderItem(line: String): Option[OrderItem] =
    scala.util.Try {
      val ts = line.trim.split("\\s+")
      OrderItem(ts(0), ts(1), ts(2), ts(3).toInt, ts(4).toDouble)
    }.toOption

  /** Parses `category threshold` (whitespace separated); None if malformed. */
  private def parseRule(line: String): Option[Rule] =
    scala.util.Try {
      val ts = line.trim.split("\\s+")
      Rule(ts(0), ts(1).toDouble)
    }.toOption
}

/**
 * Keyed broadcast function implementing the accumulated-spend reward policy.
 *
 * Keyed state `userTotalcCost` holds the running sum of `count * price` for the current
 * key. In FlinkBroadcastKeyStream the key is `category:id`, i.e. per user per category.
 * Broadcast state (`msd`) maps category -> reward threshold.
 *
 * When a key's total reaches its category's threshold, the [[User]] is emitted and
 * the total is reset. Spend is accumulated even while no rule exists for the category.
 *
 * @param msd descriptor of the broadcast map state (category -> threshold)
 */
class UserDefineKeyBroadcastProcessFunction(msd: MapStateDescriptor[String, Double])
  extends KeyedBroadcastProcessFunction[String, OrderItem, Rule, User] {

  /** Running total spend for the current key; cleared after a reward is emitted. */
  var userTotalcCost: ReducingState[Double] = _

  /** Registers the summing reducing state that backs the accumulated spend. */
  override def open(parameters: Configuration): Unit = {
    // Each add() folds the new amount into the running total.
    // The state name is kept unchanged so existing savepoints still map to it.
    val rsd = new ReducingStateDescriptor[Double]("userTotalcCost", new ReduceFunction[Double] {
      override def reduce(t: Double, t1: Double): Double = t + t1
    }, createTypeInformation[Double])

    userTotalcCost = getRuntimeContext.getReducingState(rsd)
  }

  /**
   * Data side (high throughput): adds this item's spend to the key's total, then compares
   * the total with the broadcast threshold for the item's category (read-only access).
   */
  override def processElement(value: OrderItem,
                              ctx: KeyedBroadcastProcessFunction[String, OrderItem, Rule, User]#ReadOnlyContext,
                              out: Collector[User]): Unit = {
    userTotalcCost.add(value.count * value.price)

    val ruleState = ctx.getBroadcastState(msd)
    val u = User(value.id, value.name)

    if (ruleState != null && ruleState.contains(value.category)) {
      val threshold = ruleState.get(value.category)
      val total = userTotalcCost.get()
      if (total >= threshold) { // reward threshold reached
        out.collect(u)
        userTotalcCost.clear()
      } else {
        // Previously this printed ruleState.contains(...) (always true here) instead of the threshold.
        println("不满足条件:" + u + " 当前总消费:" + total + " 阈值:" + threshold)
      }
    } else {
      println("该类别不执行奖励策略")
    }
  }

  /** Broadcast side (low throughput): stores or replaces the threshold for the rule's category. */
  override def processBroadcastElement(value: Rule,
                                       ctx: KeyedBroadcastProcessFunction[String, OrderItem, Rule, User]#Context,
                                       collector: Collector[User]): Unit = {
    val broadcastState = ctx.getBroadcastState(msd)
    broadcastState.put(value.category, value.threshold)
  }
}
相关标签: Flink Flink State