ActiveMQ入门DEMO 以及和Spring的整合
好了直接我开始ActiveMQ的入门案例!
创建一个maven工程activeMQ_helloworld,提供两个测试类进行演示.
pom文件导入的依赖
创建一个测试类来做生产者生产消息,这里我用的是队列形式(queue),一对一的消费,我创建了一个队列叫
HelloActiveMQ,并发送了十条消息.
1 public class ActiveMQProducer {
2 @Test
3 public void testProduceMsg() throws Exception{
4 //连接工厂
5 //使用默认的用户名,密码,路径
6 //路径为 tcp://host:61616
7 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
8 //获取一个连接
9 Connection connection = connectionFactory.createConnection();
10 //创建一个会话
11 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
12 //创建队列或者话题
13 Queue queue = session.createQueue("HelloActiveMQ");
14 //创建生产者或者消费者
15 MessageProducer producer = session.createProducer(queue);
16 //发送消息
17 for (int i = 0; i < 10; i++) {
18 producer.send(session.createTextMessage("activeMQ,你好!"+i));
19 }
20 //提交操作
21 session.commit();
22 }
23 }
熟悉ActiveMQ的API,根据API来发送消息,最后的commit不要忘了!!!
在创建一个消费者来对消息进行消费,消费者引用的队列名为之前创建的生产者队列名HelloActiveMQ
1 public class ActiveMQConsumer {
2 @Test
3 public void testConsumeMsg() throws Exception{
4 // 连接工厂
5 // 使用默认用户名、密码、路径
6 // 路径 tcp://host:61616
7 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
8 // 获取一个连接
9 Connection connection = connectionFactory.createConnection();
10 //开启连接
11 connection.start();
12 //建立会话,第一个参数是否开启事务,为true时,最后需要session.conmit()的提交
13 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
14 // 创建队列或者话题对象
15 Queue queue = session.createQueue("HelloActiveMQ");
16 // 创建消费者
17 MessageConsumer messageConsumer = session.createConsumer(queue);
18
19 while (true) {
20 TextMessage message = (TextMessage) messageConsumer.receive(5000);
21 if (message != null) {
22 System.out.println(message.getText());
23 } else {
24 break;
25 }
26 }
27 }
28 }
生产者和消费者都已经创建好,现在就可以开始愉快的测试了~~~
哦,还没开启呢...
安装好的ActiveMQ在本地,进入bin选择win64(我电脑64的),activemq.bat开启
开启后
进入Activemq管理页面,地址http://服务器ip:8161,用户名admin,密码admin,如图
这个消息管理页面非常好用,用的很多,后面说~
现在执行一次生产者testProduceMsg(),生产了十条消息,可以在管理页面看到(queues队列)
显然有十条消息生产了~
现在调用消费者testConsumeMsg(),去消费这十条消息!
控制台打印出十条消息,再去看看消息管理页面>
十条消息已经消费了~~~ok
然而然而业务场景中用的最多的是监听机制,对生产者的消息进行监听,生产者一生产出消息,消费者立马进行消费掉!!!
这里我再进行监听测试>>
在消费者测试类里添加第二个方法(监听消费的方法),线程得一直开着.
1 @Test
2 // 使用监听器消费
3 public void testCosumeMQ2() throws Exception {
4 // 连接工厂
5 // 使用默认用户名、密码、路径
6 // 路径 tcp://host:61616
7 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
8 // 获取一个连接
9 Connection connection = connectionFactory.createConnection();
10 // 开启连接
11 connection.start();
12 // 建立会话
13 // 第一个参数,是否使用事务,如果设置true,操作消息队列后,必须使用 session.commit();
14 Session session = connection.createSession(false,
15 Session.AUTO_ACKNOWLEDGE);
16 // 创建队列或者话题对象
17 Queue queue = session.createQueue("HelloActiveMQ");
18 // 创建消费者
19 MessageConsumer messageConsumer = session.createConsumer(queue);
20 messageConsumer.setMessageListener(new MessageListener() {
21 // 每次接收消息,自动调用 onMessage
22 public void onMessage(Message message) {
23 TextMessage textMessage = (TextMessage)message;
24 try {
25 System.out.println(textMessage.getText());
26 } catch (JMSException e) {
27 e.printStackTrace();
28 }
29 }
30 });
31 //不能关闭线程
32 while(true){
33
34 }
35 }
先执行这个方法使线程一直开启监听,再去执行生产者生产十条消息,可以发现>>>
消息一生产出来立马被监听到消费掉!
简单的入门案例就写到这里,Active整合Spring的简单使用下面开写~~~
注意了!!! 开始整合Spring了...
这次分别用Queue和Topic演示
创建maven工程activeMQ_spring
pom的依赖
1 <dependencies>
2 <dependency>
3 <groupId>org.springframework</groupId>
4 <artifactId>spring-context</artifactId>
5 <version>4.1.7.RELEASE</version>
6 </dependency>
7 <dependency>
8 <groupId>org.springframework</groupId>
9 <artifactId>spring-test</artifactId>
10 <version>4.1.7.RELEASE</version>
11 </dependency>
12 <dependency>
13 <groupId>junit</groupId>
14 <artifactId>junit</artifactId>
15 <version>4.12</version>
16 </dependency>
17 <dependency>
18 <groupId>org.apache.activemq</groupId>
19 <artifactId>activemq-all</artifactId>
20 <version>5.14.0</version>
21 </dependency>
22 <dependency>
23 <groupId>org.springframework</groupId>
24 <artifactId>spring-jms</artifactId>
25 <version>4.1.7.RELEASE</version>
26 </dependency>
27 </dependencies>
如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2,5.14.2
此时用到spring-jms消息服务,jms模版和jms的监听处理
在consumer包下创建两个Queue消费者(队列消费者)
QueueConsumer1:
1 package cn.bowen.activemq.consume;
2
3 import javax.jms.JMSException;
4 import javax.jms.Message;
5 import javax.jms.MessageListener;
6 import javax.jms.TextMessage;
7
8 import org.springframework.stereotype.Service;
9 @Service
10 public class QueueConsumer1 implements MessageListener{
11
12 public void onMessage(Message message) {
13 TextMessage textMessage = (TextMessage)message;
14 try {
15 System.out.println("消费的QueueConsumer1获取消息:"+textMessage.getText());
16 } catch (JMSException e) {
17 e.printStackTrace();
18 }
19 }
20
21 }
QueueConsumer2:
1 package cn.bowen.activemq.consume;
2
3 import javax.jms.JMSException;
4 import javax.jms.Message;
5 import javax.jms.MessageListener;
6 import javax.jms.TextMessage;
7
8 import org.springframework.stereotype.Service;
9 @Service
10 public class QueueConsumer2 implements MessageListener{
11
12 public void onMessage(Message message) {
13 TextMessage textMessage = (TextMessage)message;
14 try {
15 System.out.println("消费的QueueConsumer2获取消息:"+textMessage.getText());
16 } catch (JMSException e) {
17 e.printStackTrace();
18 }
19 }
20
21 }
创建两个Topic消费者(话题/广播消费者)
TopicConsumer1:
1 package cn.bowen.activemq.consume;
2
3 import javax.jms.JMSException;
4 import javax.jms.Message;
5 import javax.jms.MessageListener;
6 import javax.jms.TextMessage;
7
8 import org.springframework.stereotype.Service;
9 @Service
10 public class TopicConsumer1 implements MessageListener{
11
12 public void onMessage(Message message) {
13 TextMessage textMessage = (TextMessage)message;
14 try {
15 System.out.println("消费的TopicConsumer1获取消息:"+textMessage.getText());
16 } catch (JMSException e) {
17 e.printStackTrace();
18 }
19 }
20
21 }
TopicConsumer2:
1 package cn.bowen.activemq.consume;
2
3 import javax.jms.JMSException;
4 import javax.jms.Message;
5 import javax.jms.MessageListener;
6 import javax.jms.TextMessage;
7
8 import org.springframework.stereotype.Service;
9 @Service
10 public class TopicConsumer2 implements MessageListener{
11
12 public void onMessage(Message message) {
13 TextMessage textMessage = (TextMessage)message;
14 try {
15 System.out.println("消费的TopicConsumer2获取消息:"+textMessage.getText());
16 } catch (JMSException e) {
17 e.printStackTrace();
18 }
19 }
20
21 }
配置applicationContext-mq-consumer.xml,注释说明配置信息~~~
1 <!-- 扫描包 -->
2 <context:component-scan base-package="cn.bowen.activemq.consume" />
3
4 <!-- ActiveMQ 连接工厂 -->
5 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
6 <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
7 <amq:connectionFactory id="amqConnectionFactory"
8 brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
9
10 <!-- Spring Caching连接工厂 -->
11 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
12 <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
13 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
14 <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
15 <!-- 同上,同理 -->
16 <!-- <constructor-arg ref="amqConnectionFactory" /> -->
17 <!-- Session缓存数量 -->
18 <property name="sessionCacheSize" value="100" />
19 </bean>
20
21 <!-- 消息消费者 start-->
22
23 <!-- 定义Queue监听器 -->
24 <jms:listener-container destination-type="queue" container-type="default"
25 connection-factory="connectionFactory" acknowledge="auto">
26 <!-- destination是队列或话题名称 -->
27 <!-- 默认注册bean名称,应该是类名首字母小写 -->
28 <jms:listener destination="springQueue" ref="queueConsumer1"/>
29 <jms:listener destination="springQueue" ref="queueConsumer2"/>
30 </jms:listener-container>
31
32 <!-- 定义Topic监听器 -->
33 <jms:listener-container destination-type="topic" container-type="default"
34 connection-factory="connectionFactory" acknowledge="auto">
35 <jms:listener destination="springTopic" ref="topicConsumer1"/>
36 <jms:listener destination="springTopic" ref="topicConsumer2"/>
37 </jms:listener-container>
38
39 <!-- 消息消费者 end -->
配置applicationContext-mq.xml,注释说明配置信息
1 <!-- 扫描包 -->
2 <context:component-scan base-package="cn.bowen.activemq.produce" />
3
4 <!-- ActiveMQ 连接工厂 -->
5 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
6 <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
7 <amq:connectionFactory id="amqConnectionFactory"
8 brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
9
10 <!-- Spring Caching连接工厂 -->
11 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
12 <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
13 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
14 <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
15 <!-- 同上,同理 -->
16 <!-- <constructor-arg ref="amqConnectionFactory" /> -->
17 <!-- Session缓存数量 -->
18 <property name="sessionCacheSize" value="100" />
19 </bean>
20
21 <!-- Spring JmsTemplate 的消息生产者 start-->
22
23 <!-- 定义JmsTemplate的Queue类型 -->
24 <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
25 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
26 <constructor-arg ref="connectionFactory" />
27 <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
28 <property name="pubSubDomain" value="false" />
29 </bean>
30
31 <!-- 定义JmsTemplate的Topic类型 -->
32 <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
33 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
34 <constructor-arg ref="connectionFactory" />
35 <!-- pub/sub模型(发布/订阅) -->
36 <property name="pubSubDomain" value="true" />
37 </bean>
38
39 <!--Spring JmsTemplate 的消息生产者 end-->
在produce包下创建QueueProducer生产者,引用模版的JmsTemplate的Queue类型
1 package cn.bowen.activemq.produce;
2
3 import javax.jms.JMSException;
4 import javax.jms.Message;
5 import javax.jms.Session;
6
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.beans.factory.annotation.Qualifier;
9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Service;
12
13 @Service
14 public class QueueProducer {
15 @Autowired
16 @Qualifier("jmsQueueTemplate")
17 private JmsTemplate jmsTemplate;
18
19 public void send(String queueName,final String msg){
20 jmsTemplate.send(queueName, new MessageCreator() {
21
22 public Message createMessage(Session session) throws JMSException {
23 return session.createTextMessage(msg);
24 }
25 });
26 }
27 }
在produce包下创建TopicProducer生产者,引用模版的JmsTemplate的Topic类型
1 package cn.bowen.activemq.produce;
2
3 import javax.jms.JMSException;
4 import javax.jms.Message;
5 import javax.jms.Session;
6
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.beans.factory.annotation.Qualifier;
9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Service;
12
13 @Service
14 public class TopicProducer {
15 @Autowired
16 @Qualifier("jmsTopicTemplate")
17 private JmsTemplate jmsTemplate;
18
19 public void send(String topicName,final String msg){
20 jmsTemplate.send(topicName, new MessageCreator() {
21
22 public Message createMessage(Session session) throws JMSException {
23 return session.createTextMessage(msg);
24 }
25 });
26 }
27 }
最后生产者和消费者的Queue和Topic俩种类型都准备好了~~~
准备测试>>>
测试我使用的是spring的JUnit4来进行注解测试
在test包下创建ConsumerTest(消费者监听)
1 package cn.bowen.activemq;
2
3 import org.junit.Test;
4 import org.junit.runner.RunWith;
5 import org.springframework.test.context.ContextConfiguration;
6 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
7
8 @RunWith(SpringJUnit4ClassRunner.class)
9 @ContextConfiguration(locations="classpath:applicationContext-mq-consumer.xml")
10 public class ConsumerTest {
11
12 @Test
13 public void testProduce(){
14 //线程不能关闭
15 while(true){}
16 }
17 }
创建生产者ProducerTest生产
1 package cn.bowen.activemq;
2
3 import org.junit.Test;
4 import org.junit.runner.RunWith;
5 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.test.context.ContextConfiguration;
7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
8
9 import cn.bowen.activemq.produce.QueueProducer;
10 import cn.bowen.activemq.produce.TopicProducer;
11
12 @RunWith(SpringJUnit4ClassRunner.class)
13 @ContextConfiguration(locations="classpath:applicationContext-mq.xml")
14 public class ProducerTest {
15 @Autowired
16 private QueueProducer queueProducer;
17
18 @Autowired
19 private TopicProducer topicProducer;
20
21 @Test
22 public void testProduce(){
23 queueProducer.send("springQueue", "这是一个队列消息!");
24 topicProducer.send("springTopic", "这是一个广播/话题消息!");
25 }
26 }
先执行消费者进行监听>>>
在通过生产者生产第一次消息发现>>
在通过生产者生产第二次消息发现>>
在通过生产者生产第三次消息发现>>
不难发现Queue和Topic的区别???
发送消息类型为Topic时,是以广播的形式,每一个消费者都能消费到~~~
而发送消息Queue类型时,是作为一对一队列形式的消费,一条消息只能一个消费者消费~~~(两个消费者又好像是轮流消费哈)
两种类型应用的业务场景不一样!
上一篇: NIO和IO多路复用
下一篇: ActiveMQ初识