欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发框架之Disruptor 博客分类: Disruptor,并发 disruptor 

程序员文章站 2024-02-20 13:20:40
...
Disruptor它是一个开源的并发框架
官方地址|:https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

https://github.com/LMAX-Exchange/disruptor


public class LongEvent {
	
	private Long value;

	public Long getValue() {
		return value;
	}

	public void setValue(Long value) {
		this.value = value;
	}
}


import com.lmax.disruptor.EventFactory;
/**
 *事件工厂
 */
public class LongEventFactory implements EventFactory<LongEvent>{
	@Override
	public LongEvent newInstance() {
		 return new LongEvent();
	}

}

import com.lmax.disruptor.EventHandler;
/**
 * 消费者/事件处理器
 */
public class LongEventHandler implements EventHandler{
	@Override
	public void onEvent(Object event, long sequence, boolean endOfBatch)
			throws Exception {
		System.out.println("Event:"+event+"   sequence:"+sequence+"   endOfBatch:"+endOfBatch);
	}
}



import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

public class LongEventProducer {
	private final RingBuffer<LongEvent> ringBuffer;
	public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}

	public void onData(ByteBuffer bb) {
		// 下一个序列
		long sequence = ringBuffer.next();
		try {
			//获取Disruptor中的条目用于序列
			LongEvent event = ringBuffer.get(sequence); 
			//填充数据
			event.setValue(bb.getLong(0));
		} finally {
			//发布序列
			ringBuffer.publish(sequence);
		}
	}
}


import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

public class LongEventProducerWithTranslator {
	private final RingBuffer<LongEvent> ringBuffer;

	public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}

	private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
		public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
			event.setValue(bb.getLong(0));;
		}
	};

	public void onData(ByteBuffer bb) {
		ringBuffer.publishEvent(TRANSLATOR, bb);
	}

}


import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

public class LongEventMain {
	public static void main(String[] args) throws Exception {
		Executor executor = Executors.newCachedThreadPool();
		// bufferSize必须是2的N次方
		int bufferSize = 1024;
		// 构造disruptor
		Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new,bufferSize, executor);
		// 链接处理器
		disruptor.handleEventsWith((event, sequence, endOfBatch) ->
		System.out.println("Event: " + event+"  sequence:"+sequence+"  endOfBatch:"+endOfBatch));
		// 启动运行Disruptor
		disruptor.start();
		//从disruptor中获取环形缓冲区
		RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
		ByteBuffer bb = ByteBuffer.allocate(8);
		for (long l = 0; true; l++) {
			bb.putLong(0, l);
			ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)),bb);
			Thread.sleep(1000);
		}
	}

}


相关标签: disruptor