
A Simple Example of Integrating Spring with Disruptor

程序员文章站 2022-03-25 15:43:55
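I won't introduce Disruptor at length here. The business scenario is this: there are two applications, A and B, and application A sends data to application B. The data arrives at a high rate. If A pushes it over HTTP and B writes every message straight to the database, throughput is poor and application A may come under considerable pressure. A message queue (MQ) would be too heavyweight, so I chose Disruptor. Reactor would also work.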

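BaseQueueHelper.java

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;

/**
 * Template for high-throughput queue processing on top of lmax.disruptor. Supports an initial
 * queue, i.e. events may be published before init() is called.
 *
 * The worker threads start only when init() is called. Resources are released automatically
 * when the JVM exits.
 *
 * @author xielongwang
 * @create 2018-01-18 3:49 PM
 * @email xielong.wang@nvr-china.com
 * @description
 */
public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {

  /**
   * All queues, recorded so that their resources can be released together on JVM exit
   */
  private static final List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
  /**
   * Disruptor instance
   */
  private Disruptor<E> disruptor;
  /**
   * RingBuffer
   */
  private RingBuffer<E> ringBuffer;
  /**
   * Data published before init(); replayed once the ring buffer exists
   */
  private final List<D> initQueue = new ArrayList<D>();
  /**
   * Copies the payload into the preallocated event (shared by init() and publishEvent())
   */
  private final EventTranslatorOneArg<E, D> translator = new EventTranslatorOneArg<E, D>() {
    @Override
    public void translateTo(E event, long sequence, D data) {
      event.setValue(data);
    }
  };

  /**
   * Queue size
   *
   * @return queue length, must be a power of 2
   */
  protected abstract int getQueueSize();

  /**
   * Event factory
   *
   * @return EventFactory
   */
  protected abstract EventFactory<E> eventFactory();

  /**
   * Event consumers
   *
   * @return WorkHandler[]
   */
  protected abstract WorkHandler[] getHandler();

  /**
   * Initialization. Synchronized so that it cannot interleave with publishEvent(),
   * which also reads ringBuffer and writes initQueue.
   */
  public synchronized void init() {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("disruptorthreadpool").build();
    // ProducerType.SINGLE is safe here only because publishEvent() is synchronized
    disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());
    disruptor.setDefaultExceptionHandler(new MyHandlerException());
    disruptor.handleEventsWithWorkerPool(getHandler());
    ringBuffer = disruptor.start();

    // Publish the data that was queued before init()
    for (D data : initQueue) {
      ringBuffer.publishEvent(translator, data);
    }
    initQueue.clear();

    // Register the cleanup hook once, when the first queue is initialized
    synchronized (queueHelperList) {
      if (queueHelperList.isEmpty()) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            for (BaseQueueHelper baseQueueHelper : queueHelperList) {
              baseQueueHelper.shutdown();
            }
          }
        });
      }
      queueHelperList.add(this);
    }
  }

  /**
   * Override this to choose the wait strategy of the consumer threads. YieldingWaitStrategy
   * improves responsiveness but keeps CPU usage above 70% even when idle, so use it with care.
   * SleepingWaitStrategy responds more slowly but uses less CPU, which suits cases such as logging.
   *
   * @return WaitStrategy
   */
  protected abstract WaitStrategy getStrategy();

  /**
   * Publishes a message to the queue. Messages published before init() are buffered and
   * are published to the ring buffer as soon as the queue has been created.
   */
  public synchronized void publishEvent(D data) {
    if (ringBuffer == null) {
      initQueue.add(data);
      return;
    }
    ringBuffer.publishEvent(translator, data);
  }

  /**
   * Shuts the queue down
   */
  public void shutdown() {
    if (disruptor != null) {
      disruptor.shutdown();
    }
  }
}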
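
The Javadoc on getQueueSize() says that the length must be a power of 2. Below is a minimal JDK-only sketch of why that rule is convenient: with a power-of-two size, an ever-increasing sequence number maps to a slot with a single bit mask instead of a modulo. RingSizes and its methods are hypothetical helpers written for this illustration; they are not part of Disruptor.

```java
/** Hypothetical helpers illustrating the power-of-two rule for ring sizes. */
final class RingSizes {
    private RingSizes() {
    }

    /** True when n is a positive power of two (exactly one bit set). */
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    /** Smallest power of two that is >= n, for 1 <= n <= 2^30. */
    static int ceilingPowerOfTwo(int n) {
        if (n < 1 || n > (1 << 30)) {
            throw new IllegalArgumentException("n out of range: " + n);
        }
        return n == 1 ? 1 : Integer.highestOneBit(n - 1) << 1;
    }

    /** Slot index of a sequence in a ring whose size is a power of two. */
    static int indexFor(long sequence, int size) {
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException("size must be a power of 2: " + size);
        }
        return (int) (sequence & (size - 1));
    }
}
```

A size that is not a power of two can be rounded up before it is returned from getQueueSize(), for example RingSizes.ceilingPowerOfTwo(1000).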

EventFactory.java

/**
 * Creates the events that Disruptor preallocates in the ring buffer.
 *
 * @author xielongwang
 * @create 2018-01-18 6:24 PM
 * @email xielong.wang@nvr-china.com
 * @description
 */
public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {

  @Override
  public SeriesDataEvent newInstance() {
    return new SeriesDataEvent();
  }
}

MyHandlerException.java

import com.lmax.disruptor.ExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyHandlerException implements ExceptionHandler<Object> {

  private final Logger logger = LoggerFactory.getLogger(MyHandlerException.class);

  /*
   * (non-Javadoc) Exception raised while an event is being processed
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
   * , long, java.lang.Object)
   */
  @Override
  public void handleEventException(Throwable ex, long sequence, Object event) {
    // Passing ex as the last argument makes the logger print the stack trace
    logger.error("process data error sequence ==[{}] event==[{}]", sequence, event, ex);
  }

  /*
   * (non-Javadoc) Exception raised on startup
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
   * Throwable)
   */
  @Override
  public void handleOnStartException(Throwable ex) {
    logger.error("start disruptor error ==[{}]!", ex.getMessage());
  }

  /*
   * (non-Javadoc) Exception raised on shutdown
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
   * .Throwable)
   */
  @Override
  public void handleOnShutdownException(Throwable ex) {
    logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
  }
}
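
MyHandlerException logs a failure and returns, which lets the consumer carry on with the next event instead of stopping. The sketch below shows that idea with the JDK only. IsolatingProcessor and processAll are hypothetical names, and the plain loop merely stands in for Disruptor's event processor; it does not reproduce it.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical stand-in for a consumer loop that isolates per-event failures. */
final class IsolatingProcessor {
    private IsolatingProcessor() {
    }

    /**
     * Handles every event in order. A failing event is reported to onError and
     * processing continues with the next one. Returns the number of events
     * that were handled without an exception.
     */
    static <T> int processAll(List<T> events, Consumer<T> handler,
                              BiConsumer<T, RuntimeException> onError) {
        int succeeded = 0;
        for (T event : events) {
            try {
                handler.accept(event);
                succeeded++;
            } catch (RuntimeException ex) {
                onError.accept(event, ex); // report, then keep going
            }
        }
        return succeeded;
    }
}
```

The design choice is the same as in the handler above: one bad message should cost one log entry, not the whole consumer.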

SeriesData.java (the message that application A sends to application B)

public class SeriesData {
  private String deviceInfoStr;
  public SeriesData() {
  }

  public SeriesData(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }

  public String getDeviceInfoStr() {
    return deviceInfoStr;
  }

  public void setDeviceInfoStr(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }

  @Override
  public String toString() {
    return "seriesdata{" +
        "deviceinfostr='" + deviceInfoStr + '\'' +
        '}';
  }
}

SeriesDataEvent.java

public class SeriesDataEvent extends ValueWrapper<SeriesData> {
}

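SeriesDataEventHandler.java

import com.lmax.disruptor.WorkHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
// StringUtils and DeviceInfoService: the original does not show where they come from

public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {
  private final Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);
  @Autowired
  private DeviceInfoService deviceInfoService;

  @Override
  public void onEvent(SeriesDataEvent event) {
    if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
      logger.warn("receiver series data is empty!");
      // Nothing to process; returning here avoids a NullPointerException below
      return;
    }
    // Business processing
    deviceInfoService.processData(event.getValue().getDeviceInfoStr());
  }
}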

SeriesDataEventQueueHelper.java

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean {
  private static final int QUEUE_SIZE = 1024;
  // Spring injects every SeriesDataEventHandler bean into this list
  @Autowired
  private List<SeriesDataEventHandler> seriesDataEventHandler;

  @Override
  protected int getQueueSize() {
    return QUEUE_SIZE;
  }

  @Override
  protected com.lmax.disruptor.EventFactory<SeriesDataEvent> eventFactory() {
    return new EventFactory();
  }

  @Override
  protected WorkHandler[] getHandler() {
    return seriesDataEventHandler.toArray(new SeriesDataEventHandler[0]);
  }

  @Override
  protected WaitStrategy getStrategy() {
    return new BlockingWaitStrategy();
    //return new YieldingWaitStrategy();
  }

  // Start the Disruptor once Spring has injected the handlers
  @Override
  public void afterPropertiesSet() throws Exception {
    this.init();
  }
}

ValueWrapper.java

public abstract class ValueWrapper<T> {
  private T value;
  public ValueWrapper() {}
  public ValueWrapper(T value) {
    this.value = value;
  }

  public T getValue() {
    return value;
  }

  public void setValue(T value) {
    this.value = value;
  }
}
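
ValueWrapper needs a setter because Disruptor creates its event objects up front through the EventFactory and then reuses them: a publisher overwrites the payload of an existing event (see translateTo in BaseQueueHelper) instead of allocating a new one. The sketch below shows only that reuse, under simplifying assumptions: Holder and PreallocatedSlots are hypothetical names, the code is single-threaded, and it does not track consumers the way a real ring buffer does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical mutable holder, playing the role of ValueWrapper. */
class Holder<T> {
    private T value;

    T getValue() {
        return value;
    }

    void setValue(T value) {
        this.value = value;
    }
}

/** Fixed set of slots created once by a factory and reused for every publish. */
class PreallocatedSlots<T> {
    private final List<Holder<T>> slots = new ArrayList<Holder<T>>();
    private long next = 0; // sequence number of the next publish

    PreallocatedSlots(int size, Supplier<Holder<T>> factory) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        for (int i = 0; i < size; i++) {
            slots.add(factory.get()); // all allocation happens here
        }
    }

    /** Writes the payload into the slot for the next sequence and returns that slot. */
    Holder<T> publish(T payload) {
        Holder<T> slot = slots.get((int) (next++ % slots.size()));
        slot.setValue(payload);
        return slot;
    }
}
```

With two slots, the third publish lands in the same Holder instance as the first, which is why a consumer should finish with an event before the producer wraps around to it.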

DisruptorConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan(value = {"com.portal.disruptor"})
// Register several consumer instances; each bean becomes one worker in the pool
public class DisruptorConfig {

  /**
   * smsParamEventHandler1
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler1() {
    return new SeriesDataEventHandler();
  }

  /**
   * smsParamEventHandler2
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler2() {
    return new SeriesDataEventHandler();
  }

  /**
   * smsParamEventHandler3
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler3() {
    return new SeriesDataEventHandler();
  }

  /**
   * smsParamEventHandler4
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler4() {
    return new SeriesDataEventHandler();
  }

  /**
   * smsParamEventHandler5
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler5() {
    return new SeriesDataEventHandler();
  }
}
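
The five handler beans are handed to handleEventsWithWorkerPool, so they share the load: each event is processed by one handler, not by all five. The sketch below imitates that behaviour with plain JDK threads and a LinkedBlockingQueue. It is a swapped-in technique for illustration, not how Disruptor is implemented, and WorkerPoolSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a worker pool: every item is taken by exactly one worker. */
final class WorkerPoolSketch {
    // Stop marker, compared by identity so that it can never collide with real data
    private static final String STOP = new String("stop");

    private WorkerPoolSketch() {
    }

    /** Runs workerCount workers over the items and returns how often each item was handled. */
    static Map<String, Integer> run(List<String> items, int workerCount) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(items);
        final Map<String, Integer> handled = new ConcurrentHashMap<String, Integer>();
        for (int i = 0; i < workerCount; i++) {
            queue.add(STOP); // one stop marker per worker, queued after all items
        }
        List<Thread> workers = new ArrayList<Thread>();
        for (int i = 0; i < workerCount; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();
                        if (item == STOP) {
                            return;
                        }
                        handled.merge(item, 1, Integer::sum);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "worker-" + i);
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return handled;
    }
}
```

Because the workers compete for items, the processing order across workers is not guaranteed. The same holds for a worker pool of handlers, so this setup fits messages that can be handled independently of one another.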

Test

  // Inject SeriesDataEventQueueHelper, the message producer
  @Autowired
  private SeriesDataEventQueueHelper seriesDataEventQueueHelper;

  @RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
  public DataResponseVo<String> receiverDeviceData(@RequestBody String deviceData) {
    long startTime1 = System.currentTimeMillis();

    if (StringUtils.isEmpty(deviceData)) {
      logger.info("receiver data is empty !");
      return new DataResponseVo<String>(400, "failed");
    }
    // Hand the payload to the Disruptor queue; the database write happens in the consumers
    seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData));
    long startTime2 = System.currentTimeMillis();
    logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1);
    return new DataResponseVo<String>(200, "success");
  }

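Application A sends data to application B through the /data endpoint. B then hands each message to the Disruptor queue via SeriesDataEventQueueHelper, and the consumers process it. As long as the ring buffer has free slots, the request returns without waiting for the database write, so application A is not blocked. This design assumes that losing messages is acceptable, since the queue lives only in memory. Monitoring of the Disruptor queue can be added by extending SeriesDataEventQueueHelper.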
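
Two of the points above, tolerating message loss and monitoring the queue, can be sketched with the JDK alone. The class below drops a message when the queue is full instead of blocking the caller, and exposes a few counters. MonitoredDropQueue is a hypothetical name and ArrayBlockingQueue is only a stand-in for the ring buffer. As far as I know, Disruptor 3.x offers RingBuffer.tryPublishEvent and RingBuffer.remainingCapacity() for the same purpose, but check them against the version you use.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical bounded queue that drops on overflow and counts what happened. */
class MonitoredDropQueue<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong accepted = new AtomicLong();
    private final AtomicLong dropped = new AtomicLong();

    MonitoredDropQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<T>(capacity);
    }

    /** Enqueues without blocking; returns false and counts a drop when the queue is full. */
    boolean tryPublish(T item) {
        if (queue.offer(item)) {
            accepted.incrementAndGet();
            return true;
        }
        dropped.incrementAndGet();
        return false;
    }

    /** Takes the oldest item, or returns null when the queue is empty. */
    T poll() {
        return queue.poll();
    }

    /** Number of items waiting to be consumed. */
    int backlog() {
        return queue.size();
    }

    /** Number of free slots left. */
    int remainingCapacity() {
        return queue.remainingCapacity();
    }

    long acceptedCount() {
        return accepted.get();
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

A steadily growing backlog or a non-zero drop count is a sign that the consumers cannot keep up with application A.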
