The Disruptor Concurrency Framework (blog categories: Disruptor, concurrency; tag: disruptor)
2024-02-20 13:20:40
Disruptor is an open-source concurrency framework.
Official links:
https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
https://github.com/LMAX-Exchange/disruptor

public class LongEvent {
    private Long value;

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }
}

import com.lmax.disruptor.EventFactory;

/**
 * Event factory: creates the LongEvent instances that fill the ring buffer.
 */
public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

import com.lmax.disruptor.EventHandler;

/**
 * Consumer / event handler.
 * Parameterized with LongEvent so the event does not have to be handled as a raw Object.
 */
public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Event:" + event.getValue()
                + " sequence:" + sequence
                + " endOfBatch:" + endOfBatch);
    }
}

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb) {
        // Claim the next sequence
        long sequence = ringBuffer.next();
        try {
            // Get the entry in the Disruptor for this sequence
            LongEvent event = ringBuffer.get(sequence);
            // Fill it with data
            event.setValue(bb.getLong(0));
        } finally {
            // Publish the sequence; done in finally so a claimed slot is always released
            ringBuffer.publish(sequence);
        }
    }
}

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

public class LongEventProducerWithTranslator {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    // Copies the long at offset 0 of the ByteBuffer into the claimed event
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                @Override
                public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
                    event.setValue(bb.getLong(0));
                }
            };

    public void onData(ByteBuffer bb) {
        // publishEvent claims a slot, runs the translator, and publishes in one call
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}

import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

public class LongEventMain {
    public static void main(String[] args) throws Exception {
        Executor executor = Executors.newCachedThreadPool();

        // bufferSize must be a power of two
        int bufferSize = 1024;

        // Construct the Disruptor.
        // Note: the Executor-based constructor is deprecated in later 3.x releases,
        // which take a ThreadFactory instead.
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);

        // Connect the handler
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->
                System.out.println("Event: " + event.getValue()
                        + " sequence:" + sequence
                        + " endOfBatch:" + endOfBatch));

        // Start the Disruptor
        disruptor.start();

        // Get the ring buffer from the Disruptor
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        // Publish one event per second, forever
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent(
                    (event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}
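
The comment in LongEventMain says that bufferSize must be a power of two. A common reason for this constraint in ring buffers is that an ever-increasing sequence number can then be mapped to a slot with a cheap bit mask instead of a modulo. The sketch below only illustrates that idea with the standard library; it is not Disruptor's own code, and the names RingIndexSketch, isPowerOfTwo and slotFor are hypothetical.

```java
public class RingIndexSketch {

    /** Returns true when n is a positive power of two (exactly one bit set). */
    public static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    /**
     * Maps a non-negative sequence to a slot index in [0, bufferSize).
     * For a power-of-two bufferSize, (sequence & (bufferSize - 1))
     * gives the same result as (sequence % bufferSize).
     */
    public static int slotFor(long sequence, int bufferSize) {
        if (!isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of two");
        }
        return (int) (sequence & (bufferSize - 1));
    }

    public static void main(String[] args) {
        int bufferSize = 1024; // same size as in LongEventMain
        for (long seq : new long[] {0, 1, 1023, 1024, 1025, 2048}) {
            // Sequences past the end of the buffer wrap around to the start
            System.out.println(seq + " -> slot " + slotFor(seq, bufferSize));
        }
    }
}
```

With bufferSize = 1024, sequence 1024 wraps back to slot 0 and 1025 to slot 1, which is why a slot can only be reused once its consumers are done with the previous event in it.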