欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

采用BlockingQueue实现内存消息队列 javaactivemq阻塞队列blockingqueue

程序员文章站 2022-04-21 09:26:43
...

在我们日常开发过程中,有时候需要采用内存消息队列的方案来实现我们想要的功能。

        比如日志系统:

我们需要把系统中关键模块的日志写入文件或者数据库以便存档分析。那么我们可以采用内存队列来存储各个模块输出的日志,另外再由专门的日志存储端把日志写入存储系统中。基本架构如下:

采用BlockingQueue实现内存消息队列
            
    
    
        javaactivemq阻塞队列blockingqueue

图1:日志系统局部架构

 

这样做的好处是打印日志的模块只需要关注打印日志,不用关心日志存储到哪里和如何分类存储等逻辑。而日志读取存储端也只需要关注读取和分析队列中的日志消息即可。

 

      再比如通用邮件发送功能:

一般业务系统中常需要发送各种邮件,比如给用户的业务提醒邮件、修改密码时的密保邮件、业务功能报警邮件等等。那么我们可以采用内存消息队列来存储邮件实体,需要发送邮件的业务功能可以直接把封装好的邮件消息丢入队列中,另外再由专门负责邮件发送端去负责处理真正邮件发送的逻辑。基本架构类似上图,如下:

 

采用BlockingQueue实现内存消息队列
            
    
    
        javaactivemq阻塞队列blockingqueue

图2:邮件发送系统局部架构

 

在这种场景下面,我们采用基于消息队列的设计去实现有以下优点:

1、代码逻辑清晰且易使用

职责分离后,代码逻辑更清晰是必然的,并且在功能的使用上也更简单,比如如果是使用Spring技术的话,业务代码中只需要注入一个统一发送服务类即可。

2、业务和基础设施解耦

负责写日志和发邮件的业务模块逻辑只管把封装好的实体往消息队列中丢就好了,不用去关心日志是怎么存储的和邮件是怎么发送出去的。另外专门负责日志存储和邮件发送的基础设施层由于已经和业务解耦了,那么可以只关注于技术层面的设计和优化。

3、易于进行性能调优和设计

业务逻辑和技术基础逻辑进行解耦以后,两部分就是独立的功能,那么基础设施端可以更易于在此基础上进行性能调优和进行良好的架构设计,比如为了提高吞吐量,我们可以把消息的消费端(即日志存储端和邮件发送端)设计为多线程并发处理的方式。

 

实现消息队列的方法有很多种,成熟的产品比如RabbitMQActiveMQ 和 ZeroMQ,这些产品都是比较成熟的实现消息队列的方案,一般企业里面都是直接使用或者基于这些技术再开发出一套定制化的解决方案。但本文主题不是讨论这些技术,而是针对一种简单的场景,也就是无需使用复杂的消息队列产品,只是想建立一个便于使用的内存消息队列就可以满足需求的场景。这种场景一般有以下特点:

1、消息量不大

2、消息安全性不高

3、不需要HA

4、不需要完备的failover机制

 

下面我们来看下基于阻塞队列实现内存消息队列的基本原理图:

采用BlockingQueue实现内存消息队列
            
    
    
        javaactivemq阻塞队列blockingqueue

图3:基本原理图

统一消息发送端

封装了接收消息、消息合法性校验、消息转化、格式化序列化以及put入队列的逻辑,被业务代码所使用。

阻塞队列

基于BlockQueue实现,在外层做了一定封装。

消息接收消费端

负责读取和消费队列中的消息,在无消息时会进行阻塞等待,遇到异常时会交给异常处理机制进行处理。

异常处理机制

负责在消费消息时发生异常时的后续处理,比如把消息经过处理后重丢回队列或者存储入异常队列专门有一套异常处理流程进行处理等等,具体本文不做详细讨论。

 

一般在这种设计下的消费任务总是由一个专门的线程去监听队列并阻塞等待,而这个线程一般都是随应用启动而启动,所以原理图里的应用初始化启动就是这个意思,说明了消费端的线程是随着应用初始化而创建,并且是常驻的。

 

原理和设计说完了,下面以接收发送报警邮件的业务场景为例,贴一下关键代码:

