Disruptor并发框架 java并发框架disruptor
程序员文章站
2022-07-03 09:39:44
...
Disruptor:高效的并发组件(框架),可以认为是没有锁的生产者消费者模型,所以效率高,内部使用RingBuffer机制,做到无锁并发,
代码示例
Disruptor服务器
disruptor 框架的事件
FrameEventHandler实现EventHandler接口,消费者处理
消费者:真正的消息处理
NET功能,,跟Disruptor框架无关,上层应用
客户端
完整代码下载参考附件
代码示例
Disruptor服务器
package com.gbcom.frame.disruptor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventTranslatorThreeArg; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; /** * DisruptorServer,封装了Disruptor服务,提供 start ,stop,注册消费者,提供发布方法等功能 * * * 该类 可以直接使用,并且独立于业务。该类描述了 Disruptor的核心 * @author syz * @date 2014-10-17 * @version v1.0.0 * @see com.gbcom.frame.disruptor.DisruptorServer */ public class DisruptorServer { private static final int BUFFER_SIZE = 4; private Disruptor<FrameEvent> disruptor; private EventProcessor processor = new EventProcessor(); /** * 数据的封装;封装成 FrameEvent */ private static final EventTranslatorThreeArg<FrameEvent, String, Integer, byte[]> TRANSLATOR = new EventTranslatorThreeArg<FrameEvent, String, Integer, byte[]>(){ @Override public void translateTo(FrameEvent event, long seq, String ip, Integer port, byte[] data) { event.setData(data); event.setIp(ip); event.setPort(port); } }; private static class DisruptorHolder{ private static final DisruptorServer INSTANCE = new DisruptorServer(); } public static DisruptorServer getInstance(){ return DisruptorHolder.INSTANCE; } public void start(){ initDisruptor(); } private void initDisruptor() { ExecutorService executor = Executors.newCachedThreadPool(); //线程池,构建disruptor需要 disruptor = new Disruptor<FrameEvent>(FrameEvent.ENENT_FACTORY, BUFFER_SIZE, executor, ProducerType.SINGLE, new BlockingWaitStrategy());//构建disruptor disruptor.handleEventsWith(new FrameEventHandler<FrameEvent>(processor));//注册一个消费者,消费者事先注册好 disruptor.start();//开启并发服务 disruptor.getRingBuffer();//启动轮转,轮转的size 在构建disruptor指定,基于该机制实现的无锁 高并发 } /** * 生产者,发布一个消息: (DisruptorServer.onData) * @param ip * @param port * @param data */ public void onData(String ip,Integer port,byte[] data){ disruptor.getRingBuffer().publishEvent(TRANSLATOR,ip,port,data); } /** * 生产错误的处理。直接调用,非添加到队列。 * @param t */ public void onError(Throwable t){ if(processor !=null){ processor.onError(t); } } }
disruptor 框架的事件
package com.gbcom.frame.disruptor; import java.util.Arrays; import com.lmax.disruptor.EventFactory; /** * disruptor 框架的事件。POJO,disruptor 需要提供factory, * @author syz * @date 2014-10-17 * @version v1.0.0 * @see com.gbcom.frame.disruptor.FrameEvent */ public class FrameEvent { private byte[] data; private String ip; private int port; public byte[] getData() { return data; } public void setData(byte[] data) { this.data = data; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } /** * 工厂类 */ public static final EventFactory<FrameEvent> ENENT_FACTORY = new EventFactory<FrameEvent> (){ @Override public FrameEvent newInstance() { // TODO Auto-generated method stub return new FrameEvent(); } }; @Override public String toString() { return "FrameEvent [data=" + Arrays.toString(data) + ", ip=" + ip + ", port=" + port + "]"; } }
FrameEventHandler实现EventHandler接口,消费者处理
package com.gbcom.frame.disruptor; import com.lmax.disruptor.EventHandler; /** * 消费者处理,,实现EventHandler接口 * @author SYZ * @date 2016-11-3 下午02:08:32 * @version 1.0.0 * @see com.gbcom.frame.disruptor.FrameEventHandler */ public class FrameEventHandler<FrameEvent> implements EventHandler<FrameEvent>{ private EventProcessor processor ; protected FrameEventHandler(EventProcessor processor) { super(); this.processor = processor; } @Override public void onEvent(FrameEvent event, long seq, boolean endOfBatch) throws Exception { processor.onReceive((com.gbcom.frame.disruptor.FrameEvent) event); } }
消费者:真正的消息处理
package com.gbcom.frame.disruptor; import java.io.UnsupportedEncodingException; import org.apache.commons.io.Charsets; import com.gbcom.frame.disruptor.snetty.UdpHandler; /** * 消费者:真正的消息处理 * @author syz * @date 2014-10-17 * @version v1.0.0 * @see com.gbcom.frame.disruptor.EventProcessor */ public class EventProcessor implements UdpHandler{ public void onReceive(FrameEvent event){ System.out.println("receive is success :"+event); try { NatTabManager.getInstance().put(new String(event.getData(),"utf-8"), event); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } event = NatTabManager.getInstance().getNat(new String(event.getData(),Charsets.UTF_8)); //System.out.println(HttpClientUtil.get("http://"+event.getIp()+":"+event.getPort())); } public void onError(Throwable t ){ System.out.println("receive is false :" + t); } public static void main(String args[]){ System.out.println(HttpClientUtil.get("http://127.0.0.1:80")); } @Override public void onReceive(String ip, int port, byte[] data) { } }
NET功能,,跟Disruptor框架无关,上层应用
package com.gbcom.frame.disruptor; import java.util.HashMap; import java.util.Map; /** * NET功能,,跟Disruptor框架无关, * @author SYZ * @date 2016-11-3 下午02:09:14 * @version 1.0.0 * @see com.gbcom.frame.disruptor.NatTabManager */ public class NatTabManager { private Map<String,FrameEvent> natTab = new HashMap<String,FrameEvent>(); private static NatTabManager intance = new NatTabManager(); public static NatTabManager getInstance(){ return intance; } public void put(String key,FrameEvent value){ natTab.put(key, value); } public Map getNatTab(){ return natTab; } public FrameEvent getNat(String key){ return natTab.get(key); } }
客户端
package com.gbcom.frame.disruptor; /** * * * Disruptor:高效的并发组件(框架),可以认为是没有锁的生产者消费者模型,所以效率高 * * 该例子完成Disruptor功能,且具有一定的使用价值,Disruptor独立于业务,,,DisruptorServer * * 高效的生产者消费者模型:DisruptorServer * * @author SunYanzheng * @date 2014-10-17 * @version v1.0.0 * @see com.gbcom.frame.disruptor.DisClient */ public class DisClient { /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub DisruptorServer.getInstance().start(); //初始化服务 DisruptorServer.getInstance().onData("10.1.1.1", 162, "hello".getBytes());//调用onData 发布数据 } }
完整代码下载参考附件
推荐阅读
-
并发编程之Disruptor并发框架
-
并发系列(4)之 Future 框架详解
-
Python并发编程之学习异步IO框架:asyncio 中篇(十)
-
04 整合IDEA+Maven+SSM框架的高并发的商品秒杀项目之高并发优化
-
[转]Java7中的ForkJoin并发框架初探(上)——需求背景和设计原理
-
[转]Java7中的ForkJoin并发框架初探(下)—— ForkJoin的应用
-
[转]Java7中的ForkJoin并发框架初探(中)——JDK中实现简要分析
-
C++多进程并发框架FFLIB
-
Java 并发编程之ForkJoin框架
-
推荐:实现RTSP/RTMP/HLS/HTTP协议的轻量级流媒体框架,支持大并发连接请求