欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

disruptor(一) 单一生产者和WorkPool消费者源码阅读

程序员文章站 2022-05-07 20:30:25
...



 最近项目中有用到disruptor,提供一个类似队列或者数据容器的功能,并发能力很强

 

概念:

Sequence:就是一个增长序列,类似oracle的增长序列,生产和消费程序都有Sequence,记录生产和消费程序的序列

Sequencer: 多个概念的一个组合,持有Sequence,等待策略等一些引用,生产者引用

SequenceBarrier:直接翻译就是序列屏障,就是Sequence和RingBuffer交互的一个屏障,单个生产者时,生产者不需要SequenceBarrier

RingBuffer:是一个数组,根据追踪生产和消费的Sequence来实现一个环形的数据结构

 

说明:

1 很多地方都说到RingBuffer是一个环形的数据结构,它功能上表现出来的确实是环形结构,但是实现上是一个数组,而是通过生产者覆盖已经读过的数据,消费者回头读取未读取过的数据来实现的环形数据结构,

很多画图画成环形的,很容易误导理解

2 RingBuffer的size为2的n次方,可是Sequence是一直递增的,不知道其他人怎么理解的,我原来没看代码前的理解就是它的长度不超过RingBuffer的长度,然后重置后重新增长,这个错误的理解主要是看了很多博客上的环形的那个图,不论生产还是消费的Sequence一直

都是递增的, 到RingBuffer取值时,会根据RingBuffer的长度转换成对应的下标值

 

1 maven pom

 

		<dependency>
			<groupId>com.lmax</groupId>
			<artifactId>disruptor</artifactId>
			<version>3.3.0</version>
		</dependency>

 

 

2 我现在只是用到,了解的不是很深,看了部分代码,参考了很多其他博客,下面是我自己的看法,不对的地方欢迎指正

  

  2.1 测试代码是这样用的

   

  TestEvent.java

public class TestEvent {

    private String line;

    public String getLine() {
        return line;
    }

    public void setLine(String line) {
        this.line = line;
    }
}

   生产者TestEventProducer.java:

 

   

public class TestEventProducer {

    private RingBuffer<TestEvent> ringBuffer;

    public TestEventProducer (RingBuffer<TestEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * 转换器,向队列里放值的时候调用(队列先设置固定长度的对象,然后通过set方法生产值)
     */
    private static EventTranslatorOneArg<TestEvent, String> eventTranslatorOneArg = new EventTranslatorOneArg<TestEvent, String>() {
        @Override
        public void translateTo(TestEvent event, long sequence, String arg0) {
            event.setLine(arg0);
        }
    };

    /**
     * 生产者向队列里放入数据
     * @param line
     */
    public void onData (String line) {
        this.ringBuffer.publishEvent(eventTranslatorOneArg, line);
    }

    /**
     * 处理数据
     */
    public void handler () {
        for (int i = 0; i < 1000; i++) {
            this.onData("wozaizhe" + i);
        }
    }

   消费者TestEventHandler:消费者从RingBuffer中拿出数据打印了一下

   

public class TestEventHandler implements WorkHandler<TestEvent> {
    @Override
    public void onEvent(TestEvent event) throws Exception {
        System.out.println("处理了一行数据:" + event.getLine());
    }
}

 

 

   测试类:

public class TestEventMain {

    public static final int BUFFER_SIZE = 8;

    public static void main (String[] args) {
        testDisruptor();
    }

    public static void testDisruptor () {
        ExecutorService executor = Executors.newFixedThreadPool(8);
        EventFactory<TestEvent> eventFactory = new TestEventFactory();
        //创建disruptor,设置单生产者模式
        Disruptor disruptor = new Disruptor(eventFactory, BUFFER_SIZE, executor, ProducerType.SINGLE,
                new YieldingWaitStrategy ());
        //设置消费者程序
        disruptor.handleEventsWithWorkerPool(new TestEventHandler(), new TestEventHandler(),
                new TestEventHandler(), new TestEventHandler());
        //设置异常处理
        disruptor.handleExceptionsWith(new TestEventExceptionHandler());
        //启动disruptor并返回RingBuffer
        RingBuffer<TestEvent> ringBuffer = disruptor.start();
        //创建生产者线程,并生产
        TestEventProducer producer = new TestEventProducer(ringBuffer);
        producer.handler();
        disruptor.shutdown();
        executor.shutdown();
    }

}

   测试中使用的是单一的生产者,消费者有多个,使用WorkerPool来管理多个消费者

 

   disruptor 的数据存放在RingBuffer中,RingBuffer的类结构:

 
disruptor(一) 单一生产者和WorkPool消费者源码阅读
            
    
    博客分类: disruptor disruptorRingBufferSequenceSequenceBarrierWorkPool 

 

disruptor设计的主要类

 
disruptor(一) 单一生产者和WorkPool消费者源码阅读
            
    
    博客分类: disruptor disruptorRingBufferSequenceSequenceBarrierWorkPool 
 

 

 上面是主要类图,RingBuffer只负责存储,生产者和消费者取号的协调工作都是由SingleProducerSequencer来完成的

 

 从代码分析:

 (1) 创建Disruptor

 

public Disruptor(final EventFactory<T> eventFactory,
                     final int ringBufferSize,
                     final Executor executor,
                     final ProducerType producerType,
                     final WaitStrategy waitStrategy)
    {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
             executor);
    }