以下是对BlockQueue进行封装后的队列,接收特定的实体。

/**
 * 报警阻塞队列
 * 
 * @author chongan.wangca
 */
public class AlarmMessageQueue {

    private Logger                        logger            = LoggerFactory.getLogger(AlarmMessageQueue.class);

    //队列大小
    public static final int               QUEUE_MAX_SIZE    = 100;

    private static AlarmMessageQueue      alarmMessageQueue = new AlarmMessageQueue();

    //阻塞队列
    private BlockingQueue<AlarmMessageVO> blockingQueue     = new LinkedBlockingQueue<AlarmMessageVO>(QUEUE_MAX_SIZE);

    public static AlarmMessageQueue getInstance() {
        return alarmMessageQueue;
    }

    /**
     * 消息入队
     * @param alarmMessageVO
     * @return
     */
    public boolean push(AlarmMessageVO alarmMessageVO) {
        return this.blockingQueue.offer(alarmMessageVO);
    }

    /**
     * 消息出队
     * @return
     */
    public AlarmMessageVO poll() {
        AlarmMessageVO result = null;
        try {
            result = this.blockingQueue.take();
        } catch (InterruptedException e) {
            logger.error("", e);
        }
        return result;
    }

    /**
     * 获取队列大小
     * @return
     */
    public int size() {
        return this.blockingQueue.size();
    }
}

 

消息消费端代码节选,示意了阻塞等待及消费消息的实现方式:

    /*
     * 无限循环阻塞等待及消费消息
     * @see java.lang.Runnable#run()
     */
    public void run() {
        while (true) {
            try {
                AlarmMessageVO alarmMessageVO = AlarmMessageQueue.getInstance().poll();
                process(alarmMessageVO);
            } catch (Exception e) {
                logger.error("Poll AlarmMessageVO from AlarmMessageQueue error or send alarm mail error.", e);
            }
        }
    }

注意笔者这里的业务场景不需要对消费异常的消息进行重试,但大家需要根据自己的业务场景去决定是否需要在catch里面进行异常处理流程。

下面再来看下消息发送端是如何使用的,其实很简单:

    public void moduleAlarm(ModuleResourceDO moduleResourceDO, List<ResourceHolder> resourceHolderList) {
        AlarmMessageVO alarmMessageVO = new AlarmMessageVO(moduleResourceDO, resourceHolderList);
        // 把报警的所需数据放进阻塞队列,交给新的负责发邮件的处理类进行异步处理,本方法尽可能以最快速度响应返回
        AlarmMessageQueue.getInstance().push(alarmMessageVO);
    }

最后在应用初始化时候把消费端启动起来,这点可以使用Spring的init-method配置下实现,但是这里有个小问题,因为消费端是阻塞的,所以直接配置的话会导致应用启动不起来,这里需要小技巧,即init-method配置的不是消费端,而是启动消费端的Service,在这个Service里面可以采用new Thrad的方式去把消费端启动起来。如:

/**
 * 用于启动异步发送报警信息线程,由于该执行是block的,无法直接使用spring init-method
 * 
 * @author chongan.wangca
 */
public class AsynAlarmServiceStarter {

    private AsyncAlarmService asyncAlarmService;

    public void init() {
        Thread asyncAlarmServiceThread = new Thread(asyncAlarmService);
        asyncAlarmServiceThread.start();
    }

    public void setAsyncAlarmService(AsyncAlarmService asyncAlarmService) {
        this.asyncAlarmService = asyncAlarmService;
    }

}

好,到这里本文要讲述的内容基本讲完了。大家如果遇到类似的场景不妨采用类似方案,这样会让你得到更多好处。但是话说回来,如果你的场景需要处理消息量很大,并且需要多个MQ,需要进行HA、failover等,那么建议你采用现有的例如RabbitMQ或ActiveMQ,像在阿里有基于ActiveMQ开发的Napoli,这些都是消息队列很好的技术产品。

 

(全文完)

 -------------------------------------

专注Java开发及其相关领域技术。致力于多线程、大并发、高性能、海量数据研究和学习。欢迎加入一起学习讨论。加入Q-群:253042038

 个人博客地址:http://wangchongan.com