ActiveMQ的简单使用
程序员文章站
2022-07-13 15:44:07
...
ActiveMQ的简单使用
ActiveMQ是一种开源的,实现了JMS规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。
相关文章:
范例项目: http://wosyingjun.iteye.com/blog/2312553
ActiveMQ集群高可用方案:http://wosyingjun.iteye.com/blog/2314683
ActiveMQ组成:
ActiveMQ接发送消息流程图:
一. ActiveMQ的安装和配置
1、官网下载Linux版的ActiveMQ(最新版本为5.13.4)
https://activemq.apache.org/download.html
2、解压安装
tar -zxvf apache-activemq-5.13.4-bin.tar.gz
3、配置(这里采用默认配置,无需修改)
vim /usr/lical/activemq-1/conf/activemq.xml
4、启动
cd /usr/local/activemq-1/bin
./activemq start
5、打开管理界面(管理界面可以查看并管理所有队列及消息)
http://192.168.1.100:8161
二. Spring结合ActiveMQ使用
1、pom文件引入依赖
<!--active mq start--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.13.3</version> </dependency> <!--active mq end-->
2、spring-mq配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- ActiveMQ服务地址 --> <property name="brokerURL" value="${mq.brokerURL}"/> <property name="userName" value="${mq.userName}"></property> <property name="password" value="${mq.password}"></property> <!-- 这里定义重试策略,注意:只有持久化的才会重试--> <property name="redeliveryPolicyMap" ref="redeliveryPolicyMap"/> </bean> <!--这里设置各个消息队列的重发机制--> <bean id="redeliveryPolicyMap" class="org.apache.activemq.broker.region.policy.RedeliveryPolicyMap"> <property name="redeliveryPolicyEntries"> <list> <ref bean="bizRedeliveryPolicy"/> <ref bean="mailRedeliveryPolicy"/> </list> </property> </bean> <bean id="bizRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <!--重发次数 延时、延时系数、延时指数开关、目标(重发等待时间1s, 2s, 4s, 8s)--> <property name="maximumRedeliveries" value="3"/> <property name="redeliveryDelay" value="1000"/> <property name="backOffMultiplier" value="2"/> <property name="useExponentialBackOff" value="true"/> <property name="destination" ref="bizQueue"/> </bean> <bean id="mailRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <!--重发次数 延时、延时系数、延时指数开关--> <property name="maximumRedeliveries" value="2"/> <property name="redeliveryDelay" value="5000"/> <property name="destination" ref="mailQueue"/> </bean> <!-- ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory 可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗。 要依赖于 activemq-pool包 --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory"/> <property name="maxConnections" value="${mq.pool.maxConnections}"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="pooledConnectionFactory"/> <property name="reconnectOnException" value="true"/> </bean> <!-- 队列目的地--> <bean id="bizQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="${biz.queueName}"/> </bean> <bean id="mailQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="${mail.queueName}"/> </bean> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <!-- 队列模板 这里配置2个,一个用于分布式业务,一个用于发送邮件--> <bean id="bizMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory"/> <property name="defaultDestination" ref="bizQueue"/> <!-- 使 deliveryMode, priority, timeToLive设置生效--> <property name="explicitQosEnabled" value="true" /> <!-- 持久化 如果设置为非持久化MQ服务器重启后MQ中的数据会丢失--> <property name="deliveryPersistent" value="true"/> <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的--> <property name="sessionTransacted" value="false"/> </bean> <bean id="mailMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory"/> <property name="defaultDestination" ref="mailQueue"/> <!-- 使 deliveryMode, priority, timeToLive设置生效--> <property name="explicitQosEnabled" value="true" /> <!-- 持久化 如果设置为非持久化MQ服务器重启后MQ中的数据会丢失--> <property name="deliveryPersistent" value="true"/> <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的--> <property name="sessionTransacted" value="true"/> </bean> <!-- 消息监听实现方法一 --> <bean id="bizListener" class="com.yingjun.ssm.mq.listener.TransactionBizMessageListener"/> <bean id="mailListener" class="com.yingjun.ssm.mq.listener.MailMessageListener"/> <!-- 消息接收监听器用于异步接收消息--> <bean id="bizContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="bizQueue"/> <property name="messageListener" ref="bizListener"/> <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的--> <property name="sessionTransacted" value="true"/> <property name="concurrentConsumers" value="1"/> </bean> <bean id="mailContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="mailQueue"/> <property name="messageListener" ref="mailListener"/> <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的--> <property name="sessionTransacted" value="true"/> <property name="concurrentConsumers" value="1"/> </bean> </beans>
3、重试机制以及死信的配置
<!--这里设置各个消息队列的重发机制--> <bean id="redeliveryPolicyMap" class="org.apache.activemq.broker.region.policy.RedeliveryPolicyMap"> <property name="redeliveryPolicyEntries"> <list> <ref bean="bizRedeliveryPolicy"/> <ref bean="mailRedeliveryPolicy"/> </list> </property> </bean> <bean id="bizRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <!--重发次数 延时、延时系数、延时指数开关、目标(重发等待时间1s, 2s, 4s, 8s)--> <property name="maximumRedeliveries" value="3"/> <property name="redeliveryDelay" value="1000"/> <property name="backOffMultiplier" value="2"/> <property name="useExponentialBackOff" value="true"/> <property name="destination" ref="bizQueue"/> </bean> <bean id="mailRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <!--重发次数 延时、延时系数、延时指数开关--> <property name="maximumRedeliveries" value="2"/> <property name="redeliveryDelay" value="5000"/> <property name="destination" ref="mailQueue"/> </bean>
4、发送端代码
package com.yingjun.ssm.biz; import com.alibaba.fastjson.JSONObject; import com.yingjun.ssm.common.model.BizOperator; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; /** * @author yingjun */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:application.xml") public class Application { private final Logger log = LoggerFactory.getLogger(Application.class); @Autowired private JmsTemplate bizMqJmsTemplate; @Test public void mailSend() throws Exception { bizMqJmsTemplate.setSessionTransacted(true); for (int i = 0; i < 1; i++) { log.info("==>send message" + i); bizMqJmsTemplate.send(new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { log.info("getTransacted:" + session.getTransacted()); BizOperator operator = new BizOperator("testDistributedTransaction", 1001); return session.createTextMessage(JSONObject.toJSONString(operator)); } }); log.info("==>finish send message"+ i); } while (true) { } } }
5、接受端代码
package com.yingjun.ssm.mq.listener; import com.alibaba.fastjson.JSONObject; import com.yingjun.ssm.common.model.BizOperator; import com.yingjun.ssm.mq.biz.TransactionBizService; import org.apache.activemq.command.ActiveMQTextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.listener.SessionAwareMessageListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; /** * * @author yingjun */ @Component public class TransactionBizMessageListener implements SessionAwareMessageListener<Message> { private static final Logger log = LoggerFactory.getLogger(TransactionBizMessageListener.class); private final String transactionBiz = "testDistributedTransaction"; @Autowired private TransactionBizService transactionBizService; /** * @param message * @param session */ public void onMessage(Message message, Session session) throws JMSException{ //这里建议不要try catch,让异常抛出,通过redeliveryPolicy去重试,达到重试次数进入死信DLQ(Dead Letter Queue) ActiveMQTextMessage msg = (ActiveMQTextMessage) message; String ms = ms = msg.getText(); log.info("==>receive message:" + ms); // 转换成相应的对象 BizOperator operator = JSONObject.parseObject(ms, BizOperator.class); if (operator != null && transactionBiz.equals(operator.getOperator())) { transactionBizService.addScoreBySyn(100); //throw new RuntimeException("test redeliveryPolicy"); } else { log.info("==>message:" + ms + " no about operator!"); } } }
如上所以就完成了Spring结合ActiveMQ的简单实现,完整代码可在文章最上头的范例项目中找到。