欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink入门(十六) State

程序员文章站 2024-03-12 19:36:32
...

Flink的state主要是有两种:

  • Keyed State
    主要依赖于KeyedStream
  • Operator State
    non-keyed state

这两种存在于两种形式:

  • Managed State(由flink-runtime控制的结构化数据)
  • Raw State。

举例说明Managed State使用方法。

//用户点击方案的事件操作
DataStream<ItemIdClickedBO> itemClickedStream  。。。。

ItemIdClickedBO类为

import lombok.Builder;
import lombok.Data;
@Builder
@Data
public class ItemIdClickedBO {

    private String unionId;//用户唯一标识符

    private String itemId;//方案唯一标识符
	。。。
}

需求,统计过去十分钟,用户点击方案的list集合。

//用户最近10分钟,点击过方案
DataStream<Tuple2<String, List<String>>> userClickStream = itemClickedStream.keyBy(ItemIdClickedBO::getUnionId).map(new UserLatestClickItemIdState());

UserLatestClickItemIdState函数为

import com.google.common.collect.Lists;
import com.tc.flink.analysis.label.output.ItemIdClickedBO;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.List;

public class UserLatestClickItemIdState extends RichMapFunction<ItemIdClickedBO, Tuple2<String, List<String>>> {
    
    private transient ListState<String> merges;

    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
        ListStateDescriptor<String> descriptor = new ListStateDescriptor<String>("user_latest_click", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        merges = getRuntimeContext().getListState(descriptor);
        super.open(parameters);
    }

    @Override
    public Tuple2<String, List<String>> map(ItemIdClickedBO value) throws Exception {
        merges.add(value.getItemId());
        return  Tuple2.of(value.getUnionId(), Lists.newArrayList(merges.get()));
    }
}

StateTtlConfig设置10分钟TTL,我这里采用ListState满足需求,
从代码来看,还是简单的。

相关标签: flink state