Flink Queryable State
Queryable State
Architecture
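The client connects to one of the proxy servers and sends it a query for the state associated with a given key. Under the hood, Flink organizes keyed state into KeyGroups, and these KeyGroups are distributed across all TaskManagers, so each TaskManager stores the state of several KeyGroups. To find out which TaskManager holds the KeyGroup containing the queried key, the proxy asks the JobManager. It then contacts the QueryableStateServer on that TaskManager directly to fetch the state, and finally returns the state to the client.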
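To make the KeyGroup idea concrete, here is a simplified model of the routing that needs no Flink dependency. It assumes a maximum parallelism of 128 and a parallelism of 4 purely for illustration. The contiguous-range formula `keyGroup * parallelism / maxParallelism` reflects, to our understanding, how Flink assigns KeyGroups to parallel subtasks, but Flink additionally applies a murmur hash to `key.hashCode()`, which this sketch replaces with a plain `Math.floorMod`. The concrete KeyGroup numbers will therefore differ from a real cluster. The object name `KeyGroupRouting` is hypothetical.

```scala
// Simplified model of "key -> KeyGroup -> subtask (slot on some TaskManager)".
// Assumption: Flink murmur-hashes key.hashCode(); we use floorMod directly instead.
object KeyGroupRouting {
  // Which KeyGroup a key falls into (simplified hash, always in [0, maxParallelism)).
  def keyGroupFor(key: Any, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  // Which parallel subtask owns a KeyGroup: KeyGroups are split into
  // contiguous ranges, one range per subtask.
  def subtaskFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    val maxParallelism = 128 // illustrative value
    val parallelism = 4      // illustrative value
    for (word <- Seq("this", "is", "flink")) {
      val kg = keyGroupFor(word, maxParallelism)
      val subtask = subtaskFor(kg, maxParallelism, parallelism)
      println(s"$word -> keyGroup $kg -> subtask $subtask")
    }
  }
}
```

With these numbers, KeyGroups 0 to 31 belong to subtask 0, 32 to 63 to subtask 1, and so on. This is why the proxy only needs to know which TaskManager runs the owning subtask.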
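- QueryableStateClient: runs outside the Flink cluster and submits user queries to it.
- QueryableStateClientProxy: a proxy service running in each TaskManager of the Flink cluster. It receives client queries, fetches the requested state from the responsible TaskManager, and returns that state to the client.
- QueryableStateServer: a service running in each TaskManager that only reads the state stored on its own TaskManager.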
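In summary: the client sends its query to a QueryableStateClientProxy. The proxy first checks its cache of state locations. If it does not yet know where the KeyGroup containing the key lives, it asks the JobManager, which holds this metadata and reports back which TaskManager owns the key. The proxy then forwards the request to the QueryableStateServer on that TaskManager, which reads the state data and returns it.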
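The query path can be sketched as a toy in-memory model. `JobManagerMeta`, `StateServer`, and `Proxy` are hypothetical stand-ins, not Flink classes, and the two-TaskManager layout is an assumption. The proxy in this model caches KeyGroup locations, so repeated queries that hit the same KeyGroup skip the JobManager. Treat that detail as a simplification of the real proxy.

```scala
import scala.collection.mutable

// Toy model of the query path: client -> proxy -> JobManager lookup -> state server.
// All names are hypothetical; this is not Flink code.
object QueryPathModel {
  // Simplified KeyGroup assignment (Flink additionally applies a murmur hash).
  def keyGroup(key: String, maxParallelism: Int): Int = Math.floorMod(key.hashCode, maxParallelism)

  // Stand-in for the JobManager: knows which TaskManager owns each KeyGroup.
  final class JobManagerMeta(owner: Map[Int, String]) {
    var lookups = 0 // counts how often the proxy had to ask
    def locate(kg: Int): String = { lookups += 1; owner(kg) }
  }

  // Stand-in for a QueryableStateServer: reads only state stored on its own TaskManager.
  final class StateServer(local: Map[String, Int]) {
    def read(key: String): Option[Int] = local.get(key)
  }

  // Stand-in for the proxy: resolves the owner (cached after the first lookup), then forwards the read.
  final class Proxy(jm: JobManagerMeta, servers: Map[String, StateServer], maxParallelism: Int) {
    private val locationCache = mutable.Map.empty[Int, String]
    def query(key: String): Option[Int] = {
      val kg = keyGroup(key, maxParallelism)
      val tm = locationCache.getOrElseUpdate(kg, jm.locate(kg)) // JobManager asked only on a miss
      servers(tm).read(key)
    }
  }

  // Assumed layout: the lower half of the KeyGroups lives on "tm-1", the upper half on "tm-2".
  def build(state: Map[String, Int], maxParallelism: Int): (Proxy, JobManagerMeta) = {
    val owner = (0 until maxParallelism).map(kg => kg -> (if (kg < maxParallelism / 2) "tm-1" else "tm-2")).toMap
    val perTm = state.groupBy { case (k, _) => owner(keyGroup(k, maxParallelism)) }
    val servers = Seq("tm-1", "tm-2").map(tm => tm -> new StateServer(perTm.getOrElse(tm, Map.empty[String, Int]))).toMap
    val jm = new JobManagerMeta(owner)
    (new Proxy(jm, servers, maxParallelism), jm)
  }
}
```

For example, `QueryPathModel.build(Map("this" -> 3), 4)._1.query("this")` returns `Some(3)`. A second query for the same key is answered without another JobManager lookup.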
Activating Queryable State
1. Copy flink-queryable-state-runtime_2.11-1.10.0.jar from Flink's opt/ directory into its lib/ directory:
[root@CentOS flink-1.10.0]# cp opt/flink-queryable-state-runtime_2.11-1.10.0.jar lib/
2. Add the following setting to Flink's flink-conf.yaml:
queryable-state.enable: true
3. Restart Flink. To verify that the service is running, check the TaskManager log for the line "Started the Queryable State Proxy Server @ ...".
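Steps 2 and 3 can be double-checked with a small script. The sketch below is a hypothetical helper, not a Flink tool. It looks for `queryable-state.enable: true` in flink-conf.yaml, ignoring comments, and searches a TaskManager log for the start-up line quoted above. The file paths are passed on the command line and depend on your installation.

```scala
import scala.io.Source

// Hypothetical helper that sanity-checks the activation steps; not part of Flink.
object QueryableStateSetupCheck {
  // True if flink-conf.yaml enables queryable state (comments after '#' are ignored).
  def enabledInConf(confLines: Seq[String]): Boolean =
    confLines.map(_.takeWhile(_ != '#').trim).exists { line =>
      line.split(":", 2) match {
        case Array(k, v) => k.trim == "queryable-state.enable" && v.trim.equalsIgnoreCase("true")
        case _           => false
      }
    }

  // First TaskManager log line announcing that the proxy server started, if any.
  def proxyStartLine(logLines: Seq[String]): Option[String] =
    logLines.find(_.contains("Started the Queryable State Proxy Server"))

  private def readLines(path: String): List[String] = {
    val src = Source.fromFile(path)
    try src.getLines().toList finally src.close()
  }

  def main(args: Array[String]): Unit = {
    // args(0): path to flink-conf.yaml, args(1): path to a TaskManager log file
    println(s"queryable-state.enable set: ${enabledInConf(readLines(args(0)))}")
    println(proxyStartLine(readLines(args(1))).getOrElse("proxy start line not found"))
  }
}
```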
Making State Queryable
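To make state visible to the outside world, it must be made queryable explicitly, in one of two ways:
- create a QueryableStateStream, which acts as a sink that merely stores incoming data in state, or
- call stateDescriptor.setQueryable(String queryableStateName) to make a state queryable.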
Queryable State Stream
Calling .asQueryableState(stateName, stateDescriptor) on a KeyedStream provides a queryable state:
// ValueState
QueryableStateStream asQueryableState(
String queryableStateName,
ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// FoldingState (FoldingStateDescriptor is deprecated in Flink 1.10)
QueryableStateStream asQueryableState(
String queryableStateName,
FoldingStateDescriptor stateDescriptor)
// ReducingState
QueryableStateStream asQueryableState(
String queryableStateName,
ReducingStateDescriptor stateDescriptor)
Note: There is no queryable ListState sink as it would result in an ever-growing list which may not be cleaned up and thus will eventually consume too much memory.
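The returned QueryableStateStream can be seen as a sink, so it cannot be transformed any further. Internally, a QueryableStateStream is turned into an operator that uses every incoming record to update the queryable state instance. The update logic is implied by the type of StateDescriptor passed to asQueryableState. In a program like the following, every record of the keyed stream updates the state instance via ValueState.update(value):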
stream.keyBy(0).asQueryableState("query-name")
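The practical consequence is that `asQueryableState("query-name")` without an explicit descriptor keeps only the latest record per key. A ReducingStateDescriptor merges every record into the stored value, which is why the word-count job that follows passes one. The model below simulates the two update rules on a plain list of records, without Flink. The names are hypothetical, and keys are taken from the first tuple field, as with `keyBy(0)`.

```scala
// Plain-Scala model of what the queryable-state sink does with each record,
// depending on the descriptor type. Not Flink code; names are hypothetical.
object QueryableSinkSemantics {
  // ValueState: each record calls update(value), so only the latest record per key survives.
  def valueStateSink[V](records: Seq[(String, V)]): Map[String, (String, V)] =
    records.foldLeft(Map.empty[String, (String, V)]) { (state, r) => state.updated(r._1, r) }

  // ReducingState: each record calls add(value), which merges it into the stored
  // value with the ReduceFunction (or stores it as-is for a new key).
  def reducingStateSink[V](records: Seq[(String, V)])(
      reduce: ((String, V), (String, V)) => (String, V)): Map[String, (String, V)] =
    records.foldLeft(Map.empty[String, (String, V)]) { (state, r) =>
      state.updated(r._1, state.get(r._1).map(reduce(_, r)).getOrElse(r))
    }
}
```

For the records ("this",1), ("is",1), ("this",1), the value model ends with ("this",1) for the key "this", while the reducing model with a summing function ends with ("this",2).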
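import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

object FlinkWordCountQueryableStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 5 s with exactly-once semantics
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    // Abort a checkpoint that takes longer than 4 s
    env.getCheckpointConfig.setCheckpointTimeout(4000)
    // At least 2 s must pass between the end of one checkpoint and the start of the next;
    // this takes precedence over the checkpoint interval
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(2000)
    // Fail the job as soon as a checkpoint fails
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(0)
    // What to do with checkpoint data when the job is cancelled:
    // RETAIN_ON_CANCELLATION: keep the checkpoint data if the job is cancelled without --savepoint
    // DELETE_ON_CANCELLATION: delete the checkpoint automatically on cancel (not recommended)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // ReducingState that sums the counts of each word
    val rsd = new ReducingStateDescriptor[(String, Int)]("reducestate", new ReduceFunction[(String, Int)] {
      override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = (t._1, t._2 + t1._2)
    }, createTypeInformation[(String, Int)])

    env.socketTextStream("train", 9999)
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .keyBy(_._1) // key by the word, so the key type is String, the type the client queries with
      .asQueryableState("wordcount", rsd) // queryable state name, needed later by the client
    env.execute("Window")
  }
}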
Querying
Fetching the result synchronously
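import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.streaming.api.scala._

// Requires the flink-queryable-state-client-java dependency (same version as the cluster)
object QueryableState {
  def main(args: Array[String]): Unit = {
    // Connect to a Queryable State Proxy (default proxy port 9069)
    val client = new QueryableStateClient("train", 9069)
    // ID of the running job (shown in the Web UI or by `flink list`)
    val jobID = JobID.fromHexString("8e430a949a7aff3f1ffc16e4224fc978")
    val queryName = "wordcount" // queryable state name used in the job
    val queryKey = "this"       // the key to look up
    // Describes the state type so the client can deserialize the result
    val rsd = new ReducingStateDescriptor[(String, Int)]("reducestate", new ReduceFunction[(String, Int)] {
      override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = (t._1, t._2 + t1._2)
    }, createTypeInformation[(String, Int)])
    val resultFuture = client.getKvState(jobID, queryName, queryKey, createTypeInformation[String], rsd)
    // Block until the result arrives
    val state = resultFuture.get()
    println("Result: " + state.get())
    client.shutdownAndWait()
  }
}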
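`resultFuture.get()` blocks indefinitely. If the query fails, for example because the job ID, state name, or key is unknown to the cluster, it throws an `ExecutionException`. A small helper like the hypothetical `AwaitQuery.await` below bounds the wait and returns the outcome as an `Either`. It uses only `java.util.concurrent`, so it works with the `CompletableFuture` returned by `getKvState`, but it is not part of Flink.

```scala
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}

// Hypothetical helper: wait for a query future with a time bound and
// turn failures into a Left instead of a thrown exception.
object AwaitQuery {
  def await[T](future: CompletableFuture[T], timeoutMs: Long): Either[Throwable, T] =
    try Right(future.get(timeoutMs, TimeUnit.MILLISECONDS))
    catch {
      case e: ExecutionException => Left(e.getCause) // the query itself failed
      case e: TimeoutException   => Left(e)          // no answer within the timeout
    }
}
```

In the program above, `AwaitQuery.await(resultFuture, 5000)` could replace `resultFuture.get()`, with a match on `Right(state)` or `Left(error)` deciding what to print.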
Fetching the result asynchronously
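import java.util.concurrent.TimeUnit
import java.util.function.Consumer

import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.streaming.api.scala._

object QueryableState {
  def main(args: Array[String]): Unit = {
    // Connect to a Queryable State Proxy
    val client = new QueryableStateClient("train", 9069)
    val jobID = JobID.fromHexString("8e430a949a7aff3f1ffc16e4224fc978")
    val queryName = "wordcount" // queryable state name
    val queryKey = "this"       // the key to look up
    val rsd = new ReducingStateDescriptor[(String, Int)]("reducestate", new ReduceFunction[(String, Int)] {
      override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = (t._1, t._2 + t1._2)
    }, createTypeInformation[(String, Int)])
    val resultFuture = client.getKvState(jobID, queryName, queryKey, createTypeInformation[String], rsd)
    // Register a callback that runs when the result arrives (non-blocking)
    val done = resultFuture.thenAccept(new Consumer[ReducingState[(String, Int)]] {
      override def accept(t: ReducingState[(String, Int)]): Unit = {
        println("Result: " + t.get())
      }
    })
    // Wait at most 10 s for the callback, then always shut the client down
    try done.get(10, TimeUnit.SECONDS)
    finally client.shutdownAndWait()
  }
}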
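Sleeping for a fixed time either wastes time or, on a slow cluster, shuts the client down before the callback has run. A sturdier pattern is to keep the stage returned by `thenAccept`, wait on it with a timeout, and shut down in a `finally`. The sketch below shows the pattern with plain `CompletableFuture`s. `query`, `onResult`, and `shutdown` are hypothetical parameters standing in for `client.getKvState(...)`, the printing callback, and `client.shutdownAndWait()`.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.function.Consumer

// Sketch of "callback, then shut down" without Thread.sleep; not Flink code.
object CallbackThenShutdown {
  def run(query: () => CompletableFuture[Int], onResult: Int => Unit, shutdown: () => Unit): Unit = {
    // thenAccept returns a new stage that completes once the callback has finished
    val done: CompletableFuture[Void] = query().thenAccept(new Consumer[Int] {
      override def accept(r: Int): Unit = onResult(r)
    })
    // Wait for the callback (or fail after 10 s); shut down in every case
    try done.get(10, TimeUnit.SECONDS)
    finally shutdown()
  }
}
```

If the query future fails, `done.get` rethrows the failure wrapped in an `ExecutionException`, and `shutdown` still runs.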
stateDescriptor.setQueryable(String queryableStateName)
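import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

object FlinkWordCountValueState {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("train", 9999)
    val counts = text.flatMap(_.split(" "))
      .map(word => (word, 1))
      .keyBy(_._1) // key by the word, so the queryable key type is String
      .map(new WordCountMapFunction)
    counts.print()
    env.execute()
  }
}

class WordCountMapFunction extends RichMapFunction[(String, Int), (String, Int)] {
  var vs: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    // Create the state descriptor and expose the state under the name "query-wc"
    val vsd = new ValueStateDescriptor[Int]("wordcount", createTypeInformation[Int])
    vsd.setQueryable("query-wc")
    // Obtain the state from the RuntimeContext
    vs = getRuntimeContext.getState(vsd)
  }

  override def map(value: (String, Int)): (String, Int) = {
    // Previous count; value() returns null for a new key, which Scala unboxes to 0 for Int
    val historyData = vs.value()
    // Update the state
    vs.update(historyData + value._2)
    // Emit the latest count
    (value._1, vs.value())
  }
}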
Fetching the result synchronously
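import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.streaming.api.scala._

object QueryableState {
  def main(args: Array[String]): Unit = {
    // Connect to a Queryable State Proxy
    val client = new QueryableStateClient("train", 9069)
    val jobID = JobID.fromHexString("ccfd376e18bb81be5c6e1b40ddb66837")
    val queryName = "query-wc" // name passed to setQueryable
    val queryKey = "this"      // the key to look up
    val vsd = new ValueStateDescriptor[Int]("wordcount", createTypeInformation[Int])
    val resultFuture = client.getKvState(jobID, queryName, queryKey, createTypeInformation[String], vsd)
    // Block until the result arrives
    val state: ValueState[Int] = resultFuture.get()
    println("Result: " + state.value())
    client.shutdownAndWait()
  }
}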
Fetching the result asynchronously
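import java.util.concurrent.TimeUnit
import java.util.function.Consumer

import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.streaming.api.scala._

object QueryableState {
  def main(args: Array[String]): Unit = {
    // Connect to a Queryable State Proxy
    val client = new QueryableStateClient("train", 9069)
    val jobID = JobID.fromHexString("ccfd376e18bb81be5c6e1b40ddb66837")
    val queryName = "query-wc" // name passed to setQueryable
    val queryKey = "this"      // the key to look up
    val vsd = new ValueStateDescriptor[Int]("wordcount", createTypeInformation[Int])
    val resultFuture = client.getKvState(jobID, queryName, queryKey, createTypeInformation[String], vsd)
    // Register a callback that runs when the result arrives (non-blocking)
    val done = resultFuture.thenAccept(new Consumer[ValueState[Int]] {
      override def accept(value: ValueState[Int]): Unit = {
        println("Result: " + value.value())
      }
    })
    // Wait at most 10 s for the callback, then always shut the client down
    try done.get(10, TimeUnit.SECONDS)
    finally client.shutdownAndWait()
  }
}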