    //创建RingBuffer
    public static <E> RingBuffer<E> create(ProducerType    producerType,
                                           EventFactory<E> factory,
                                           int             bufferSize,
                                           WaitStrategy    waitStrategy)
    {
        switch (producerType)
        {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
        }
    }

      //创建Sequencer并实例化到RingBuffer中
    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory,
                                                         int             bufferSize,
                                                         WaitStrategy    waitStrategy)
    {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }
 

   创建Disruptor的过程: 创建了一个RingBuffer,实例化了生产者sequencer,实例化了其他参数

 

   (2) 设置消费者程序

       源码:

     

    //被调用的方法
    public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
    {
        return createWorkerPool(new Sequence[0], workHandlers);
    }

    //创建workpool
    EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
    {
        //创建SequenceBarrier,每次消费者要读取RingBuffer中的下一个值都要通过SequenceBarrier来获取SequenceBarrier用来协调多个消费者并发的问题
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        //实现在下个方法
        final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
        consumerRepository.add(workerPool, sequenceBarrier);
        return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences());
    }

    //workpool 构造方法
    public WorkerPool(final RingBuffer<T> ringBuffer,
                      final SequenceBarrier sequenceBarrier,
                      final ExceptionHandler exceptionHandler,
                      final WorkHandler<? super T>... workHandlers)
    {
        this.ringBuffer = ringBuffer;
        final int numWorkers = workHandlers.length;
        //创建一个和消费者线程个数相同的WorkProcessor数组
        workProcessors = new WorkProcessor[numWorkers];

        for (int i = 0; i < numWorkers; i++)
        {
            //消费者在源码中的表现形式就是WorkProcessor,通过构造方法可以知道,一个workpool中的消费者程序,使用的是相同的sequenceBarrier, workSequence
            workProcessors[i] = new WorkProcessor<T>(ringBuffer,
                                                     sequenceBarrier,
                                                     workHandlers[i],
                                                     exceptionHandler,
                                                     workSequence);
        }
    }

    这个部分就是创建一个workpool,根据每个WorkHandler创建对应的WorkProcessor,同一个workpool中的消费者线程共享同一个sequenceBarrier,workSequence,类的关系可以看上面的类图

 

   (3) 启动disruptor

      

    public RingBuffer<T> start()
    {
        //获取每一个消费者程序的Sequence和workpool的Sequence
        Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
        ringBuffer.addGatingSequences(gatingSequences);

        checkOnlyStartedOnce();
        for (ConsumerInfo consumerInfo : consumerRepository)
        {
            consumerInfo.start(executor);
        }

        return ringBuffer;
    }

    //根据一系列的引用,找到消费者程序WorkProcessor,初始化每个WorkProcessor的sequence,然后执行提交到线程池执行
    public RingBuffer<T> start(final Executor executor)
    {
        if (!started.compareAndSet(false, true))
        {
            throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
        }

        final long cursor = ringBuffer.getCursor();
        workSequence.set(cursor);

        for (WorkProcessor<?> processor : workProcessors)
        {
            processor.getSequence().set(cursor);
            executor.execute(processor);
        }

        return ringBuffer;
    }

    启动Disruptor就是启动消费者线程

 

   (4)单一生产者生产数据

      

    @Override
    public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
    {
        //获取RingBuffer下一个可操作的序列
        final long sequence = sequencer.next();
        //把数据set到队列,设置cursor的序列
        translateAndPublish(translator, sequence, arg0);
    }

    /**
     * @see Sequencer#next()
     */
    @Override
    public long next()
    {
        return next(1);
    }

    /**
     * @see Sequencer#next(int)
     */
    @Override
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
        //RingBuffer持有Sequencer,SingleProducerSequencer有nextValue和cachedValue两个成员变量,前者记录生产者生产到的位置,后者记录消费者线程中序列号最小的序列号,即是在最前面的消费者的序号
        long nextValue = this.nextValue;

        long nextSequence = nextValue + n;
        //wrapPoint是一个很关键的变量,这个变量决定生产者是否可以覆盖序列号nextSequence,wrapPoint是为什么是nextSequence - bufferSize;RingBuffer表现出来的是一个环形的数据结构,实际上是一个长度为bufferSize的数组, 
        //nextSequence - bufferSize如果nextSequence小于bufferSize wrapPoint是负数,表示可以一直生产;如果nextSequence大于bufferSize wrapPoint是一个大于0的数,由于生产者和消费者的序列号差距不能超过bufferSize
        //(超过bufferSize会覆盖消费者未消费的数据),wrapPoint要小于等于多个消费者线程中消费的最小的序列号,即cachedValue的值,这就是下面if判断的根据
        long wrapPoint = nextSequence - bufferSize;
        long cachedGatingSequence = this.cachedValue;

        //继续生成会覆盖消费者未消费的数据
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            long minSequence;
            //判断wrapPoint是否大于消费者线程最小的序列号,如果大于,不能写入,继续等待
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }
            //满足生产条件了,缓存这次消费者线程最小消费序号,供下次使用
            this.cachedValue = minSequence;
        }
        //缓存生产者最大生产序列号
        this.nextValue = nextSequence;

        return nextSequence;
    }
 

   (5) 消费者程序WorkProccessor

     

    
    //线程的逻辑
    @Override
    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();
      
        //标志位,用来标志一次消费过程,此标志位在代码方面用的很巧妙,把两次执行揉成一段代码
        boolean processedSequence = true;
        //用来缓存消费者可以使用的RingBuffer最大序列号
        long cachedAvailableSequence = Long.MIN_VALUE;
        //记录被分配的WorkSequence的序列号,也是去RingBuffer取数据的序列号
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        {
            try
            {
                // if previous sequence was processed - fetch the next sequence and set
                // that we have successfully processed the previous sequence
                // typically, this will be true
                // this prevents the sequence getting too far forward if an exception
                // is thrown from the WorkHandler
                //每次消费开始执行
                if (processedSequence)
                {
                    processedSequence = false;
                    //使用CAS算法从WorkPool的序列WorkSequence取得可用序列nextSequence
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                //如果可使用的最大序列号cachedAvaliableSequence大于等于我们要使用的序列号nextSequence,直接从RingBuffer取数据;不然进入else
                if (cachedAvailableSequence >= nextSequence)
                {
                    //可以满足消费的条件,根据序列号去RingBuffer去读取数据
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    //一次消费结束,设置标志位
                    processedSequence = true;
                }
                else
                {//等待生产者生产,获取到最大的可以使用的序列号
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }

        notifyShutdown();

        running.set(false);
    }

    //ProcessorSequencerBarrier.java等待生产者生产出更多的产品用来消费
    @Override
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();
        
        //根据选用的等待策略来等待生产者生产
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
        
        //这个不明白是为什么,生产者最大的序列小于要使用的序列,直接返回了,上面的run()方法中的while循环要再执行一遍,不明白此处的用意 
        if (availableSequence < sequence)
        {
            return availableSequence;
        }
        //返回较大的Sequence
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

    //YieldingWaitStrategy 等待策略,先尝试一百次,再不满足条件,当前线程就yield,让其他线程先执行
    @Override
    public long waitFor(
        final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        //等待次数
        int counter = SPIN_TRIES;
 
        //循环,如果生产的最大序列号小于消费者需要的序列号,继续等待,等待次数超过counter次,线程yield
        //这里dependentSequence就是cursorSequence,在ProcessorSequencerBarrier构造函数中可以看到
        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            counter = applyWaitMethod(barrier, counter);
        }

        return availableSequence;
    }

    //counter大于0则减一返回,否则当前线程yield
    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException
    {
        barrier.checkAlert();

        if (0 == counter)
        {
            Thread.yield();
        }
        else
        {
            --counter;
        }

        return counter;
    }

    消费者的整体逻辑:多个消费者共同使用同一个Sequence即workSequence,大家都从这个sequence里取得序列号,通过CAS保证线程安全,然后每个消费者拿到序列号nextSequence后去和RingBuffer的cursor比较,即生产者生产到的最大序列号比较,如果自己要取的序号还没有被生产者生产出来,则等待生产者生成出来后再从RingBuffer中取数据,处理数据

 

  上面的每个线程run()方法是while(true)怎么停下来的呢,通过跑出异常来控制的,抛出AlertException,在捕捉到异常后break循环语句块

   促使抛出异常在关闭disruptor的时候

  (6)disruptor关闭

 

 

    
    public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException
    {
        final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);
        //断是否有剩余消息未发送,有则继续循环
        while (hasBacklog())
        {
            if (timeout >= 0 && System.currentTimeMillis() > timeOutAt)
            {
                throw TimeoutException.INSTANCE;
            }
            // Busy spin
        }
        //数据全部发送和消费完毕
        halt();
    }



   /**
     * Confirms if all messages have been consumed by all event processors
     */
    private boolean hasBacklog()
    {
        final long cursor = ringBuffer.getCursor();
        for (final Sequence consumer : consumerRepository.getLastSequenceInChain(false))
        {
            //通过判断生产数是否等于消费数,等于表示生产消费结束,返回false
            if (cursor > consumer.get())
            {
                return true;
            }
        }
        return false;
    }

    //线程运行设置为false, Barrier的alert设置为true,run()方法执行while循环的时候会检查一下alert,为true则跳出循环
    @Override
    public void halt()
    {
        running.set(false);
        sequenceBarrier.alert();
    }
 

 

 总体来说:RingBuffer在生产Sequencer中记录一个cursor,追踪生产者生产到的最新位置,通过WorkSequence和sequence记录整个workpool消费的位置和每个WorkProcessor消费到位置,来协调生产和消费程序

  • disruptor(一) 单一生产者和WorkPool消费者源码阅读
            
    
    博客分类: disruptor disruptorRingBufferSequenceSequenceBarrierWorkPool 
  • 大小: 69.8 KB
  • disruptor(一) 单一生产者和WorkPool消费者源码阅读
            
    
    博客分类: disruptor disruptorRingBufferSequenceSequenceBarrierWorkPool 
  • 大小: 161 KB
  • disruptor(一) 单一生产者和WorkPool消费者源码阅读
            
    
    博客分类: disruptor disruptorRingBufferSequenceSequenceBarrierWorkPool 
  • 大小: 44.8 KB