欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

[flink]Flink State

程序员文章站 2022-07-14 21:47:46
...

state 中存储着每条数据消费后数据的消费点 (生产环境需要持久化这些状态),当 Job 因为某种错误或者其他原因导致重启时,就能够从 checkpoint(定时将 state 做一个全局快照,在 Flink 中,为了能够让 Job 在运行的过程中保证容错 性,才会对这些 state 做一个快照,在 4.3 节中会详细讲) 中的 state 数据进行恢复

  • Keyed State 总是和具体的 key 相关联,也只能在 KeyedStream 的 function 和 operator 上使用。你 可以将 Keyed State 当作是 Operator State 的一种特例,但是它是被分区或分片的。每个 Keyed State 分区对应一个 key 的 Operator State,对于某个 key 在某个分区上有唯一的状态。
  • 每个 operator state 都对应着一个并行实例。Kafka Connector 就是一个很 好的例子。每个 Kafka consumer 的并行实例都会持有一份topic partition 和 offset 的 map,这个map 就是它的 Operator State。

如何使用托管 Keyed State

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

//ValueState 使用方式,第一个字段是 count,第二个字段是运行的和 private transient ValueState<Tuple2<Long, Long>> sum;

	@Override
	public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

//访问状态的 value 值 Tuple2<Long, Long> currentSum = sum.value();

//更新 count currentSum.f0 += 1;

//更新 sum currentSum.f1 += input.f1;

//更新状态 sum.update(currentSum);

//如果 count 等于 2, 发出平均值并清除状态 if (currentSum.f0 >= 2) {

		out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));

		sum.clear(); 
	}

	

	@Override
	public void open(Configuration config) { 
		ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", //状态名称 
		TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), //类型信息 
		Tuple2.of(0L, 0L)); //状态的默认值

		sum = getRuntimeContext().getState(descriptor);//获取状态

} }

	env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))

.keyBy(0) .flatMap(new CountWindowAverage()) .print();

//结果会打印出 (1,4) 和 (1,5)
相关标签: BigData