欢迎您访问程序员文章站，本站旨在为大家提供、分享程序员计算机编程知识！
您现在的位置是: 首页

jstorm源码之PartitionedTridentSpoutExecutor

程序员文章站 2022-07-13 15:45:49
...
一、作用
    Partition Spout对应的executor
二、源码分析
package storm.trident.spout;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.topology.TransactionAttempt;
import storm.trident.topology.state.RotatingTransactionalState;
import storm.trident.topology.state.TransactionalState;

// PartitionedTridentSpout的执行器
public class PartitionedTridentSpoutExecutor implements ITridentSpout<Integer> {
     // 内置一个spout对象 
    IPartitionedTridentSpout _spout;

    public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout spout) {
        _spout = spout;
    }

    public IPartitionedTridentSpout getPartitionedSpout() {
        return _spout;
    }

    // 
    class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
       // 协调器
        private IPartitionedTridentSpout.Coordinator _coordinator;

        public Coordinator(Map conf, TopologyContext context) {
            _coordinator = _spout.getCoordinator(conf, context);
        }

        //  若是当前的metadata为空则直接返回batch的partitions
        // 否则返回当前的metadata
        @Override
        public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
            if (currMetadata != null) {
                return currMetadata;
            } else {
                return _coordinator.getPartitionsForBatch();
            }
        }

        // 关闭协调器
        @Override
        public void close() {
            _coordinator.close();
        }
        //  协调成功操作
        @Override
        public void success(long txid) {
        }
         //  指定的事务是否准备ok
        @Override
        public boolean isReady(long txid) {
            return _coordinator.isReady(txid);
        }
    }
     // 静态内部类 emit的状态类
    static class EmitterPartitionState {
        // transaction state
        public RotatingTransactionalState rotatingState;
        //  spout partiton
        public ISpoutPartition partition;

        public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) {
            rotatingState = s;
            partition = p;
        }
    }
   // emit的操作
    class Emitter implements ITridentSpout.Emitter<Object> {
         // emit
        private IPartitionedTridentSpout.Emitter _emitter;
         // 事务
        private TransactionalState _state;
        // partition transaction state
        private Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
        // emit索引
        private int _index;
        //  task 的总数
        private int _numTasks;

        public Emitter(String txStateId, Map conf, TopologyContext context) {
            _emitter = _spout.getEmitter(conf, context);
            _state = TransactionalState.newUserState(conf, txStateId);
            _index = context.getThisTaskIndex();
            _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
        }
        //  记录coordinator的meta data
        Object _savedCoordinatorMeta = null;

         //  若是本地缓存的coordinator的meta数据为空或者和指定coordinatorMeta不同 则直接重新获取partitions
         //  同时清空原有的partition state
         //  根据原有的索引 遍历partitions  : 1、本地缓存  2、refresh partitions 3、更新本地coordinator
        @Override
        public void emitBatch(final TransactionAttempt tx, final Object coordinatorMeta, final TridentCollector collector) {
            if (_savedCoordinatorMeta == null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
                List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
                _partitionStates.clear();
                List<ISpoutPartition> myPartitions = new ArrayList();
                for (int i = _index; i < partitions.size(); i += _numTasks) {
                    ISpoutPartition p = partitions.get(i);
                    String id = p.getId();
                    myPartitions.add(p);
                    _partitionStates.put(id, new EmitterPartitionState(new RotatingTransactionalState(_state, id), p));
                }
                _emitter.refreshPartitions(myPartitions);
                _savedCoordinatorMeta = coordinatorMeta;
            }
            for (EmitterPartitionState s : _partitionStates.values()) {
                RotatingTransactionalState state = s.rotatingState;
                final ISpoutPartition partition = s.partition;
                Object meta = state.getStateOrCreate(tx.getTransactionId(), new RotatingTransactionalState.StateInitializer() {
                    @Override
                    public Object init(long txid, Object lastState) {
                        return _emitter.emitPartitionBatchNew(tx, collector, partition, lastState);
                    }
                });
                // it's null if one of:
                // a) a later transaction batch was emitted before this, so we should skip this batch
                // b) if didn't exist and was created (in which case the StateInitializer was invoked and
                // it was emitted
                if (meta != null) {
                    _emitter.emitPartitionBatch(tx, collector, partition, meta);
                }
            }
        }
        //  提供成功的操作
        @Override
        public void success(TransactionAttempt tx) {
            for (EmitterPartitionState state : _partitionStates.values()) {
                state.rotatingState.cleanupBefore(tx.getTransactionId());
            }
        }
         //  关闭state 和 emit
        @Override
        public void close() {
            _state.close();
            _emitter.close();
        }
    }

    @Override
    public ITridentSpout.BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new Coordinator(conf, context);
    }

    @Override
    public ITridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new Emitter(txStateId, conf, context);
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return _spout.getComponentConfiguration();
    }

    @Override
    public Fields getOutputFields() {
        return _spout.getOutputFields();
    }
}