欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

jstorm源码之RotatingMap

程序员文章站 2022-07-13 15:44:55
...
一、作用
      基于LinkedList + HashMap<K,V>实现一个循环Map
二、源码
// 通过结合LinkedList 和 HashMap 构成一个循环Map数据结构
public class RotatingMap<K, V> {
    // this default ensures things expire at most 50% past the expiration time
    //  默认bucket数目为3
    private static final int DEFAULT_NUM_BUCKETS = 3;
   
    // 定义内部接口  key 到期的回调操作
    public static interface ExpiredCallback<K, V> {
        public void expire(K key, V val);
    }
    // 通过链表维护  一个 buckets 结构
    private LinkedList<HashMap<K, V>> _buckets;

    private ExpiredCallback _callback;
   //  构造RotationgMap的 bucket数目不能少于2
    public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        _buckets = new LinkedList<HashMap<K, V>>();
        for (int i = 0; i < numBuckets; i++) {
            _buckets.add(new HashMap<K, V>());
        }

        _callback = callback;
    }

    public RotatingMap(ExpiredCallback<K, V> callback) {
        this(DEFAULT_NUM_BUCKETS, callback);
    }

    public RotatingMap(int numBuckets) {
        this(numBuckets, null);
    }
   
    // 执行rotate
    // 首先获取LinkedList 末尾元素
    //  其次在first位置 添加占位新元素
    //  结合expire的callback
    //  最末尾的batch多半是时间最久 极可能是超出有效期的
    public Map<K, V> rotate() {
        Map<K, V> dead = _buckets.removeLast();
        _buckets.addFirst(new HashMap<K, V>());
        if (_callback != null) {
            for (Entry<K, V> entry : dead.entrySet()) {
                _callback.expire(entry.getKey(), entry.getValue());
            }
        }
        return dead;
    }
   
    // 是否存在key
    public boolean containsKey(K key) {
        for (HashMap<K, V> bucket : _buckets) {
            if (bucket.containsKey(key)) {
                return true;
            }
        }
        return false;
    }

   // 获取value
    public V get(K key) {
        for (HashMap<K, V> bucket : _buckets) {
            if (bucket.containsKey(key)) {
                return bucket.get(key);
            }
        }
        return null;
    }

    // 添加item
    public void put(K key, V value) {
        Iterator<HashMap<K, V>> it = _buckets.iterator();
        HashMap<K, V> bucket = it.next();
        bucket.put(key, value);
        while (it.hasNext()) {
            bucket = it.next();
            bucket.remove(key);
        }
    }

    // 移除item
    public Object remove(K key) {
        for (HashMap<K, V> bucket : _buckets) {
            if (bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }

    // 总容量
    public int size() {
        int size = 0;
        for (HashMap<K, V> bucket : _buckets) {
            size += bucket.size();
        }
        return size;
    }
}