
Integrating Spring with ActiveMQ

Programmer Article Station, 2022-07-10 15:03:34

The integration steps are as follows:

 

  1. Add the dependencies
  2. Connect to the MQ message server
  3. Define the producer/consumer
  4. Send/receive messages

Add the dependencies

  <properties>
    <spring_version>4.2.4.RELEASE</spring_version>
  </properties>

  <dependencies>
    <!--Spring-->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring_version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>${spring_version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${spring_version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>${spring_version}</version>
    </dependency>
    <!--ActiveMq-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.3</version>
    </dependency>

    <!--<dependency>-->
      <!--<groupId>org.apache.commons</groupId>-->
      <!--<artifactId>commons-pool2</artifactId>-->
      <!--<version>2.5</version>-->
    <!--</dependency>-->

    <!--servlet-->
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
      <version>RELEASE</version>
      <scope>provided</scope>
    </dependency>
    <!--Test-->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <!--fast Json-->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.47</version>
    </dependency>
  </dependencies>

 

Connect to the server

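Define a class that implements the ConnectionFactory interface. Its properties are brokerURL (the server address), userName, password, and the maximum number of connections (a connection-pool parameter, spelled maxConntection in the code):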

 

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * author: getthrough
 * date: 2018/3/8
 * description: wrapper class around the connection factory
 */
public class ActiveMqConnectionFactoryDecoration implements ConnectionFactory {
    /** Connection pool provided by Apache */
//    private PooledConnectionFactory pooledConnectionFactory;
    private String brokerURL;
    private String userName;
    private String password;
    private String maxConntection;

    private ActiveMQConnectionFactory activeMQConnectionFactory;

    public ActiveMqConnectionFactoryDecoration() {
    }

    public void run() throws JMSException {
        activeMQConnectionFactory.setBrokerURL(brokerURL);
        activeMQConnectionFactory.setUserName(userName);
        activeMQConnectionFactory.setPassword(password);
        // Open one connection to verify that the broker is reachable, then close it
        // so it does not leak; JmsTemplate and the listener container open their own.
        activeMQConnectionFactory.createConnection().close();
//        pooledConnectionFactory = new PooledConnectionFactory();
//        pooledConnectionFactory.setMaxConnections(Integer.parseInt(maxConntection));
//        pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
//        pooledConnectionFactory.createConnection(userName, password);
    }

    public void stop() {
//        if (null != pooledConnectionFactory) {
//            pooledConnectionFactory.stop();
//        }
    }

    public Connection createConnection() throws JMSException {
//        return pooledConnectionFactory.createConnection();
        return activeMQConnectionFactory.createConnection();
    }

    public Connection createConnection(String userName, String password) throws JMSException {
//        return pooledConnectionFactory.createConnection(userName, password);
        return activeMQConnectionFactory.createConnection(userName, password);
    }

    public void setActiveMQConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        this.activeMQConnectionFactory = activeMQConnectionFactory;
    }

    // ... getters & setters for the other properties
}

 

Define this bean in the Spring configuration file:

<bean id="activeMqConnectionFactoryDecoration" class="mq.ActiveMqConnectionFactoryDecoration">
        <property name="activeMQConnectionFactory">
            <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"/>
        </property>
        <property name="brokerURL" value="${brokerURL}"/>
        <property name="userName" value="${userName}"/>
        <property name="password" value="${password}"/>
        <property name="maxConntection" value="${maxConntection}"/>
    </bean>
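The ${brokerURL}-style placeholders in this bean are resolved from a properties source that the article does not show (in Spring this is typically loaded through a property-placeholder configurer). As a rough sketch, assuming a hypothetical properties file with the values below (the broker URL matches the one used in the test class later on; the credentials and pool size are placeholders I made up), this is the kind of data the wrapper class ends up receiving:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch only: the file content is an assumption, not taken from the article's project.
final class MqPropertiesSketch {

    // Hypothetical content of a properties file backing the ${...} placeholders.
    static final String SAMPLE =
            "brokerURL=tcp://127.0.0.1:61616\n"
          + "userName=admin\n"
          + "password=admin\n"
          + "maxConntection=10\n";

    // Parses properties text the same way java.util.Properties reads a .properties file.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        // maxConntection arrives as a String and is parsed only when the pool is configured.
        int maxConnections = Integer.parseInt(props.getProperty("maxConntection"));
        System.out.println(props.getProperty("brokerURL") + " / max connections " + maxConnections);
    }
}
```

Note that the key has to keep the maxConntection spelling, because it must match the placeholder name used in the bean definition.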

 

Create the producer/consumer

Create a message-sending class that wraps the sending process:

 

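import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * author: getthrough
 * date: 2018/3/8
 * description: helper that wraps JmsTemplate for sending text messages
 */
public class ActiveMqSender {
    // Static so that sendMqMessage can be called without a bean reference;
    // Spring fills these in through the instance setters below.
    private static JmsTemplate jmsTemplate;
    private static Destination destination;

    public static void sendMqMessage(final String content) {
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(content);
            }
        });
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        ActiveMqSender.jmsTemplate = jmsTemplate;
    }

    public void setDestination(Destination destination) {
        ActiveMqSender.destination = destination;
    }
}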

Create a listener that implements the MessageListener interface:

 

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * author: getthrough
 * date: 2018/3/8
 * description: listener for messages on the queue
 */
// ParentMessageListener implements the MessageListener interface
public class QueueMessageListener extends ParentMessageListener {
    private Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);

    /**
     * Pre-processing before the message is handled
     */
    public void beforeHandling() {
        // doSomething ...
        logger.info("before handling queue msg ...");
    }

    /**
     * Post-processing after the message is handled
     */
    public void afterHandling() {
        // doSomething ...
        logger.info("after handling queue msg ...");
    }

    @Override
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            logger.info("########### consumer1 has receive the message :" + textMessage.getText() + " ############");
        } catch (JMSException e) {
            // Log the failure together with the stack trace
            logger.error("########## failed to get message text! ###########", e);
        }
    }
}
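The listener defines beforeHandling() and afterHandling(), but the article does not show where they are invoked; presumably ParentMessageListener takes care of that. One possible arrangement, assumed here and not taken from the source, is a template method that runs the hooks around the actual handling. The sketch uses plain Strings instead of javax.jms.Message so that it runs without a JMS dependency, and every class name in it is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ParentMessageListener (not the article's actual class).
abstract class HookedListenerSketch {

    /** Template method: runs the hooks around the actual message handling. */
    public final void deliver(String message) {
        beforeHandling();
        try {
            handle(message);
        } finally {
            afterHandling(); // runs even if handle() throws
        }
    }

    protected void beforeHandling() { }

    protected void afterHandling() { }

    protected abstract void handle(String message);
}

// Records the call order so the effect of the template method is visible.
final class RecordingListener extends HookedListenerSketch {
    final List<String> events = new ArrayList<>();

    @Override
    protected void beforeHandling() {
        events.add("before");
    }

    @Override
    protected void afterHandling() {
        events.add("after");
    }

    @Override
    protected void handle(String message) {
        events.add("handle:" + message);
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.deliver("hello");
        System.out.println(listener.events);
    }
}
```

With this shape, subclasses only override the hooks and the handling step, and the parent guarantees the ordering.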
 
Define the producer and consumer in the configuration file:

 

    <!--Message template provided by Spring-->
    <bean id="springJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="activeMqConnectionFactoryDecoration"/>
    </bean>

    <!--Define the queue destination-->
    <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="com.getthrough.spring_activemq.queue"/>
    </bean>

    <!--Message producer-->
    <bean id="producer1" class="mq.ActiveMqSender" name="producer1">
        <!--Inject the template and the queue destination into the helper class's fields-->
        <property name="jmsTemplate" ref="springJmsTemplate"/>
        <property name="destination" ref="queue"/>
    </bean>

    <!--Message listener-->
    <bean id="queueMessageListener" class="mq.listener.QueueMessageListener"/>

    <!--Message consumer-->
    <bean id="consumer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="activeMqConnectionFactoryDecoration"/>
        <property name="destination" ref="queue"/>
        <property name="messageListener" ref="queueMessageListener"/>
    </bean>

 

Send a message

Use a servlet context listener to send a message when Tomcat starts (for demonstration only):

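import java.util.Date;

import javax.jms.JMSException;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import mq.ActiveMqConnectionFactoryDecoration;
import mq.ActiveMqSender;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

/**
 * author: getthrough
 * date: 2018/3/12
 * description: sends a demo message once the web application has started
 */
public class StarterListener implements ServletContextListener {

    private Logger logger = LoggerFactory.getLogger(StarterListener.class);

    public void contextInitialized(ServletContextEvent sce) {
        WebApplicationContext context = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());
        ActiveMqConnectionFactoryDecoration connectionFactory =
                (ActiveMqConnectionFactoryDecoration) context.getBean("activeMqConnectionFactoryDecoration");
        try {
            // Connect to the MQ server
            connectionFactory.run();
            logger.info("###############");
            ActiveMqSender.sendMqMessage("hey, now is" + new Date() + "!");
        } catch (JMSException e) {
            // Log the failure together with the stack trace
            logger.error("####### failed to connect mq server! ########", e);
        }
    }

    public void contextDestroyed(ServletContextEvent sce) {
        // doSomething
    }
}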

 

Receive the message

The console prints the following log line:

########### consumer1 has receive the message :hey, now isTue Mar 20 23:48:55 CST 2018! ############

 

Send another message

While the application is running, create a test class that sends one more message:

 

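import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

/**
 * author: getthrough
 * date: 2018/3/12
 * description: sends one message to the queue with the plain JMS API
 */
public class ProducerTest {

    @Test
    public void testSendMq() throws JMSException {
        ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory();
        mqConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616");
        // Create the connection
        Connection connection = mqConnectionFactory.createConnection();
        try {
            // Start the connection
            connection.start();
            // arg1: whether the session is transacted; arg2: acknowledge mode (auto-acknowledge)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the queue through the session
            Queue queue = session.createQueue("com.getthrough.spring_activemq.queue");
            // Create the producer through the session
            MessageProducer producer = session.createProducer(queue);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT); // PERSISTENT is already the default
            producer.send(session.createTextMessage("It's the message from test producer !"));
        } finally {
            // Closing the connection also closes its sessions and releases resources
            connection.close();
        }
    }

}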

 

Run the test class; the main application's console prints:

 

########### consumer1 has receive the message :It's the message from test producer ! ############

 

This completes the basic integration.

 

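Optimization:

So far neither the message content nor the way it is consumed follows any specific format. Whether messages travel between modules of one system or between heterogeneous systems, agreeing on a concrete data format matters a great deal.

We therefore need to define an MQ message object, for example: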

 

/**
 * author: getthrough
 * date: 2018/3/21
 * description: envelope for an MQ message
 */
public class MessageContent {
    private String mqMsgMethod; // name of the MQ method; determines how the message is processed
    private String content;     // the actual message content

    public MessageContent(){}

    public MessageContent(String mqMsgMethod, String content) {
        this.mqMsgMethod = mqMsgMethod;
        this.content = content;
    }

    public String getMqMsgMethod() {
        return mqMsgMethod;
    }

    public void setMqMsgMethod(String mqMsgMethod) {
        this.mqMsgMethod = mqMsgMethod;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

}

 

For the MQ method carried in the message object, we can maintain a mapping from method to service class. Here the mapping is implemented with an enum:

 

/**
 * author: getthrough
 * date: 2018/3/21
 * description: maps each MQ method name to the service that handles it
 */
public enum MQMsgMethod {
    USER_ADD("user.add","userService"), // the method name and the service class that handles it
    USER_DELETE("user.delete","userService"),
    ORDER_UPDATE("order.update","orderService"); // user and order messages should belong to different queues

    private String methodName;
    private String serviceName;

    // This constructor is mainly used when consuming messages
    MQMsgMethod(String methodName, String serviceName) {
        this.methodName = methodName;
        this.serviceName = serviceName;
    }
    // This constructor is mainly used when sending messages
    MQMsgMethod(String methodName) {
        this.methodName = methodName;
        this.serviceName = null;
    }

    /**
     * Look up the enum constant (and with it the service name) by method name.
     * @param methodName the MQ method name, e.g. "user.add"
     * @return the matching constant, or null if no constant has that method name
     */
    public static MQMsgMethod getMQMsgMethodByMethodName(String methodName) {
        for (MQMsgMethod mqMsgMethod : MQMsgMethod.values()) {
            if (mqMsgMethod.getMethodName().equals(methodName)) {
                return mqMsgMethod;
            }
        }
        return null;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }
}
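The lookup in the enum walks all constants on every call. That is perfectly fine for three entries; if the table grows, a common variation is to build a map once and look names up in it. The following is a minimal sketch of that variation, not the article's code: MethodRouteSketch and byMethodName are illustrative names, and the fields are made final here on the assumption that the setters are not needed.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: same constants as the article's enum, trimmed to what the lookup needs.
enum MethodRouteSketch {
    USER_ADD("user.add", "userService"),
    USER_DELETE("user.delete", "userService"),
    ORDER_UPDATE("order.update", "orderService");

    // Built once when the enum class is initialized: method name -> constant.
    private static final Map<String, MethodRouteSketch> BY_METHOD = new HashMap<>();

    static {
        for (MethodRouteSketch route : values()) {
            BY_METHOD.put(route.methodName, route);
        }
    }

    private final String methodName;
    private final String serviceName;

    MethodRouteSketch(String methodName, String serviceName) {
        this.methodName = methodName;
        this.serviceName = serviceName;
    }

    /** Returns the matching constant, or null when the method name is unknown. */
    static MethodRouteSketch byMethodName(String methodName) {
        return BY_METHOD.get(methodName);
    }

    String getServiceName() {
        return serviceName;
    }

    public static void main(String[] args) {
        System.out.println(byMethodName("user.add").getServiceName());
        System.out.println(byMethodName("no.such.method"));
    }
}
```

Either version returns null for an unknown name, so callers should check the result before using it.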

 

Then rewrite how the listener consumes messages:

 

    /**
     * This assumes the listener is listening on the user queue.
     * @param message the incoming JMS message
     */
    @Override
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            logger.info("########### consumer1 has receive the message :" + textMessage.getText() + " ############");
            // Convert the received message into a Java object
            MessageContent messageContent = JSONObject.parseObject(textMessage.getText(), MessageContent.class);
            // Extract the message content and convert it into a Java bean
            User user = JSONObject.parseObject(messageContent.getContent(), User.class);
            // Get the MQ method
            String mqMethod = messageContent.getMqMsgMethod();
            // Look up the name of the service class that handles this MQ method
            MQMsgMethod mqMsgMethod = MQMsgMethod.getMQMsgMethodByMethodName(mqMethod);
            if (mqMsgMethod == null) {
                // Unknown method name: there is nothing to dispatch to
                logger.warn("no service mapped for mq method: " + mqMethod);
                return;
            }
            String serviceName = mqMsgMethod.getServiceName();
            // Obtain the service bean
            UserService userService = (UserService) MyApplicationContext.getBean(serviceName);
            // Call the service method
            switch (mqMsgMethod) {
                case USER_ADD :
                    userService.addUser(user);
                    break;
                case USER_DELETE:
                    userService.deleteUser(user);
                    break;
                default:
                    break;
            }

        } catch (JMSException e) {
            // Log the failure together with the stack trace
            logger.error("########## failed to get message text! ###########", e);
        }
    }
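The switch in onMessage has to be edited every time a new MQ method is introduced. An alternative that some projects prefer is a registry that maps each method name to a handler. The sketch below only illustrates that idea under my own assumptions: DispatchSketch, register and dispatch are hypothetical names, and the handlers take the raw content string where the article's code would call userService.addUser(user) and similar.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical registry-based dispatcher; not part of the article's project.
final class DispatchSketch {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    /** Associates an MQ method name with the code that should process its content. */
    void register(String methodName, Consumer<String> handler) {
        handlers.put(methodName, handler);
    }

    /** Returns true if a handler was found and invoked, false for an unknown method. */
    boolean dispatch(String methodName, String content) {
        Consumer<String> handler = handlers.get(methodName);
        if (handler == null) {
            return false; // the caller decides whether to log or reject the message
        }
        handler.accept(content);
        return true;
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = new DispatchSketch();
        // Placeholders for calls such as userService.addUser(user).
        dispatcher.register("user.add", content -> System.out.println("adding user " + content));
        dispatcher.register("user.delete", content -> System.out.println("deleting user " + content));

        dispatcher.dispatch("user.add", "lina");
        System.out.println(dispatcher.dispatch("user.rename", "lina"));
    }
}
```

The trade-off is that the compiler no longer lists the handled cases in one place, so the registry has to be populated reliably at startup.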

 

Output:

 

########### consumer1 has receive the message :{"content":"{\"address\":\"zhejiang\",\"age\":18,\"email\":\"123@gmail.com\",\"username\":\"lina\"}","mqMsgMethod":"user.add"} ############
... // omitted
adding user......
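The content field in the log line above is itself a JSON document stored as a string, which is why its quotes appear escaped. The article relies on fastjson to produce that. The sketch below reproduces the same nesting with hand-written escaping purely to make the format visible; EnvelopeSketch and its methods are illustrative names, and the escaping covers only quotes and backslashes, so it is not a substitute for a JSON library.

```java
// Sketch of the envelope format only; real code should use a JSON library such as fastjson.
final class EnvelopeSketch {

    /** Wraps a string in double quotes, escaping embedded quotes and backslashes. */
    static String quote(String value) {
        StringBuilder out = new StringBuilder("\"");
        for (char c : value.toCharArray()) {
            if (c == '"' || c == '\\') {
                out.append('\\');
            }
            out.append(c);
        }
        return out.append('"').toString();
    }

    /** Builds the outer JSON object; the inner JSON travels as an escaped string. */
    static String envelope(String mqMsgMethod, String contentJson) {
        return "{\"content\":" + quote(contentJson)
                + ",\"mqMsgMethod\":" + quote(mqMsgMethod) + "}";
    }

    public static void main(String[] args) {
        String userJson = "{\"username\":\"lina\"}";
        System.out.println(envelope("user.add", userJson));
    }
}
```

This is also why the listener parses twice: once for the envelope and once for the content string inside it.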
 

 

This concludes the demonstration.

 

An open question

Whenever I create connections through PooledConnectionFactory, I always get the following error:

 

SEVERE: Exception sending context initialized event to listener instance of class mq.listener.StarterListener
java.lang.NoClassDefFoundError: org/apache/commons/pool2/KeyedPooledObjectFactory
	at mq.ActiveMqConnectionFactoryDecoration.run(ActiveMqConnectionFactoryDecoration.java:33)
	at mq.listener.StarterListener.contextInitialized(StarterListener.java:33)
	at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:5068)
	at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5584)
	at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:147)
	at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:899)
	at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:875)
	at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:652)
	at org.apache.catalina.startup.HostConfig.manageApp(HostConfig.java:1863)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.tomcat.util.modeler.BaseModelMBean.invoke(BaseModelMBean.java:301)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
	at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
	at org.apache.catalina.mbeans.MBeanFactory.createStandardContext(MBeanFactory.java:618)
	at org.apache.catalina.mbeans.MBeanFactory.createStandardContext(MBeanFactory.java:565)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.tomcat.util.modeler.BaseModelMBean.invoke(BaseModelMBean.java:301)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
	at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
	at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)
	at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
	at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
	at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
	at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:324)
	at sun.rmi.transport.Transport$1.run(Transport.java:200)
	at sun.rmi.transport.Transport$1.run(Transport.java:197)
	at java.security.AccessController.doPrivileged(Native Method)
	at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
	at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
	at java.security.AccessController.doPrivileged(Native Method)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.KeyedPooledObjectFactory
	at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1858)
	at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1701)
	... 47 more

 

At first I suspected a jar conflict. The dependency tree is as follows:

 

[INFO] ------------------------------------------------------------------------
[INFO] Building spring_activemq Maven Webapp 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ spring_activemq ---
[INFO] com.getthrough.spring_activemq:spring_activemq:war:1.0-SNAPSHOT
[INFO] +- org.springframework:spring-context:jar:4.2.4.RELEASE:compile
[INFO] |  +- org.springframework:spring-aop:jar:4.2.4.RELEASE:compile
[INFO] |  |  \- aopalliance:aopalliance:jar:1.0:compile
[INFO] |  +- org.springframework:spring-beans:jar:4.2.4.RELEASE:compile
[INFO] |  +- org.springframework:spring-core:jar:4.2.4.RELEASE:compile
[INFO] |  |  \- commons-logging:commons-logging:jar:1.2:compile
[INFO] |  \- org.springframework:spring-expression:jar:4.2.4.RELEASE:compile
[INFO] +- org.springframework:spring-web:jar:4.2.4.RELEASE:compile
[INFO] +- org.springframework:spring-jms:jar:4.2.4.RELEASE:compile
[INFO] |  +- org.springframework:spring-messaging:jar:4.2.4.RELEASE:compile
[INFO] |  \- org.springframework:spring-tx:jar:4.2.4.RELEASE:compile
[INFO] +- org.springframework:spring-test:jar:4.2.4.RELEASE:compile
[INFO] +- org.apache.activemq:activemq-all:jar:5.15.3:compile
[INFO] +- org.apache.commons:commons-pool2:jar:2.3:compile
[INFO] +- javax.servlet:servlet-api:jar:3.0-alpha-1:provided
[INFO] +- junit:junit:jar:4.12:test
[INFO] |  \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] \- com.alibaba:fastjson:jar:1.2.47:compile
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS

 

There does not appear to be one.

Switching commons-pool2 to version 2.5 did not make the problem go away.

 

I also tried removing the commons-pool2 dependency (activemq-all already contains that library), which did not help either.
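The stack trace shows the lookup failing inside Tomcat's WebappClassLoaderBase, so what matters is what that class loader can see at runtime, which the Maven dependency tree alone does not answer. One way to narrow this down, offered as a diagnostic sketch and not as a confirmed fix, is to ask the class loader directly where a class comes from. ClassLocatorSketch is a hypothetical helper; calling it from StarterListener before run() is my assumption about where it would be useful.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of the article's project.
final class ClassLocatorSketch {

    /** Returns where the class was loaded from, or null if the loader cannot see it. */
    static String locate(String className, ClassLoader loader) {
        try {
            // false: look the class up without running its static initializers
            Class<?> type = Class.forName(className, false, loader);
            CodeSource source = type.getProtectionDomain().getCodeSource();
            // Classes loaded by the JDK bootstrap loader have no code source.
            return source == null ? "(bootstrap class path)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // Prints a jar location if commons-pool2 is visible, or null if it is not.
        System.out.println(locate("org.apache.commons.pool2.KeyedPooledObjectFactory", loader));
    }
}
```

If this returns null inside the running web application, the jar most likely never reached the deployed WEB-INF/lib, and the deployment artifact would be the next thing to inspect.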

 

I'm a bit stumped.

 

Any advice will be helpful, thanks very much !

 

Sample code:

https://github.com/Getthrough/spring_activemq