欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

jstorm源码之RotatingTransactionalState

程序员文章站 2022-07-13 15:50:28
...
一、作用
   构建一个Rotationg transaction的state类 用于完成partition的state管理及操作
二、源码分析
package storm.trident.topology.state;

import backtype.storm.utils.Utils;
import org.apache.zookeeper.KeeperException;

import java.util.HashSet;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class RotatingTransactionalState {
    // state intitle接口
    public static interface StateInitializer {
        Object init(long txid, Object lastState);
    }   
    // 事务状态  通过zookeeper来进行管理
    private TransactionalState _state;
    //  子目录
    private String _subdir;
    //  记录transactional id及其内容
    private TreeMap<Long, Object> _curr = new TreeMap<Long, Object>();

    public RotatingTransactionalState(TransactionalState state, String subdir) {
        _state = state;
        _subdir = subdir;
        state.mkdir(subdir);
        sync();
    }

   // 获取最近的transaction state 因为使用的TreeMap 最后一个元素即为最近的transaction state
    public Object getLastState() {
        if(_curr.isEmpty()) return null;
        else return _curr.lastEntry().getValue();
    }
    //  若是transaction state发生改变时需要更新对应的zookeeper对应节点的内容
    public void overrideState(long txid, Object state) {
        _state.setData(txPath(txid), state);
        _curr.put(txid, state);
    }
    //  根据指定的txid删除对应的transaction state内容
    public void removeState(long txid) {
        if(_curr.containsKey(txid)) {
            _curr.remove(txid);
            _state.delete(txPath(txid));
        }
    }
   // 根据执行txId获取本地对应的transaction state
    public Object getState(long txid) {
        return _curr.get(txid);
    }
    // 根据指定的txId 和 对应的初始化器 获取对应的transaction state
    public Object getState(long txid, StateInitializer init) {
        if(!_curr.containsKey(txid)) {
             // 获取小于指定txId的内容
            SortedMap<Long, Object> prevMap = _curr.headMap(txid);
             // 获取大于等于指定的txId的内容
            SortedMap<Long, Object> afterMap = _curr.tailMap(txid);           
            // 判断preMap是否为空 不为空获取最后一个key即为prev state
            Long prev = null;
            if(!prevMap.isEmpty()) prev = prevMap.lastKey();           
            // 
            Object data;
            if(afterMap.isEmpty()) {
                Object prevData;
                if(prev!=null) {
                    prevData = _curr.get(prev);
                } else {
                    prevData = null;
                }
                data = init.init(txid, prevData);
            } else {
                // ??????????
                data = null;
            }
            // 添加到本地
            _curr.put(txid, data);
            // 在zookeeper上创建对应的节点
            _state.setData(txPath(txid), data);
        }
        // 将对应的transaction state内容返回
        return _curr.get(txid);
    }

    public Object getPreviousState(long txid) {
        SortedMap<Long, Object> prevMap = _curr.headMap(txid);
        if(prevMap.isEmpty()) return null;
        else return prevMap.get(prevMap.lastKey());
    }
   // 判断本地cache是否存在
    public boolean hasCache(long txid) {
        return _curr.containsKey(txid);
    }

    /**
     * Returns null if it was created, the value otherwise.
     */
    public Object getStateOrCreate(long txid, StateInitializer init) {
        if(_curr.containsKey(txid)) {
            return _curr.get(txid);
        } else {
            getState(txid, init);
            return null;
        }
    }
   // 删除指定txId对应的node内容 包括两个部分:本地 和 zookeeper
    public void cleanupBefore(long txid) {
        SortedMap<Long, Object> toDelete = _curr.headMap(txid);
        for(long tx: new HashSet<Long>(toDelete.keySet())) {
            _curr.remove(tx);
            try {
                _state.delete(txPath(tx));
            } catch(RuntimeException e) {
                // Ignore NoNodeExists exceptions because when sync() it may populate _curr with stale data since
                // zookeeper reads are eventually consistent.
                if(!Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                    throw e;
                }
            }
        }
    }
   // 同步获取指定subdir的所有子节点 并获取对应的内容 同时完成在本地进行保存
    private void sync() {
        List<String> txids = _state.list(_subdir);
        for(String txid_s: txids) {
            Object data = _state.getData(txPath(txid_s));
            _curr.put(Long.parseLong(txid_s), data);
        }
    }
   
    private String txPath(long tx) {
        return txPath("" + tx);
    }

    private String txPath(String tx) {
        return _subdir + "/" + tx;
    }   

}