欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java深入学习(6):Disruptor

程序员文章站 2022-06-24 09:45:50
Disruptor框架简介: 并发框架,基于事件驱动,使用观察者模式 底层采用环形数组,取模算法 简单使用: 工厂: 消费者: 生产者: 启动: 打印如下: ......

disruptor框架简介:

并发框架,基于事件驱动,使用观察者模式

底层采用环形数组,取模算法

 

简单使用:

/**
 * 声明一个event:表示生产者和消费者之间传递的数据类型
 */
public class longevent {

    private long value;

    public long getvalue() {
        return value;
    }

    public void setvalue(long value) {
        this.value = value;
    }

}

 

工厂:

/**
 * 实例化
 */
public class longeventfactory implements eventfactory<longevent> {

    public longevent newinstance() {
        return new longevent();
    }

}

 

消费者:

/**
 * 消费者
 */
public class longeventhandler implements eventhandler<longevent> {

    public void onevent(longevent event, long sequence, boolean endofbatch) throws exception {
        system.out.println("消费者获取数据:"+event.getvalue());
    }

}

 

生产者:

/**
 * 生产者
 */
public class longeventproducer {

    private ringbuffer<longevent> ringbuffer;

    public longeventproducer(ringbuffer<longevent> ringbuffer) {
        this.ringbuffer = ringbuffer;
    }

    public void ondata(bytebuffer bytebuffer) {
        //获取事件队列的下标位置
        long sequence = ringbuffer.next();
        try {
            //取出空队列
            longevent longevent = ringbuffer.get(sequence);
            //给空队列赋值
            longevent.setvalue(bytebuffer.getlong(0));
        } catch (exception e) {
            e.printstacktrace();
        } finally {
            system.out.println("生产者发送数据");
            //发送数据
            ringbuffer.publish(sequence);
        }
    }

}

 

启动:

public class main {

    public static void main(string[] args) {
        //创建可缓存线程池
        executorservice executor = executors.newcachedthreadpool();
        //创建工厂
        eventfactory<longevent> factory = new longeventfactory();
        //创建ringbuffer(必须为2的n次方)
        int ringbuffer = 1024 * 1024;
        //创建disruptor
        disruptor<longevent> disruptor = new disruptor<longevent>(
                factory,
                ringbuffer,
                executor,
                producertype.multi,
                new yieldingwaitstrategy()
        );
        //注册消费者(如果注册多个消费者默认是重复消费)
        disruptor.handleeventswith(new longeventhandler());
        //启动
        disruptor.start();
        //创建ringbuffer容器
        ringbuffer<longevent> buffer = disruptor.getringbuffer();
        //创建生产者
        longeventproducer longeventproducer = new longeventproducer(buffer);
        //定义大小为8的缓冲区
        bytebuffer bytebuffer = bytebuffer.allocate(8);
        for (int i = 0; i < 100; i++) {
            bytebuffer.putlong(0, i);
            longeventproducer.ondata(bytebuffer);
        }
        executor.shutdown();
        disruptor.shutdown();
    }

}

 

打印如下:

....................
消费者获取数据:39
生产者发送数据
消费者获取数据:40
生产者发送数据
消费者获取数据:41
生产者发送数据
消费者获取数据:42
....................