ActiveMQ学习记录(三)
继续学习ActiveMQ,这节就讲讲ActiveMQ和spring的结合了,直接开始吧。
第一步:创建Maven工程,然后在pom中引入相关的配置(主要是dependencies):
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.1.6.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
第二步:创建一些文件,整个工程结构如图所示
第三步:编写activemq.properties、spring-activemq.xml和spring-context.xml内容,内容注释代码里都有写
activemq.properties:
#ActiveMQ Config
activemq.brolerURL=tcp://127.0.0.1:61616
activemq.username=admin
activemq.password=admin
activemq.pool.maxConnections=10
activemq.queueName=myspringqueue
spring-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd"
default-autowire="byName">
<!--读入配置文件activemq.properties-->
<context:property-placeholder location="classpath:activemq.properties" />
<!--开启注解配置-->
<context:annotation-config />
<!--扫描-->
<context:component-scan base-package="com.activemq.*" />
<import resource="classpath:spring-activemq.xml" />
</beans>
spring-activemq.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd"
default-autowire="byName">
<!--创建ActiveMQConnectionFactory-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${activemq.brolerURL}" />
<property name="userName" value="${activemq.username}" />
<property name="password" value="${activemq.password}" />
</bean>
<!--ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们资源消耗
-->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory"/>
<property name="maxConnections" value="${activemq.pool.maxConnections}" />
</bean>
<!--spring用于管理真正的ConnectionFactory-->
<bean id="myconnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!--目标ConnectionbFactory对应真实的可以产生JMS connection的ConnectionFactory-->
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>
<!--目的地-->
<bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>${activemq.queueName}</value>
</constructor-arg>
</bean>
<!--spring提供jms工具类,他可以进行消息的发送、接收等-->
<!--queue模式-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!--这个connectionFactory对应的是真正的ConnectionFactory,即我们自己定义的spring提供的那个ConnectionFactory对象-->
<property name="connectionFactory" ref="myconnectionFactory"/>
<property name="defaultDestinationName" value="${activemq.queueName}" />
</bean>
<!--配置自定义监听:MessageListener-->
<bean id="msgQueueMessageListener" class="com.activemq.Consumer"></bean>
<!--将连接工厂、目标队列、自定义监听注入jms模板,其实就是配置消费者通过messageListener接受消息-->
<bean id="sessionAeareLinstenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="myconnectionFactory"/>
<property name="destination" ref="msgQueue"/>
<property name="messageListener" ref="msgQueueMessageListener"/>
</bean>
</beans>
从ActiveMQConectionFactory到PooledConnectionFactory,到Spring提供的SingleConnectionFactory,就是一个适配的过程。
配置connectionFactory是Spring用于创建到JMS服务器链接的,Spring提供了多种connectionFactory.具体常见如下:
1.PooledConnectionFactory: JmsTemlate每次发送消息时只会缓存connection,session和productor,不会缓存consumer。因此只适合于生产者发送消息,这个只是在要求消息处理的及时性不是特别高的情况下。
2.SingleConnectionFactory: 对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。3.CachingConnectionFactory: 继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。
注意Spring的套路经常是这样的,提供XxxTemplate,比如HibernateTemplate,对于JMS,提供了JmsTemplate。
生产者应该持有JmsTemplate进行发送消息。
消费者,提供监听器、监听的目的地、连接工厂即可。
第四步:编写Producer和Consumer
Producer:
public class Producer {
public void sendMessage(JmsTemplate jmsTemplate,final String info){
jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(info);
}
});
}
}
Consumer:
public class Consumer implements SessionAwareMessageListener{
@Override
public void onMessage(Message message, Session session) throws JMSException {
if(message instanceof TextMessage){
System.out.println("consumer get msg :" + ((TextMessage) message).getText() );
}
}
}
第五步:编写Test测试类进行测试。
Test:
public class Test{
public static void main(String[] args){
ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
JmsTemplate jmsTemplate = (JmsTemplate)context.getBean("jmsTemplate");
Producer producer = new Producer();
producer.sendMessage(jmsTemplate,"producer send a message");
}
}
运行结果:
这样就是一个简单的spring+ActiveMQ的结合了。
下面在看一下JmsTemplate属性和其父类JmsDestinationAccessor
可以看到这个参数pubSubDomain=false,说明默认是使用点对点(P2P)模式,如果将这个设置成true就变成了主题模式
那接下来来尝试下发布订阅怎么实现的,其中知识已在注释在添加。
第一步,添加一个ConsumerTwo,内容是:
public class Consumer implements SessionAwareMessageListener{
@Override
public void onMessage(Message message, Session session) throws JMSException {
if(message instanceof TextMessage){
System.out.println("consumer get msg :" + ((TextMessage) message).getText() );
}
}
}
第二步:修改activemq.xml(修改内容较多,直接附上全部代码):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd"
default-autowire="byName">
<!--创建ActiveMQConnectionFactory-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${activemq.brolerURL}" />
<property name="userName" value="${activemq.username}" />
<property name="password" value="${activemq.password}" />
<!--异步发送-->
<property name="useAsyncSend" value="true" />
</bean>
<!--ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们资源消耗
-->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory"/>
<property name="maxConnections" value="${activemq.pool.maxConnections}" />
</bean>
<!--spring用于管理真正的ConnectionFactory的ConnectionFactory
1.SingleConnectionFactory:对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。
2.CachingConnectionFactory:继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,
同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。
我们使用CachingConnectionFactory来作为示例。
-->
<!--producerConnectionFactory和consumerOneConnectionFactory区别是多设置了clientId,理由就不解释了,上一篇有讲-->
<bean id="producerConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!--目标ConnectionbFactory对应真实的可以产生JMS connection的ConnectionFactory-->
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
<!--Session缓存数量
<property name="sessionCacheSize" value="10" />
-->
</bean>
<!--目的地 topic模式-->
<bean id="msgTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg>
<!--订阅多个topic,则以英文逗号’,’隔开实现订阅多个topic。-->
<value>myTestSpringTopic</value>
</constructor-arg>
</bean>
<!--下面是生产者的配置-->
<!--消息发送者客户端,spring提供jms工具类,他可以进行消息的发送、接收等-->
<!--topic模式-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!--这个connectionFactory对应的是我们自己定义的spring提供的那个ConnectionFactory对象-->
<property name="connectionFactory" ref="producerConnectionFactory"/>
<!--注意,和之前的P2P模式配置不一样,这里是defaultDestination,不是defaultDestinationName-->
<property name="defaultDestination" ref="msgTopic" />
<!--知识点一介绍了,默认是false,表示的是P2P模式,所以这里设置JmsTemplate这个属性为true,即Topic模式-->
<property name="pubSubDomain" value="true"/>
<!--其他属性配置-->
<property name="receiveTimeout" value="10000" />
<property name="explicitQosEnabled" value="true"/>
<property name="deliveryMode" value="2" />
<!--其他属性介绍:
deliveryMode, priority, timeToLive 的开关要生效,必须配置为true,默认false
<property name="explicitQosEnabled" value="true"/>
DeliveryMode.NON_PERSISTENT=1:非持久化 ;
DeliveryMode.PERSISTENT=2:持久化
<property name="deliveryMode" value="2" />
带事务的,那么配置(如果session带有事务,并且事务成功提交,则消息被自动签收。
如果事务回滚,则消息会被再次传送。):
javax.jms.Session.SESSION_TRANSACTED = 0
不带事务,几种配置:(不带事务的session的签收方式,取决于session的配置)
javax.jms.Session.AUTO_ACKNOWLEDGE = 1 消息自动签收
javax.jms.Session.CLIENT_ACKNOWLEDGE = 2 客户端调用acknowledge方法手动签收
javax.jms.Session.DUPS_OK_ACKNOWLEDGE = 3 不必必须签收
<property name="sessionAcknowledgeMode" value="1" />
-->
</bean>
<!--下面是消费者的配置-->
<bean id="consumerOneConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!--目标ConnectionbFactory对应真实的可以产生JMS connection的ConnectionFactory-->
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
<property name="clientId" value="consumer-one" />
<!--Session缓存数量
<property name="sessionCacheSize" value="10" />
-->
</bean>
<bean id="consumerTwoConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!--目标ConnectionbFactory对应真实的可以产生JMS connection的ConnectionFactory-->
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
<property name="clientId" value="consumer-two" />
<!--Session缓存数量
<property name="sessionCacheSize" value="10" />
-->
</bean>
<!--配置自定义监听:MessageListener-->
<bean id="msgTopicMessageListenerOne" class="com.activemq.Consumer"></bean>
<bean id="msgTopicMessageListenerTwo" class="com.activemq.ConsumerTwo"></bean>
<!--Consumer定义消息配置-->
<bean id="sessionAeareLinstenerContainerOne" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="consumerOneConnectionFactory"/>
<!--知识点一介绍了,默认是false,表示的是P2P模式,这里设置成true来开启订阅模式-->
<property name="pubSubDomain" value="true"/>
<property name="destination" ref="msgTopic"/>
<!-- 消息持久化 -->
<property name="subscriptionDurable" value="true"/>
<!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
<property name="clientId" value="consumer-one"/>
<property name="messageListener" ref="msgTopicMessageListenerOne"/>
<!-- 消息应答方式
AUTO_ACKNOWLEDGE = 1 :自动确认
CLIENT_ACKNOWLEDGE = 2:客户端手动确认
DUPS_OK_ACKNOWLEDGE = 3: 自动批量确认
SESSION_TRANSACTED = 0:事务提交并确认
INDIVIDUAL_ACKNOWLEDGE = 4:单条消息确认
-->
<property name="sessionAcknowledgeMode" value="1"/>
<!--
如果我们要进行本地的事务管理,只需要在定义对应的消息监听容器时指定其sessionTransacted属性为true
<property name="sessionTransacted" value="true"/>
-->
</bean>
<!--ConsumerTwo定义消息配置-->
<bean id="sessionAeareLinstenerContainerTwo" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="consumerTwoConnectionFactory"/>
<!--知识点一介绍了,默认是false,表示的是P2P模式,这里设置成true来开启订阅模式-->
<property name="pubSubDomain" value="true"/>
<property name="destination" ref="msgTopic"/>
<property name="clientId" value="consumer-two"/>
<!-- 消息持久化 -->
<property name="subscriptionDurable" value="true"/>
<!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
<property name="messageListener" ref="msgTopicMessageListenerTwo"/>
<!-- 消息应答方式
AUTO_ACKNOWLEDGE = 1 :自动确认
CLIENT_ACKNOWLEDGE = 2:客户端手动确认
DUPS_OK_ACKNOWLEDGE = 3: 自动批量确认
SESSION_TRANSACTED = 0:事务提交并确认
INDIVIDUAL_ACKNOWLEDGE = 4:单条消息确认
-->
<property name="sessionAcknowledgeMode" value="1"/>
<!--
如果我们要进行本地的事务管理,只需要在定义对应的消息监听容器时指定其sessionTransacted属性为true
<property name="sessionTransacted" value="true"/>
-->
</bean>
</beans>
这里面提几个注意的点:
①producerConnectionFactory中不能添加consumerOneConnectionFactory、consumerTwoConnectionFactory一样的属性clientId,理由在第二节提到了,因为clientId是唯一不能重复的。
②将之前queue改成Topic,即将ActiveMQQueue改成ActiveMQTopic,并对其构造方法设置属性。
③JmsTemplate其实相当于生产者的工具,因此connectionFactory是producerConnectionFactory。
④defaultDestination对应的是topic,在上面是defaultDestinationName,如果是defaultDestinationName,那就是myTestSpringTopic。
⑤Topic模式,必须要将pubSubDomain这个设置成true,上面有讲理由。
⑥如果不需要持久化的话,需要删掉上面consumerOneConnectionFactory、consumerTwoConnectionFactory里面的clientId属性以外,还需要删掉对应监听器sessionAeareLinstenerContainerTwo、sessionAeareLinstenerContainerOne里的clientId,并修改其subscriptionDurable值为false,不然会报错。
第三步,运行Test,结果如下:
OK,ActiveMQ+spring结合就到这里了。谢谢!
推荐阅读
-
Django开发学习BUG记录--RemovedInDjango19Warning:Model class apps.user.models.User doesn't declare an explicit app_label
-
一起学习造*(三):从零开始写一个React-Redux
-
给你选择Python语言实现机器学习算法的三大理由
-
linux学习日记三 文件权限与目录配置
-
laravel框架学习记录之表单操作详解
-
Spring security学习笔记(三)
-
AngularJS学习笔记(三)数据双向绑定的简单实例
-
iNeuOS 物联网云操作系统2.0发布,集成设备容器、视图建模、机器学习三大模块
-
QT 学习基础问题记录
-
Spring学习之依赖注入的方法(三种)