欢迎您访问程序员文章站,本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页

jstorm源码之 RichSpoutBatchExecutor

程序员文章站 2022-07-13 15:50:28
...
一、作用
RichSpoutBatchExecutor是IRichSpout及其实现类的executor(执行器),主要包括:
        1、RichSpoutEmitter:用于tuple的emit
        2、RichSpoutCoordinator:用于spout的调度
        3、CaptureCollector:用于获取spout通过collector发出的tuple及其消息id

二、源码
package storm.trident.spout;

import backtype.storm.Config;

public class RichSpoutBatchExecutor implements ITridentSpout {
    public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";

    IRichSpout _spout;
    // 构造RichSpouBatchExecutor 初始化Spout
    public RichSpoutBatchExecutor(IRichSpout spout) {
        _spout = spout;
    }

    // 获取Spout配置信息
    @Override
    public Map getComponentConfiguration() {
        return _spout.getComponentConfiguration();
    }

    // 获取向下级投递的stream fields
    @Override
    public Fields getOutputFields() {
        return TridentUtils.getSingleOutputStreamFields(_spout);

    }
    // 构建batch的调度器
    @Override
    public BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new RichSpoutCoordinator();
    }

   //  构建emitter
    @Override
    public Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new RichSpoutEmitter(conf, context);
    }

    // Emitter的实现
    //  指定emit的参数
    class RichSpoutEmitter implements ITridentSpout.Emitter<Object> {
        // batch的大小
        int _maxBatchSize;
        // 是否完成初始化
        boolean prepared = false;
        // filed collect
        CaptureCollector _collector;
        // 记录fail的id
        RotatingMap<Long, List<Object>> idsMap;
        //  spout 的配置
        Map _conf;
        // topology的上下文
        TopologyContext _context;
        // 记录emit上一次的时间
        long lastRotate = System.currentTimeMillis();
        //  记录emit的当前时间  主要是根据message timeout进行计算
        long rotateTime;

        public RichSpoutEmitter(Map conf, TopologyContext context) {
            _conf = conf;
            _context = context;
             // spout的batch的size 根据配置文件中max_batch_size的内容得来的
            Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF);
            //  默认大小为1000
            if (batchSize == null)
                batchSize = 1000;
            _maxBatchSize = batchSize.intValue();            
            _collector = new CaptureCollector();
            idsMap = new RotatingMap(3);
            rotateTime = 1000L * ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
        }

        // batch执行emit操作
        //  获取batch对应的transaction的id 和 当前系统时间与lastRotate的差值 
       //   若是超过timeout指定的时间即认为batch的emit的操作失败  通过循环遍历idsMap中最后一个桶中 并将对应桶中的id执行fail   最后更新lastRotate对应的时间  同时也要将这一batch的txId执行fail
        //   若是没有超时或者提出超时的那部分id 
        @Override
        public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
            long txid = tx.getTransactionId();

            long now = System.currentTimeMillis();
            if (now - lastRotate > rotateTime) {
                Map<Long, List<Object>> failed = idsMap.rotate();
                for (Long id : failed.keySet()) {
                    // TODO: this isn't right... it's not in the map anymore
                    fail(id);
                }
                lastRotate = now;
            }

            if (idsMap.containsKey(txid)) {
                fail(txid);
            }
             // 重置collect 使用batch的collector  同时ids重置
            _collector.reset(collector);
             //  判断当前spout是否已经完成初始化 否则执行初始化  调整状态
            if (!prepared) {
                _spout.open(_conf, _context, new SpoutOutputCollector(_collector));
                prepared = true;
            }
            //  循环判断batchs中的batch是否处理完成  根据collect中的emitted的个数和当前的下标进行比对 若是小于当前下标则为后续的batch未处理
            //  最终将当前完成的id存放idsMap
            for (int i = 0; i < _maxBatchSize; i++) {
                _spout.nextTuple();
                if (_collector.numEmitted < i) {
                    break;
                }
            }
            idsMap.put(txid, _collector.ids);

        }

        // batch指定成功后 执行ack
        @Override
        public void success(TransactionAttempt tx) {
            ack(tx.getTransactionId());
        }
        // 循环执行spout的ack
        private void ack(long batchId) {
            List<Object> ids = (List<Object>) idsMap.remove(batchId);
            if (ids != null) {
                for (Object id : ids) {
                    _spout.ack(id);
                }
            }
        }
       // 循环执行spout的fail
        private void fail(long batchId) {
            List<Object> ids = (List<Object>) idsMap.remove(batchId);
            if (ids != null) {
                for (Object id : ids) {
                    _spout.fail(id);
                }
            }
        }

        // 关闭spout
        @Override
        public void close() {
            _spout.close();
        }

    }

    // spout的coordinator
    class RichSpoutCoordinator implements ITridentSpout.BatchCoordinator {
         // 事务初始化
        @Override
        public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
            return null;
        }
        // 事务完成
        @Override
        public void success(long txid) {
        }
        //  是否已经准备
        @Override
        public boolean isReady(long txid) {
            return true;
        }
         //  关闭
        @Override
        public void close() {
        }
    }

    //  spout的collect  主要是针对batch的操作 进行collect的
    static class CaptureCollector implements ISpoutOutputCollector {
      
        TridentCollector _collector;
        public List<Object> ids;
        public int numEmitted;

        public void reset(TridentCollector c) {
            _collector = c;
            ids = new ArrayList<Object>();
        }

        @Override
        public void reportError(Throwable t) {
            _collector.reportError(t);
        }

        @Override
        public List<Integer> emit(String stream, List<Object> values, Object id) {
            if (id != null)
                ids.add(id);
            numEmitted++;
            _collector.emit(values);
            return null;
        }

        @Override
        public void emitDirect(int task, String stream, List<Object> values, Object id) {
            throw new UnsupportedOperationException("Trident does not support direct streams");
        }

    }

}