spring与disruptor集成的简单示例
程序员文章站
2022-03-25 15:43:55
disruptor不过多介绍了,描述下当前的业务场景,两个应用a,b,应用 a 向应用 b 传递数据 . 数据传送比较快,如果用http直接push数据然后入库,效率不高....
disruptor 不做过多介绍,先描述一下当前的业务场景:有两个应用 a 和 b,应用 a 向应用 b 传递数据。数据传送比较快,如果用 http 直接 push 数据然后入库,效率不高,还有可能给应用 a 带来比较大的压力。使用 mq 又太重量级,所以选择了 disruptor;也可以使用 reactor。
basequeuehelper.java
/** * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布。 * * 调用init()时才真正启动线程开始处理 系统退出自动清理资源. * * @author xielongwang * @create 2018-01-18 下午3:49 * @email xielong.wang@nvr-china.com * @description */ public abstract class basequeuehelper<d, e extends valuewrapper<d>, h extends workhandler<e>> { /** * 记录所有的队列,系统退出时统一清理资源 */ private static list<basequeuehelper> queuehelperlist = new arraylist<basequeuehelper>(); /** * disruptor 对象 */ private disruptor<e> disruptor; /** * ringbuffer */ private ringbuffer<e> ringbuffer; /** * initqueue */ private list<d> initqueue = new arraylist<d>(); /** * 队列大小 * * @return 队列长度,必须是2的幂 */ protected abstract int getqueuesize(); /** * 事件工厂 * * @return eventfactory */ protected abstract eventfactory<e> eventfactory(); /** * 事件消费者 * * @return workhandler[] */ protected abstract workhandler[] gethandler(); /** * 初始化 */ public void init() { threadfactory namedthreadfactory = new threadfactorybuilder().setnameformat("disruptorthreadpool").build(); disruptor = new disruptor<e>(eventfactory(), getqueuesize(), namedthreadfactory, producertype.single, getstrategy()); disruptor.setdefaultexceptionhandler(new myhandlerexception()); disruptor.handleeventswithworkerpool(gethandler()); ringbuffer = disruptor.start(); //初始化数据发布 for (d data : initqueue) { ringbuffer.publishevent(new eventtranslatoronearg<e, d>() { @override public void translateto(e event, long sequence, d data) { event.setvalue(data); } }, data); } //加入资源清理钩子 synchronized (queuehelperlist) { if (queuehelperlist.isempty()) { runtime.getruntime().addshutdownhook(new thread() { @override public void run() { for (basequeuehelper basequeuehelper : queuehelperlist) { basequeuehelper.shutdown(); } } }); } queuehelperlist.add(this); } } /** * 如果要改变线程执行优先级,override此策略. yieldingwaitstrategy会提高响应并在闲时占用70%以上cpu, * 慎用sleepingwaitstrategy会降低响应更减少cpu占用,用于日志等场景. * * @return waitstrategy */ protected abstract waitstrategy getstrategy(); /** * 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理. 
*/ public synchronized void publishevent(d data) { if (ringbuffer == null) { initqueue.add(data); return; } ringbuffer.publishevent(new eventtranslatoronearg<e, d>() { @override public void translateto(e event, long sequence, d data) { event.setvalue(data); } }, data); } /** * 关闭队列 */ public void shutdown() { disruptor.shutdown(); } }
eventfactory.java
/**
 * Factory the disruptor uses to obtain {@link seriesdataevent} instances.
 *
 * @author xielongwang
 * @create 2018-01-18 6:24 PM
 * @email xielong.wang@nvr-china.com
 */
public class eventfactory implements com.lmax.disruptor.eventfactory<seriesdataevent> {

    /**
     * @return a new, empty {@link seriesdataevent}
     */
    @override
    public seriesdataevent newinstance() {
        seriesdataevent blank = new seriesdataevent();
        return blank;
    }
}
myhandlerexception.java
/**
 * Disruptor exception handler that reports failures through the logger.
 *
 * <p>Each method passes the throwable as the trailing logger argument so that the full stack
 * trace is written to the log instead of only the exception message.
 */
public class myhandlerexception implements exceptionhandler<object> {

    private static final logger logger = loggerfactory.getlogger(myhandlerexception.class);

    /**
     * Called when an exception occurs while an event is being processed.
     *
     * @param ex       the exception that was raised
     * @param sequence sequence number of the event being processed
     * @param event    the event being processed
     * @see com.lmax.disruptor.exceptionhandler#handleeventexception(java.lang.throwable, long, java.lang.object)
     */
    @override
    public void handleeventexception(throwable ex, long sequence, object event) {
        // The event is passed as-is: the logger formats it and does not fail on null.
        logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]",
                sequence, event, ex.getmessage(), ex);
    }

    /**
     * Called when an exception occurs during start-up.
     *
     * @param ex the exception that was raised
     * @see com.lmax.disruptor.exceptionhandler#handleonstartexception(java.lang.throwable)
     */
    @override
    public void handleonstartexception(throwable ex) {
        logger.error("start disruptor error ==[{}]!", ex.getmessage(), ex);
    }

    /**
     * Called when an exception occurs during shutdown.
     *
     * @param ex the exception that was raised
     * @see com.lmax.disruptor.exceptionhandler#handleonshutdownexception(java.lang.throwable)
     */
    @override
    public void handleonshutdownexception(throwable ex) {
        logger.error("shutdown disruptor error ==[{}]!", ex.getmessage(), ex);
    }
}
seriesdata.java (代表应用a发送给应用b的消息)
/**
 * Message sent from application a to application b; carries the device information as text.
 */
