依赖性任务处理
程序员文章站
2024-03-23 10:18:10
...
事件类
package com.qb.loan.disruptor.dependentevent;
public class MyEvent {
private long value;
public MyEvent() {
}
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
事件工厂类
package com.qb.loan.disruptor.dependentevent;
import com.lmax.disruptor.EventFactory;
public class MyEventFactory implements EventFactory<MyEvent>{
public MyEvent newInstance() {
return new MyEvent();
}
}
事件处理机
package com.qb.loan.disruptor.dependentevent;
import com.lmax.disruptor.EventHandler;
public class MyEventHandlerB implements EventHandler<MyEvent> {
public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
System.out.println("Comsume Event B : " + myEvent.getValue() + " " + Thread.currentThread().getId());
}
}
package com.qb.loan.disruptor.dependentevent;
import com.lmax.disruptor.EventHandler;
public class MyEventHandlerC implements EventHandler<MyEvent> {
public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
System.out.println("Comsume Event C : " + myEvent.getValue() + " " + Thread.currentThread().getId());
}
}
package com.qb.loan.disruptor.dependentevent;
import com.lmax.disruptor.EventHandler;
public class MyEventHandlerD implements EventHandler<MyEvent>{
public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
System.out.println("Comsume Event D : " + myEvent.getValue() + " " + Thread.currentThread().getId());
}
}
测试类
package com.qb.loan.disruptor.dependentevent;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class Main {
public static void main(String[] args){
EventFactory<MyEvent> myEventFactory = new MyEventFactory();
ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
int ringBufferSize = 32;
Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(myEventFactory,ringBufferSize,executor, ProducerType.SINGLE,new BlockingWaitStrategy());
EventHandler<MyEvent> b = new MyEventHandlerB();
EventHandler<MyEvent> c = new MyEventHandlerC();
EventHandler<MyEvent> d = new MyEventHandlerD();
SequenceBarrier sequenceBarrier2 = disruptor.handleEventsWith(b,c).asSequenceBarrier();
BatchEventProcessor processord = new BatchEventProcessor(disruptor.getRingBuffer(),sequenceBarrier2,d);
disruptor.handleEventsWith(processord);
// disruptor.after(b,c).handleEventsWith(d); // 此行能代替上两行的程序逻辑
RingBuffer<MyEvent> ringBuffer = disruptor.start(); // 启动Disruptor
for(int i=0; i<10; i++) {
long sequence = ringBuffer.next(); // 申请位置
try {
MyEvent myEvent = ringBuffer.get(sequence);
myEvent.setValue(i); // 放置数据
} finally {
ringBuffer.publish(sequence); // 提交,如果不提交完成事件会一直阻塞
}
try{
Thread.sleep(100);
}catch (Exception e){
}
}
disruptor.shutdown();
}
}
上一篇: 简单的epoll模型