Getting Started with Flink (16): State
程序员文章站
2024-03-12 19:36:32
Flink has two main kinds of state:
- Keyed State: depends on a KeyedStream
- Operator State: non-keyed state
Each kind exists in two forms:
- Managed State (structured data controlled by the Flink runtime)
- Raw State
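The difference in scope between the two kinds can be sketched in plain Java. This is only a conceptual model, not Flink code: the class `StateScopeModel` and its methods are hypothetical names introduced for illustration. It assumes that keyed state behaves like one independent value per key, while operator state behaves like one value per parallel operator instance, shared by every record that instance processes.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model (not Flink code) contrasting the scope of keyed and operator state. */
public class StateScopeModel {
    // Models keyed state: one independent counter per key.
    private final Map<String, Integer> keyedCount = new HashMap<>();
    // Models operator state: one counter for this operator instance, regardless of key.
    private int operatorCount = 0;

    /** Processes one record that belongs to the given key. */
    public void process(String key) {
        keyedCount.merge(key, 1, Integer::sum);
        operatorCount++;
    }

    public int keyedCountFor(String key) {
        return keyedCount.getOrDefault(key, 0);
    }

    public int operatorCount() {
        return operatorCount;
    }

    public static void main(String[] args) {
        StateScopeModel instance = new StateScopeModel();
        instance.process("user-a");
        instance.process("user-b");
        instance.process("user-a");
        System.out.println(instance.keyedCountFor("user-a")); // 2
        System.out.println(instance.keyedCountFor("user-b")); // 1
        System.out.println(instance.operatorCount());         // 3
    }
}
```

In a real job Flink selects the current key automatically, which is why keyed state can only be accessed on a KeyedStream.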
The following example shows how to use Managed State.
// Stream of user click events on items
DataStream<ItemIdClickedBO> itemClickedStream ...
The ItemIdClickedBO class is:
import lombok.Builder;
import lombok.Data;

@Builder
@Data
public class ItemIdClickedBO {
    private String unionId; // unique identifier of the user
    private String itemId;  // unique identifier of the item
    // ... (remaining fields elided in the original)
}
Requirement: for each user, collect the list of items clicked in the past ten minutes.
// Items each user clicked in the last 10 minutes
DataStream<Tuple2<String, List<String>>> userClickStream = itemClickedStream
        .keyBy(ItemIdClickedBO::getUnionId)
        .map(new UserLatestClickItemIdState());
The UserLatestClickItemIdState function is:
import com.google.common.collect.Lists;
import com.tc.flink.analysis.label.output.ItemIdClickedBO;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.List;

public class UserLatestClickItemIdState extends RichMapFunction<ItemIdClickedBO, Tuple2<String, List<String>>> {

    // Keyed list state: the item IDs clicked by the current user (the current key)
    private transient ListState<String> merges;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Entries expire 10 minutes after they are written and are never returned once expired
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("user_latest_click", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        merges = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public Tuple2<String, List<String>> map(ItemIdClickedBO value) throws Exception {
        // Append the clicked item, then emit the user's unexpired click list
        merges.add(value.getItemId());
        return Tuple2.of(value.getUnionId(), Lists.newArrayList(merges.get()));
    }
}
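StateTtlConfig sets a 10-minute TTL, and ListState is enough to meet the requirement here. Flink applies the TTL to each list element independently and measures it in processing time, so an item ID drops out of the returned list ten minutes after it was added.
As the code shows, the implementation is fairly simple.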
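To make the TTL behaviour concrete, here is a minimal plain-Java sketch of a list whose elements expire independently. It is a model for illustration, not Flink's implementation: the class `TtlListModel` and the explicit `nowMillis` parameter are hypothetical. It assumes each element carries its own write timestamp, that only writes set the timestamp (as with OnCreateAndWrite), and that expired elements are filtered out on read (as with NeverReturnExpired).

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not Flink code) of a list with per-element time-to-live. */
public class TtlListModel<T> {

    private static final class Entry<T> {
        final T value;
        final long writtenAtMillis;

        Entry(T value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final long ttlMillis;
    private final List<Entry<T>> entries = new ArrayList<>();

    public TtlListModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores a value stamped with the write time; reads never refresh this stamp. */
    public void add(T value, long nowMillis) {
        entries.add(new Entry<>(value, nowMillis));
    }

    /** Returns only the values written less than ttlMillis before nowMillis. */
    public List<T> get(long nowMillis) {
        List<T> live = new ArrayList<>();
        for (Entry<T> e : entries) {
            if (nowMillis - e.writtenAtMillis < ttlMillis) {
                live.add(e.value);
            }
        }
        return live;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        TtlListModel<String> clicks = new TtlListModel<>(10 * minute);
        clicks.add("item-1", 0);
        clicks.add("item-2", 6 * minute);
        System.out.println(clicks.get(9 * minute));  // [item-1, item-2]
        System.out.println(clicks.get(12 * minute)); // [item-2]
        System.out.println(clicks.get(17 * minute)); // []
    }
}
```

The model shows why a single ListState with TTL can act as a sliding ten-minute list per user: older clicks fall away one by one instead of the whole list being cleared at once.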