public class seriesdata {

    private string deviceinfostr;

    /**
     * Creates a message whose device information is not set.
     */
    public seriesdata() {
        this(null);
    }

    /**
     * @param deviceinfostr device information text
     */
    public seriesdata(string deviceinfostr) {
        this.deviceinfostr = deviceinfostr;
    }

    /**
     * @param deviceinfostr device information text
     */
    public void setdeviceinfostr(string deviceinfostr) {
        this.deviceinfostr = deviceinfostr;
    }

    /**
     * @return device information text
     */
    public string getdeviceinfostr() {
        return deviceinfostr;
    }

    @override
    public string tostring() {
        return "seriesdata{deviceinfostr='" + deviceinfostr + "'}";
    }
}
seriesdataevent.java
/**
 * Ring-buffer event whose value is a {@link seriesdata} message.
 */
public class seriesdataevent extends valuewrapper<seriesdata> {

    /**
     * Creates an event with no value set.
     */
    public seriesdataevent() {
        super();
    }
}
seriesdataeventhandler.java
public class seriesdataeventhandler implements workhandler<seriesdataevent> { private logger logger = loggerfactory.getlogger(seriesdataeventhandler.class); @autowired private deviceinfoservice deviceinfoservice; @override public void onevent(seriesdataevent event) { if (event.getvalue() == null || stringutils.isempty(event.getvalue().getdeviceinfostr())) { logger.warn("receiver series data is empty!"); } //业务处理 deviceinfoservice.processdata(event.getvalue().getdeviceinfostr()); } }
seriesdataeventqueuehelper.java
@component public class seriesdataeventqueuehelper extends basequeuehelper<seriesdata, seriesdataevent, seriesdataeventhandler> implements initializingbean { private static final int queue_size = 1024; @autowired private list<seriesdataeventhandler> seriesdataeventhandler; @override protected int getqueuesize() { return queue_size; } @override protected com.lmax.disruptor.eventfactory eventfactory() { return new eventfactory(); } @override protected workhandler[] gethandler() { int size = seriesdataeventhandler.size(); seriesdataeventhandler[] parameventhandlers = (seriesdataeventhandler[]) seriesdataeventhandler.toarray(new seriesdataeventhandler[size]); return parameventhandlers; } @override protected waitstrategy getstrategy() { return new blockingwaitstrategy(); //return new yieldingwaitstrategy(); } @override public void afterpropertiesset() throws exception { this.init(); } }
valuewrapper.java
/**
 * Mutable holder for a single value.
 *
 * @param <t> type of the held value
 */
public abstract class valuewrapper<t> {

    private t value;

    /**
     * Creates a wrapper holding no value.
     */
    public valuewrapper() {
        this(null);
    }

    /**
     * @param value initial value
     */
    public valuewrapper(t value) {
        this.value = value;
    }

    /**
     * @param value value to hold from now on
     */
    public void setvalue(t value) {
        this.value = value;
    }

    /**
     * @return the value currently held, possibly {@code null}
     */
    public t getvalue() {
        return this.value;
    }
}
disruptorconfig.java
@configuration @componentscan(value = {"com.portal.disruptor"}) //多实例几个消费者 public class disruptorconfig { /** * smsparameventhandler1 * * @return seriesdataeventhandler */ @bean public seriesdataeventhandler smsparameventhandler1() { return new seriesdataeventhandler(); } /** * smsparameventhandler2 * * @return seriesdataeventhandler */ @bean public seriesdataeventhandler smsparameventhandler2() { return new seriesdataeventhandler(); } /** * smsparameventhandler3 * * @return seriesdataeventhandler */ @bean public seriesdataeventhandler smsparameventhandler3() { return new seriesdataeventhandler(); } /** * smsparameventhandler4 * * @return seriesdataeventhandler */ @bean public seriesdataeventhandler smsparameventhandler4() { return new seriesdataeventhandler(); } /** * smsparameventhandler5 * * @return seriesdataeventhandler */ @bean public seriesdataeventhandler smsparameventhandler5() { return new seriesdataeventhandler(); } }
测试
//注入seriesdataeventqueuehelper消息生产者 @autowired private seriesdataeventqueuehelper seriesdataeventqueuehelper; @requestmapping(value = "/data", method = requestmethod.post, produces = mediatype.application_json_value) public dataresponsevo<string> receiverdevicedata(@requestbody string devicedata) { long starttime1 = system.currenttimemillis(); if (stringutils.isempty(devicedata)) { logger.info("receiver data is empty !"); return new dataresponsevo<string>(400, "failed"); } seriesdataeventqueuehelper.publishevent(new seriesdata(devicedata)); long starttime2 = system.currenttimemillis(); logger.info("receiver data ==[{}] millisecond ==[{}]", devicedata, starttime2 - starttime1); return new dataresponsevo<string>(200, "success"); }
应用 a 通过 /data 接口把数据发送到应用 b,应用 b 再通过 seriesdataeventqueuehelper 把消息发布到 disruptor 队列,由消费者去消费,整个过程不会堵塞应用 a。该方案可接受消息丢失;可以通过扩展 seriesdataeventqueuehelper 来实现对 disruptor 队列的监控。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: PHP header头部定义详解
下一篇: JS面向对象之多选框实现