[Flink] #31 扩展库：State Processor API
程序员文章站
2022-07-14 21:52:47
...
使用 DataSet 读取作业状态
读取现有的 Savepoint
// Load an existing savepoint with the batch (DataSet) ExecutionEnvironment.
// RocksDBStateBackend has no no-arg constructor; it requires a checkpoint data URI
// (the URI below is a placeholder).
// NOTE(review): presumably the backend should match the one the job used when it wrote
// the savepoint — confirm for your Flink version.
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint =
        Savepoint.load(bEnv, "hdfs://path/", new RocksDBStateBackend("hdfs://checkpoints/"));

// To read operator state, specify only the operator uid, the state name and its type information.
DataSet<Integer> listState = savepoint.readListState("zhisheng-uid", "list-state", Types.INT);
DataSet<Integer> unionState = savepoint.readUnionState("zhisheng-uid", "unionstate", Types.INT);
// Broadcast state comes back as (key, value) pairs.
DataSet<Tuple2<Integer, Integer>> broadcastState =
        savepoint.readBroadcastState("zhisheng-uid", "broadcast-state", Types.INT, Types.INT);

// If the StateDescriptor was built with a custom TypeSerializer, pass that serializer as well.
// Uses its own variable name so it does not redeclare `listState` in the same scope.
DataSet<Integer> customListState = savepoint.readListState(
        "zhisheng-uid", "list-state", Types.INT, new MyCustomIntSerializer());
写入新的 Savepoint
public class Account { public int id;
public double amount;
public long timestamp;
}
/**
 * Bootstraps one Double value state, named "total", per Integer key from {@link Account} records.
 *
 * <p>Each incoming record's {@code amount} is written with {@code ValueState#update}, which
 * replaces the stored value. If several records share a key, only the last one processed is
 * kept. NOTE(review): presumably one record per account is expected; confirm, or accumulate
 * instead if "total" is meant to be a sum.
 */
public class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {

    ValueState<Double> state;

    @Override
    public void open(Configuration parameters) {
        // Obtain the per-key "total" value state, typed as Double.
        state = getRuntimeContext().getState(new ValueStateDescriptor<Double>("total", Types.DOUBLE));
    }

    @Override
    public void processElement(Account account, Context ctx) throws Exception {
        state.update(account.amount);
    }
}
// Build the batch input holding the records to bootstrap.
// NOTE(review): `accounts` is assumed to be a Collection<Account> defined elsewhere.
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Account> accountDataSet = bEnv.fromCollection(accounts);

// Key the records by account id and run AccountBootstrapper on them. This produces the
// bootstrapped keyed state for one operator.
BootstrapTransformation<Account> transformation = OperatorTransformation
        .bootstrapWith(accountDataSet)
        .keyBy(account -> account.id)
        .transform(new AccountBootstrapper());
// Create a brand-new savepoint from bootstrap transformations, one per operator uid.
// The second argument of create() is the maximum parallelism (128 here).
// NOTE(review): transformation1/transformation2 stand for BootstrapTransformations built the
// same way as `transformation`. `backend` and `savepointPath` are assumed to be defined elsewhere.
// NOTE(review): write() appears to only declare the output. The batch job presumably still
// needs bEnv.execute() to produce the savepoint files — confirm for your Flink version.
Savepoint
        .create(backend, 128)
        .withOperator("uid1", transformation1)
        .withOperator("uid2", transformation2)
        .write(savepointPath);
修改现有的 Savepoint
// Load an existing savepoint, add bootstrapped state for another operator, and write the
// result to a separate path so the source savepoint is left untouched.
// Savepoint.load takes (ExecutionEnvironment, path, StateBackend).
Savepoint
        .load(bEnv, oldPath, backend)
        .withOperator("uid", transformation)
        .write(newPath);
// Replace an operator's state by dropping the existing state for the uid and bootstrapping it anew.
// removeOperator is an instance method of the ExistingSavepoint returned by Savepoint.load,
// not a static method on Savepoint, so the savepoint has to be loaded first.
// The result goes to a separate path so the savepoint being read is not overwritten.
Savepoint
        .load(bEnv, oldPath, backend)
        .removeOperator(oldOperatorUid)
        .withOperator(oldOperatorUid, transformation)
        .write(newPath);