Spring 整合 ActiveMq
程序员文章站
2022-07-10 15:03:34
...
Spring 整合 ActiveMq
整合步骤如下:
- 添加依赖
- 连接 mq 消息服务器
- 定义生产者/消费者
- 发送/接收消息
添加依赖
<properties> <spring_version>4.2.4.RELEASE</spring_version> </properties> <dependencies> <!--Spring--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring_version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${spring_version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.2.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring_version}</version> </dependency> <!--ActiveMq--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency> <!--<dependency>--> <!--<groupId>org.apache.commons</groupId>--> <!--<artifactId>commons-pool2</artifactId>--> <!--<version>2.5</version>--> <!--</dependency>--> <!--servlet--> <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> <version>RELEASE</version> <scope>provided</scope> </dependency> <!--Test--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <!--fast Json--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> </dependencies>
连接服务器
定义一个类实现 ConnectionFactory 接口,类属性包括 brokerURL(服务器地址),userName,password,maxConnection(最大连接数, 连接池使用参数);
/** * author: getthrough * date: 2018/3/8 * description: 连接工厂的包装类 */ public class ActiveMqConnectionFactoryDecoration implements ConnectionFactory { /**apache 提供的连接池*/ // private PooledConnectionFactory pooledConnectionFactory; private String brokerURL; private String userName; private String password; private String maxConntection; private ActiveMQConnectionFactory activeMQConnectionFactory; public ActiveMqConnectionFactoryDecoration() { } public void run() throws JMSException { activeMQConnectionFactory.setBrokerURL(brokerURL); activeMQConnectionFactory.setUserName(userName); activeMQConnectionFactory.setPassword(password); activeMQConnectionFactory.createConnection(); // pooledConnectionFactory = new PooledConnectionFactory(); // pooledConnectionFactory.setMaxConnections(Integer.parseInt(maxConntection)); // pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory); // pooledConnectionFactory.createConnection(userName, password); } public void stop() { // if (null != pooledConnectionFactory) { // pooledConnectionFactory.stop(); // } } public Connection createConnection() throws JMSException { // return pooledConnectionFactory.createConnection(); return activeMQConnectionFactory.createConnection(); } public Connection createConnection(String userName, String password) throws JMSException { // return pooledConnectionFactory.createConnection(userName, password); return activeMQConnectionFactory.createConnection(userName, password); } public void setActiveMQConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) { this.activeMQConnectionFactory = activeMQConnectionFactory; } // ... 其他属性的getters&setters
在 spring 配置文件中定义这个 bean:
<bean id="activeMqConnectionFactoryDecoration " class="mq.ActiveMqConnectionFactoryDecoration "> <property name="activeMQConnectionFactory"> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"/> </property> <property name="brokerURL" value="${brokerURL}"/> <property name="userName" value="${userName}"/> <property name="password" value="${password}"/> <property name="maxConntection" value="${maxConntection}"/> </bean>
创建生产者/消费者
创建一个消息发送类, 简单包装下发送消息流程
/** * author: getthrough * date: 2018/3/8 * description: */ public class ActiveMqSender { private static JmsTemplate jmsTemplate; private static Destination destination; public static void sendMqMessage(final String content) { jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(content); } }); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public void setDestination(Destination destination) { this.destination = destination; } }
创建一个监听器, 实现 MessageListener 接口
/** * author: getthrough * date: 2018/3/8 * description: */ // 此处ParentMessageListener实现了 MessageListener接口 public class QueueMessageListener extends ParentMessageListener { private Logger logger = LoggerFactory.getLogger(QueueMessageListener.class); /** * 消息前处理 */ public void beforeHandling() { // doSomething ... logger.info("before hanlding queue msg ..."); } /** * 消息后处理 */ public void afterHandling() { // doSomething ... logger.info("after hanlding queue msg ..."); } @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; logger.info("########### consumer1 has receive the message :" + textMessage.getText() + " ############"); } catch (JMSException e) { logger.info("########## failed to get message text! ###########"); e.printStackTrace(); } } }
在配置文件中定义生产者和消费者
<!--Spring 提供的消息模板--> <bean id="springJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="activeMqConnectionFactoryDecoration"/> </bean> <!--定义一个队列的地址--> <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="com.getthrough.spring_activemq.queue"/> </bean> <!--消息生产者--> <bean id="producer1" class="mq.ActiveMqSender" name="producer1"> <!--将引用的模板和队列地址注入到助手类的字段中--> <property name="jmsTemplate" ref="springJmsTemplate"/> <property name="destination" ref="queue"/> </bean> <!--消息监听器--> <bean id="queueMessageListener" class="mq.listener.QueueMessageListener"/> <!--消息消费者--> <bean id="consumer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="activeMqConnectionFactoryDecoration"/> <property name="destination" ref="queue"/> <property name="messageListener" ref="queueMessageListener"/> </bean>
发送消息
使用 servlet 在 tomcat 启动时发送消息(仅用于演示)
/** * author: getthrough * date: 2018/3/12 * description: */ public class StarterListener implements ServletContextListener { private Logger logger = LoggerFactory.getLogger(StarterListener.class); public void contextInitialized(ServletContextEvent sce) { WebApplicationContext context = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext()); ActiveMqConnectionFactoryDecoration connectionFactory = (ActiveMqConnectionFactoryDecoration) context.getBean("activeMqConnectionFactoryDecoration"); try { // 连接 mq server connectionFactory.run(); logger.info("###############"); ActiveMqSender.sendMqMessage("hey, now is" + new Date() + "!"); } catch (JMSException e) { logger.info("####### failed to connect mq server! ########"); e.printStackTrace(); } } public void contextDestroyed(ServletContextEvent sce) { // doSomething } }
接收消息
控制台打印了日志
########### consumer1 has receive the message :hey, now isTue Mar 20 23:48:55 CST 2018! ############
再发消息
在程序运行的情况下,创建一个测试类,用于发送一条消息
/** * author: getthrough * date: 2018/3/12 * description: */ public class ProducerTest { @Test public void testSendMq() throws JMSException { ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(); mqConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616"); // 创建连接 Connection connection = mqConnectionFactory.createConnection(); // 开启连接 connection.start(); // arg1:是否开启事务; arg2:确认模式--自动确认 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 通过session 创建 队列 Queue queue = session.createQueue("com.getthrough.spring_activemq.queue"); // 通过 session 创建消费者 MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 默认即持久化 producer.send(session.createTextMessage("It's the message from test producer !")); // 关闭连接(会级联关闭session),释放资源 connection.close(); } }
运行测试类, 主程序控制台输出
########### consumer1 has receive the message :It's the message from test producer ! ############
至此, 基础整合完毕.
优化:
可以看到, 目前对于消息的内容以及消费的方式并没有一个具体的格式, 无论是在不同模块之间还是异构的系统之间的消息传递, 定义一个具体的数据格式是十分重要的.
因此, 需要定义一个 mq 消息对象, 例如
/** * author: getthrough * date: 2018/3/21 * description: */ public class MessageContent { private String mqName;// 该消息的名称,用来具体做什么处理 private String content;// 消息具体内容 public MessageContent(){} public MessageContent(String mqName, String content) { this.mqName = mqName; this.content = content; } public String getMqMsgMethod() { return mqName; } public void setMqMsgMethod(String mqName) { this.mqName = mqName; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
针对 mq 消息实体中的 mq 方法, 可以做一个 方法----服务类的映射, 这种映射关系, 这里使用枚举实现:
/** * author: getthrough * date: 2018/3/21 * description: */ public enum MQMsgMethod { USER_ADD("user.add","userService"),// 方法的名称, 和具体处理该方法的服务类 USER_DELETE("user.delete","userService"), ORDER_UPDATE("order.update","orderService");// 此处user 和 order 应属于不同的队列 private String methodName; private String serviceName; // 该构造主要用于消息的消费 MQMsgMethod(String methodName, String serviceName) { this.methodName = methodName; this.serviceName = serviceName; } // 该构造主要用于消息的发送 MQMsgMethod(String methodName) { this.methodName = methodName; this.serviceName = null; } /** * 根据方法名称获取服务类 * @param methodName * @return */ public static MQMsgMethod getMQMsgMethodByMethodName(String methodName) { MQMsgMethod[] mqMsgMethods = MQMsgMethod.values(); MQMsgMethod result = null; if (mqMsgMethods.length > 0) { for (MQMsgMethod mqMsgMethod : mqMsgMethods) { if (mqMsgMethod.getMethodName().equals(methodName)) { result = mqMsgMethod; break; } } } return result; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public String getServiceName() { return serviceName; } public void setServiceName(String serviceName) { this.serviceName = serviceName; } }
然后改写监听器中对消费消息的方式:
/** * 这里假设这个 listener 是监听 user 队列 * @param message */ @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; logger.info("########### consumer1 has receive the message :" + textMessage.getText() + " ############"); // 将接受到的消息转换为java对象 MessageContent messageContent = (MessageContent)JSONObject.parseObject(textMessage.getText(), MessageContent.class); // 获取mq消息内容,并转化为java bean User user = (User) JSONObject.parseObject(messageContent.getContent(), User.class); // 获取mq方法 String mqMethod = messageContent.getMqMsgMethod(); // 根据mq方法获取处理该消息对应的服务类的名称 MQMsgMethod mqMsgMethod = MQMsgMethod.getMQMsgMethodByMethodName(mqMethod); String serviceName = mqMsgMethod.getServiceName(); // 获得服务类 UserService userService = (UserService) MyApplicationContext.getBean(serviceName); // 调用服务类方法 switch (mqMsgMethod) { case USER_ADD : userService.addUser(user); break; case USER_DELETE: userService.deleteUser(user); break; default: break; } } catch (JMSException e) { logger.info("########## failed to get message text! ###########"); e.printStackTrace(); } }
输出结果
########### consumer1 has receive the message :{"content":"{\"address\":\"zhejiang\",\"age\":18,\"email\":\"123@gmail.com\",\"username\":\"lina\"}","mqMsgMethod":"user.add"} ############ ...// 省略 adding user......
至此演示完毕.
一个疑问
本人在使用 PooledConnectionFactory 创建连接时总是报一个错误,如下
严重: Exception sending context initialized event to listener instance of class mq.listener.StarterListener java.lang.NoClassDefFoundError: org/apache/commons/pool2/KeyedPooledObjectFactory at mq.ActiveMqConnectionFactoryDecoration.run(ActiveMqConnectionFactoryDecoration.java:33) at mq.listener.StarterListener.contextInitialized(StarterListener.java:33) at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:5068) at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5584) at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:147) at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:899) at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:875) at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:652) at org.apache.catalina.startup.HostConfig.manageApp(HostConfig.java:1863) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.tomcat.util.modeler.BaseModelMBean.invoke(BaseModelMBean.java:301) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819) at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801) at org.apache.catalina.mbeans.MBeanFactory.createStandardContext(MBeanFactory.java:618) at org.apache.catalina.mbeans.MBeanFactory.createStandardContext(MBeanFactory.java:565) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.tomcat.util.modeler.BaseModelMBean.invoke(BaseModelMBean.java:301) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819) at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801) at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468) at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76) at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309) at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401) at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:324) at sun.rmi.transport.Transport$1.run(Transport.java:200) at sun.rmi.transport.Transport$1.run(Transport.java:197) at java.security.AccessController.doPrivileged(Native Method) at sun.rmi.transport.Transport.serviceCall(Transport.java:196) at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683) at java.security.AccessController.doPrivileged(Native Method) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.KeyedPooledObjectFactory at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1858) at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1701) ... 47 more
一开始怀疑时 jar 包存在冲突, 依赖树如下:
[INFO] ------------------------------------------------------------------------ [INFO] Building spring_activemq Maven Webapp 1.0-SNAPSHOT [INFO] ------------------------------------------------------------------------ [INFO] [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ spring_activemq --- [INFO] com.getthrough.spring_activemq:spring_activemq:war:1.0-SNAPSHOT [INFO] +- org.springframework:spring-context:jar:4.2.4.RELEASE:compile [INFO] | +- org.springframework:spring-aop:jar:4.2.4.RELEASE:compile [INFO] | | \- aopalliance:aopalliance:jar:1.0:compile [INFO] | +- org.springframework:spring-beans:jar:4.2.4.RELEASE:compile [INFO] | +- org.springframework:spring-core:jar:4.2.4.RELEASE:compile [INFO] | | \- commons-logging:commons-logging:jar:1.2:compile [INFO] | \- org.springframework:spring-expression:jar:4.2.4.RELEASE:compile [INFO] +- org.springframework:spring-web:jar:4.2.4.RELEASE:compile [INFO] +- org.springframework:spring-jms:jar:4.2.4.RELEASE:compile [INFO] | +- org.springframework:spring-messaging:jar:4.2.4.RELEASE:compile [INFO] | \- org.springframework:spring-tx:jar:4.2.4.RELEASE:compile [INFO] +- org.springframework:spring-test:jar:4.2.4.RELEASE:compile [INFO] +- org.apache.activemq:activemq-all:jar:5.15.3:compile [INFO] +- org.apache.commons:commons-pool2:jar:2.3:compile [INFO] +- javax.servlet:servlet-api:jar:3.0-alpha-1:provided [INFO] +- junit:junit:jar:4.12:test [INFO] | \- org.hamcrest:hamcrest-core:jar:1.3:test [INFO] \- com.alibaba:fastjson:jar:1.2.47:compile [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS
似乎也没有.
更换过commons-pool2 版本为2.5问题还存在.
尝试将 commons-pool2 依赖删除(active-all 中存在该类库), 也无效.
有点蒙
Any advice will be helpful, thanks very much !
示例代码地址
https://github.com/Getthrough/spring_activemq