欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

初识ActiveMQ

程序员文章站 2022-04-24 12:31:46
博主之前的一个高并发需求:Java并发(三):实例引出并发应用场景中所提到的,后来经过初步测试发现多线程并不能完全满足需求,特别是性能上的需求,或者说多线程不是比较好的解决方案,真实需求是:将商品库存(第三方数据库上)"及时"通知第三方的网购平台,达到同步商品余量信息的目的,本地是存儲了相应的阈值, ......

  博主之前的一个高并发需求:java并发(三):实例引出并发应用场景中所提到的,后来经过初步测试发现多线程并不能完全满足需求,特别是性能上的需求,或者说多线程不是比较好的解决方案,真实需求是:将商品库存(第三方数据库上)"及时"通知第三方的网购平台,达到同步商品余量信息的目的,本地是存儲了相应的阈值,在第三方数据库上的库存一旦少于库存,我们就认为这件商品已经售罄,因为要防止线上线下同一时间段销售引起的库存紧张,甚至订单已经发出但库存实际不足的情况...之前多线程定时访问库存并同步数据显然非常低效,主管老哥推荐我使用消息队列来解决问题,顿时一脸懵,消息队列是啥??

消息队列的基本概念:

  消息队列(英语:message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。 ——*

初识ActiveMQ

 

博主使用的消息队列中间件是activemq,为什么用它呢?

  1. 多种语言和协议编写客户端。语言: java, c, c++, c#, ruby, perl, python, php。应用协议: openwire,stomp rest,wsnotification,xmpp,amqp
  2. 完全支持jms1.1和j2ee 1.4规范 (持久化,xa消息,事务)
  3. 对spring的支持,activemq可以很容易内嵌到使用spring的系统里面去,而且也支持spring2.0的特性
  4. 支持多种传送协议:in-vm,tcp,ssl,nio,udp,jgroups,jxta
  5. 支持通过jdbc和journal提供高速的消息持久化
  6. 从设计上保证了高性能的集群,客户端-服务器,点对点等等

以上的总结比较官方,概括来说,activemq的优势在于它是java语言开发,在基于spring的项目上容易内嵌,很大程度的减少耦合,提供可靠的任务异步处理.

 

activemq的通信模式:

1.点对点(queue)

  • 一个消息只能被一个服务接收
  • 消息一旦被消费,就会消失
  • 如果没有被消费,就会一直等待,直到被消费
  • 多个服务监听同一个消费空间,先到先得

2.发布/订阅模式(topic)

  • 一个消息可以被多个服务接收
  • 订阅一个主题的消费者,只能消费自它订阅之后发布的消息
  • 消费端如果在生产端发送消息之后启动,是接收不到消息的,除非生产端对消息进行了持久化(例如广播,只有当时听到的人能听到信息)

 

如何实现?

业务需求是用发布-订阅模式完成,我负责消费者部分的代码,一开始是这样实现的,五步走:

  1. 通过连接工厂获取连接对象
  2. 启动连接
  3. 创建session
  4. 创建队列或topic
  5. 注册消息监听

 

public class roundrobinconfig1 {

    private logger logger = loggerfactory.getlogger(getclass());

    @resource
    private inventoryservice inventoryservice;

    @scheduled(cron = "0 53 * * * ?")//每2分钟调度一次任务
    public void operation(){
        connectionfactory connectionfactory; // 连接工厂
        connection connection = null; // 连接
        session session; // 会话 接受或者发送消息的线程
        destination destination; // 消息的目的地
        messageconsumer consumer; //创建消费者

        // 实例化连接工厂
        connectionfactory=new activemqconnectionfactory(jmsconsumer.username, jmsconsumer.password, jmsconsumer.brokeurl);

        try {
            connection=connectionfactory.createconnection(); // 通过连接工厂获取连接
            connection.start(); // 启动连接
            /**
             * 这里的最好使用boolean.false,如果是用true则必须commit才能生效,且http://127.0.0.1:8161/admin管理页面才会更新消息队列的变化情况。
             */
            session=connection.createsession(boolean.false, session.auto_acknowledge); // 创建session
//            destination=session.createqueue("firstqueue1"); // 创建消息队列
            destination=session.createtopic("firsttopic");
            consumer=session.createconsumer(destination);
            consumer.setmessagelistener(new mylistener()); // 注册消息监听
        } catch (exception e) {
            // todo auto-generated catch block
            e.printstacktrace();
        }
    }
}

具体业务逻辑写在listener里,大家使用时别忘了引入maven依赖

<!-- activemq -->
        <dependency>
            <groupid>org.apache.activemq</groupid>
            <artifactid>activemq-all</artifactid>
            <version>5.9.1</version>
        </dependency>

 

然后就进行初步测试,喜闻乐见地遇到问题了:

初识ActiveMQ

 

博主经过一通资料的查阅,依旧没有搞懂问题所在,最后问同事要来了生产者的代码,发现了问题可能出在这里:

stompjmsconnectionfactory factory = new stompjmsconnectionfactory();

生产者用了这个连接工厂获取连接,随即百度了一下stomp,了解到这其实是一种消息格式协议,另外还有amqp,openwire,mqtt等,几种消息协议的概述可以,我便换成了stompjmsconnection对象来获取连接,结果成功获取到消息体:

初识ActiveMQ

 

由于需要让订阅消息队列的程序一直运行,我采取官方推荐的死循环方式处理,并且使其在模块启动时运行,后来考虑了一下,万一死循环出现异常,那整个模块不就宕了吗,于是我给模块创建了一个子进程用来轮询消息队列,这样子进程就算挂了,整个模块也不受影响了:

import com.google.gson.gson;
import com.ycyz.framework.task.domain.inventorycache;
import com.ycyz.framework.task.service.inventorycacheservice;
import org.fusesource.hawtbuf.buffer;
import org.fusesource.stomp.jms.stompjmsconnectionfactory;
import org.fusesource.stomp.jms.stompjmsdestination;
import org.fusesource.stomp.jms.message.stompjmsbytesmessage;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.boot.applicationarguments;
import org.springframework.boot.applicationrunner;
import org.springframework.core.annotation.order;
import org.springframework.stereotype.component;

import javax.annotation.resource;
import javax.jms.*;
import java.util.hashmap;
import java.util.list;
import java.util.map;

/**
 * @author yhw
 * @classname: autorunner
 * @description:
 * @date 2019/1/7 8:27
 */

@order(value = 1)
@component
public class autorunner implements applicationrunner {

    private logger logger = loggerfactory.getlogger(getclass());

    map map = new hashmap(16);
    string result = null;
    string user = env("activemq_user", "");
    string password = env("activemq_password", "");
    string host = env("activemq_host", "域名");
    string destination = "/topic/bn.stock.prod";
    int port = integer.parseint(env("activemq_port", "端口号"));
    destination dest = new stompjmsdestination(destination);

    @resource
    private inventorycacheservice inventorycacheservice;

    gson gson = new gson();

    stompjmsconnectionfactory factory = new stompjmsconnectionfactory();

    @override
    public void run(applicationarguments args) throws exception{
        logger.info("开始运行了...");
        thread thread = new thread(){
            @override
            public void run() {
                messageconsumer consumer = null;
                connection connection = null;
                long start = 0l;
                long count = 0l;
                try {
                    factory.setbrokeruri("tcp://" + host + ":" + port);
                    connection = factory.createconnection(user, password);
                    connection.start();
                    session session = connection.createsession(false, session.auto_acknowledge);
                    consumer = session.createconsumer(dest);
                    start = system.currenttimemillis();
                    count = 1;
                    system.out.println("waiting for messages...");

                    while (true) {
                        system.out.println("轮询消息队列..");
                        message message = null;
                        try {
                            message = consumer.receive();
                            if (message instanceof stompjmsbytesmessage) {
                                stompjmsbytesmessage sm = (stompjmsbytesmessage) message;
                                buffer buffer = sm.getcontent();
                                byte[] a = buffer.getdata();
                                result = new string(a);if (result.contains("shutdown")) {
                                    long diff = system.currenttimemillis() - start;
                                    system.out.println(string.format("received %d in %.2f seconds", count, (1.0 * diff / 1000.0)));
                                    break;
                                }
                                //result是获取到的消息字符串,这里开始处理它
                            }
                        }catch(exception e) {
                            e.printstacktrace();
                            continue;
                        }
                    }
                    connection.close();
                }catch(jmsexception e) {
                    e.printstacktrace();
                }
            }
        };
        thread.start();

    }

    private static string env(string key, string defaultvalue) {
        string rc = system.getenv(key);
        if( rc== null ){
            return defaultvalue;
        }
        return rc;
    }

    private static void flagtrigger(inventorycache inventorycache){
        if(new integer(1).equals(inventorycache.getflag())){
            inventorycache.setflag(0);
        }else{
            inventorycache.setflag(1);
        }
    }

    private static string getresult(string thewholemessage){
        int startflag = 0;
        int endflag = 0;
        for (int i = 0; i < thewholemessage.length(); i++) {
            if (thewholemessage.charat(i) == '{') {
                startflag = i;
            } else if (thewholemessage.charat(i) == '}') {
                endflag = i;
            }
        }
        return thewholemessage.substring(startflag, endflag + 1);
    }
}

这样就初步完成,只要部署了多个消费者,就可以有效提升系统性能了,生产者只管往队列里"塞"待处理消息,多个消费者只管"拿"消息来处理,做到了有效的应用程序解耦.

当然我也不确定还有没有更好的方案,博主才疏学浅,懂得太少,希望有看到的大牛能够不吝赐教,谢谢了