
Flink Queryable State

程序员文章站 2022-05-30 23:40:00

Queryable State

Architecture

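The client connects to one of the proxy servers and sends it a query for the state associated with a given key. Under the hood, Flink manages keyed state by KeyGroup, and the KeyGroups are distributed across all TaskManagers, so each TaskManager stores the state of several KeyGroups. To find the TaskManager that owns the KeyGroup containing the queried key, the proxy asks the JobManager for that TaskManager's location, then queries the QueryableStateServer on that TaskManager directly and returns the state it receives to the client.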

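  • QueryableStateClient
    Runs outside the Flink cluster and submits user queries to it.
  • QueryableStateClientProxy
    A proxy service that runs inside each TaskManager in the Flink cluster. It receives client queries, fetches the requested state from the TaskManager responsible for it, and returns that state to the client.
  • QueryableStateServer
    A service that runs inside each TaskManager in the Flink cluster and only serves the state stored locally on that TaskManager.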

Summary: the client sends a query to a QueryableStateClientProxy. The proxy asks the JobManager, which holds the job metadata, which TaskManager stores the queried key. Once it has the answer, the proxy forwards the request to the QueryableStateServer on that TaskManager, which reads the state data and hands it back.
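
The lookup described above comes down to two mappings: key to KeyGroup, and KeyGroup to the parallel instance that owns it. Below is a minimal sketch of that idea in plain Scala, without any Flink dependency. The object and method names are hypothetical, MurmurHash3 from the Scala standard library stands in for Flink's internal hash function, and the values 128 and 4 are assumed settings, so the concrete group numbers will not match a real cluster.

```scala
import scala.util.hashing.MurmurHash3

object KeyGroupLookupSketch {
  // Map a key to a key group. Flink uses its own murmur hash internally;
  // MurmurHash3 from the Scala standard library is only a stand-in here.
  def keyGroupFor(key: Any, maxParallelism: Int): Int = {
    val mixed = MurmurHash3.finalizeHash(MurmurHash3.mix(0, key.hashCode), 1)
    Math.floorMod(mixed, maxParallelism)
  }

  // Map a key group to the index of the parallel operator instance that owns it.
  def operatorIndexFor(keyGroup: Int, parallelism: Int, maxParallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    val maxParallelism = 128 // assumed number of key groups
    val parallelism = 4      // assumed number of parallel instances
    for (word <- Seq("this", "is", "flink")) {
      val group = keyGroupFor(word, maxParallelism)
      val owner = operatorIndexFor(group, parallelism, maxParallelism)
      println(s"key=$word keyGroup=$group operatorIndex=$owner")
    }
  }
}
```

Because both mappings are deterministic, the proxy only needs the KeyGroup-to-TaskManager assignment from the JobManager to route any key.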

Enabling Queryable State

1. Copy flink-queryable-state-runtime_2.11-1.10.0.jar from Flink's opt/ directory to its lib/ directory.

[root@CentOS flink-1.10.0]# cp opt/flink-queryable-state-runtime_2.11-1.10.0.jar lib/

2. Add the following setting to Flink's flink-conf.yaml configuration file:

queryable-state.enable: true

3. Restart Flink. To verify that the service is running, check the TaskManager log for the line "Started the Queryable State Proxy Server @ ...".

Making State Queryable

To make state visible to the outside world, it must be explicitly made queryable in one of two ways:

  • Create a QueryableStateStream, which acts as a sink and simply stores the incoming data in state.
  • Or call stateDescriptor.setQueryable(String queryableStateName), which makes the keyed state represented by that descriptor queryable.

Queryable State Stream
Calling .asQueryableState(stateName, stateDescriptor) on a KeyedStream returns a QueryableStateStream that exposes its values as queryable state.

// ValueState
QueryableStateStream asQueryableState(
 String queryableStateName,
 ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// FoldingState
QueryableStateStream asQueryableState(
 String queryableStateName,
 FoldingStateDescriptor stateDescriptor)
// ReducingState
QueryableStateStream asQueryableState(
 String queryableStateName,
 ReducingStateDescriptor stateDescriptor)

Note: There is no queryable ListState sink as it would result in an ever-growing list which may not be cleaned up and thus will eventually consume too much memory.

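The returned QueryableStateStream can be seen as a sink and cannot be transformed further. Internally, a QueryableStateStream is translated into an operator that uses all incoming records to update the queryable state instance. The update logic is implied by the type of the StateDescriptor passed to asQueryableState. In a program like the following, all records of the keyed stream are used to update the state instance via ValueState.update(value):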

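stream.keyBy(0).asQueryableState("query-name")

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

object FlinkWordCountQueryableStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 5 s with exactly-once semantics
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint timeout: 4 s
    env.getCheckpointConfig.setCheckpointTimeout(4000)
    // Wait at least 2 s after a checkpoint completes before starting the next one;
    // this takes precedence over the checkpoint interval
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(2000)
    // Fail the job as soon as a single checkpoint fails
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(0)
    // What happens to checkpoint data when the job is cancelled:
    // RETAIN_ON_CANCELLATION: keep the checkpoint data if the job is cancelled without --savepoint
    // DELETE_ON_CANCELLATION: delete the checkpoint data automatically on cancel (not recommended)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Reducing state that sums the counts of each word
    val rsd = new ReducingStateDescriptor[(String, Int)]("reducestate", new ReduceFunction[(String, Int)] {
      override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = {
        (t._1, t._2 + t1._2)
      }
    }, createTypeInformation[(String, Int)])

    env.socketTextStream("train", 9999)
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .keyBy(_._1) // key by the word itself so the key type is String, matching the query client
      .asQueryableState("wordcount", rsd) // queryable state name, needed later by the client

    env.execute("Window")
  }
}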
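
To make the update semantics concrete, here is a minimal plain-Scala sketch (no Flink involved) of how a reducing state accumulates records per key. The object name, the accumulate helper and the sample sentence are hypothetical; a Map stands in for the keyed state backend.

```scala
object ReducingStateSketch {
  // Same reduce logic as the ReduceFunction in the job: add up the counts.
  def reduce(a: (String, Int), b: (String, Int)): (String, Int) = (a._1, a._2 + b._2)

  // Fold records into per-key state, the way a reducing state accumulates them.
  def accumulate(records: Seq[(String, Int)]): Map[String, (String, Int)] =
    records.foldLeft(Map.empty[String, (String, Int)]) { (state, record) =>
      val key = record._1
      // First record for a key is stored as-is; later ones are merged with reduce.
      val updated = state.get(key).map(reduce(_, record)).getOrElse(record)
      state.updated(key, updated)
    }

  def main(args: Array[String]): Unit = {
    val records = "this is a test this is".split(" ").map(word => (word, 1)).toSeq
    val state = accumulate(records)
    println(state("this")) // (this,2)
    println(state("test")) // (test,1)
  }
}
```

A query for the key "this" against the real job would return the same kind of value: the word paired with its running count.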

Querying
Fetching the result synchronously

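import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.scala._
import org.apache.flink.queryablestate.client.QueryableStateClient

object QueryableState {
  def main(args: Array[String]): Unit = {
    // Connect to the proxy
    val client = new QueryableStateClient("train", 9069)
    val jobID = JobID.fromHexString("8e430a949a7aff3f1ffc16e4224fc978")
    val queryName = "wordcount" // queryable state name
    val queryKey = "this"       // the key to look up

    // Must match the descriptor used by the job
    val rsd = new ReducingStateDescriptor[(String, Int)]("reducestate", new ReduceFunction[(String, Int)] {
      override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = {
        (t._1, t._2 + t1._2)
      }
    }, createTypeInformation[(String, Int)])

    val resultFuture = client.getKvState(jobID, queryName, queryKey, createTypeInformation[String], rsd)

    // Block until the result arrives
    val state = resultFuture.get()

    println("Result: " + state.get())

    client.shutdownAndWait()
  }
}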
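
A plain get() blocks indefinitely if the query never completes. Since getKvState returns a java.util.concurrent.CompletableFuture, the wait can be bounded. The sketch below shows the pattern on stand-in futures rather than a live cluster; the object name and the awaitOrNone helper are hypothetical, and the 100 ms timeout is an arbitrary assumption.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}

object BoundedWaitSketch {
  // Wait for a future for at most timeoutMs; return None instead of blocking forever.
  // A failed future still throws ExecutionException, which is left to the caller.
  def awaitOrNone[T](future: CompletableFuture[T], timeoutMs: Long): Option[T] =
    try Some(future.get(timeoutMs, TimeUnit.MILLISECONDS))
    catch { case _: TimeoutException => None }

  def main(args: Array[String]): Unit = {
    // Stand-in for a query that has already been answered
    val answered = CompletableFuture.completedFuture(("this", 2))
    // Stand-in for a query that never returns
    val pending = new CompletableFuture[(String, Int)]()

    println(awaitOrNone(answered, 100)) // Some((this,2))
    println(awaitOrNone(pending, 100))  // None
  }
}
```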

Fetching the result asynchronously

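import java.util.function.Consumer

import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.queryablestate.client.QueryableStateClient

object QueryableState {
  def main(args: Array[String]): Unit = {
    // Connect to the proxy
    val client = new QueryableStateClient("train", 9069)
    val jobID = JobID.fromHexString("8e430a949a7aff3f1ffc16e4224fc978")
    val queryName = "wordcount" // queryable state name
    val queryKey = "this"       // the key to look up

    // Must match the descriptor used by the job
    val rsd = new ReducingStateDescriptor[(String, Int)]("reducestate", new ReduceFunction[(String, Int)] {
      override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = {
        (t._1, t._2 + t1._2)
      }
    }, createTypeInformation[(String, Int)])

    val resultFuture = client.getKvState(jobID, queryName, queryKey, createTypeInformation[String], rsd)

    // Handle the result asynchronously in a callback
    resultFuture.thenAccept(new Consumer[ReducingState[(String, Int)]] {
      override def accept(t: ReducingState[(String, Int)]): Unit = {
        println("Result: " + t.get())
      }
    })

    // Keep the JVM alive long enough for the callback to run
    Thread.sleep(10000)

    client.shutdownAndWait()
  }
}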
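
The fixed Thread.sleep(10000) works, but it either waits too long or not long enough. One alternative, sketched here on a stand-in future instead of a real query, is to keep the future returned by thenAccept and join on it before shutting the client down. The object name, queryAsync and the value 42 are hypothetical placeholders for client.getKvState(...) and its result.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.{Consumer, Supplier}

object AsyncCallbackSketch {
  // Stand-in for client.getKvState(...): a future completed on another thread.
  def queryAsync(): CompletableFuture[Int] =
    CompletableFuture.supplyAsync(new Supplier[Int] {
      override def get(): Int = 42
    })

  def main(args: Array[String]): Unit = {
    // thenAccept returns a new future that completes after the callback has run.
    val done: CompletableFuture[Void] =
      queryAsync().thenAccept(new Consumer[Int] {
        override def accept(value: Int): Unit = println("Result: " + value)
      })

    // Block until the callback has finished; only then shut the client down.
    done.join()
  }
}
```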

stateDescriptor.setQueryable(String queryableStateName)

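import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

object FlinkWordCountValueState {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.socketTextStream("train", 9999)

    val counts = text.flatMap(_.split(" "))
      .map(word => (word, 1))
      .keyBy(_._1) // key by the word itself so the key type is String, matching the query client
      .map(new WordCountMapFunction)

    counts.print()

    env.execute()
  }
}

class WordCountMapFunction extends RichMapFunction[(String, Int), (String, Int)] {

  var vs: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    // Create the state descriptor
    val vsd = new ValueStateDescriptor[Int]("wordcount", createTypeInformation[Int])

    // Expose the state to external queries under the name "query-wc"
    vsd.setQueryable("query-wc")

    // Get the RuntimeContext
    val context = getRuntimeContext

    // Obtain the state handle for this descriptor
    vs = context.getState(vsd)
  }

  override def map(value: (String, Int)): (String, Int) = {
    // Read the previous count (0 if the key has no state yet)
    val historyData = vs.value()

    // Update the state
    vs.update(historyData + value._2)

    // Return the latest count
    (value._1, vs.value())
  }
}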
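
The map function reads vs.value() before anything has been written for a key. With no default value the state holds null at that point, and Scala unboxes that null to 0 when the type parameter is Int, which is why the first update works. The sketch below reproduces the effect with a java.util.HashMap standing in for the state; the object name and the read helper are hypothetical.

```scala
object EmptyStateSketch {
  // Generic read that returns null when nothing is stored, like an unset ValueState.
  def read[T](store: java.util.Map[String, T], key: String): T = store.get(key)

  def main(args: Array[String]): Unit = {
    val store = new java.util.HashMap[String, Int]()

    // Nothing stored yet: the null result is unboxed to 0 at the call site.
    val first: Int = read(store, "this")
    println(first) // 0

    // Same read-then-update step as in the map function.
    store.put("this", first + 1)
    println(read(store, "this")) // 1
  }
}
```

This convenience only applies to primitive-like types such as Int; for reference types an explicit null check (or wrapping the read in Option) is the safer choice.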

Fetching the result synchronously

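import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.queryablestate.client.QueryableStateClient

object QueryableState {
  def main(args: Array[String]): Unit = {
    // Connect to the proxy
    val client = new QueryableStateClient("train", 9069)
    val jobID = JobID.fromHexString("ccfd376e18bb81be5c6e1b40ddb66837")
    val queryName = "query-wc" // queryable state name
    val queryKey = "this"      // the key to look up

    // Must match the descriptor used by the job
    val vsd = new ValueStateDescriptor[Int]("wordcount", createTypeInformation[Int])

    val resultFuture = client.getKvState(jobID, queryName, queryKey, createTypeInformation[String], vsd)

    // Block until the result arrives
    val state: ValueState[Int] = resultFuture.get()

    println("Result: " + state.value())

    client.shutdownAndWait()
  }
}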

Fetching the result asynchronously

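import java.util.function.Consumer

import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.queryablestate.client.QueryableStateClient

object QueryableState {
  def main(args: Array[String]): Unit = {
    // Connect to the proxy
    val client = new QueryableStateClient("train", 9069)
    val jobID = JobID.fromHexString("ccfd376e18bb81be5c6e1b40ddb66837")
    val queryName = "query-wc" // queryable state name
    val queryKey = "this"      // the key to look up

    // Must match the descriptor used by the job
    val vsd = new ValueStateDescriptor[Int]("wordcount", createTypeInformation[Int])

    val resultFuture = client.getKvState(jobID, queryName, queryKey, createTypeInformation[String], vsd)

    // Handle the result asynchronously in a callback
    resultFuture.thenAccept(new Consumer[ValueState[Int]] {
      override def accept(value: ValueState[Int]): Unit = {
        println("Result: " + value.value())
      }
    })

    // Keep the JVM alive long enough for the callback to run
    Thread.sleep(10000)

    client.shutdownAndWait()
  }
}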