[flink]Flink State
程序员文章站
2022-07-14 21:47:46
...
state 中存储着每条数据消费后数据的消费点 (生产环境需要持久化这些状态),当 Job 因为某种错误或者其他原因导致重启时,就能够从 checkpoint(定时将 state 做一个全局快照,在 Flink 中,为了能够让 Job 在运行的过程中保证容错 性,才会对这些 state 做一个快照,在 4.3 节中会详细讲) 中的 state 数据进行恢复
- Keyed State 总是和具体的 key 相关联,也只能在 KeyedStream 的 function 和 operator 上使用。你 可以将 Keyed State 当作是 Operator State 的一种特例,但是它是被分区或分片的。每个 Keyed State 分区对应一个 key 的 Operator State,对于某个 key 在某个分区上有唯一的状态。
- 每个 operator state 都对应着一个并行实例。Kafka Connector 就是一个很 好的例子。每个 Kafka consumer 的并行实例都会持有一份topic partition 和 offset 的 map,这个map 就是它的 Operator State。
如何使用托管 Keyed State
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
//ValueState 使用方式,第一个字段是 count,第二个字段是运行的和 private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
//访问状态的 value 值 Tuple2<Long, Long> currentSum = sum.value();
//更新 count currentSum.f0 += 1;
//更新 sum currentSum.f1 += input.f1;
//更新状态 sum.update(currentSum);
//如果 count 等于 2, 发出平均值并清除状态 if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", //状态名称
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), //类型信息
Tuple2.of(0L, 0L)); //状态的默认值
sum = getRuntimeContext().getState(descriptor);//获取状态
} }
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0) .flatMap(new CountWindowAverage()) .print();
//结果会打印出 (1,4) 和 (1,5)