欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ActiveMQ入门DEMO 以及和Spring的整合

程序员文章站 2022-05-18 14:48:15
...

好了直接我开始ActiveMQ的入门案例!

创建一个maven工程activeMQ_helloworld,提供两个测试类进行演示.

ActiveMQ入门DEMO 以及和Spring的整合

 

pom文件导入的依赖

ActiveMQ入门DEMO 以及和Spring的整合

 

创建一个测试类来做生产者生产消息,这里我用的是队列形式(queue),一对一的消费,我创建了一个队列叫

HelloActiveMQ,并发送了十条消息.

 

ActiveMQ入门DEMO 以及和Spring的整合

 1 public class ActiveMQProducer {
 2     @Test
 3     public void testProduceMsg() throws Exception{
 4         //连接工厂
 5         //使用默认的用户名,密码,路径
 6         //路径为 tcp://host:61616
 7         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
 8         //获取一个连接
 9         Connection connection = connectionFactory.createConnection();
10         //创建一个会话
11         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
12         //创建队列或者话题
13         Queue queue = session.createQueue("HelloActiveMQ");
14         //创建生产者或者消费者
15         MessageProducer producer = session.createProducer(queue);
16         //发送消息
17         for (int i = 0; i < 10; i++) {
18             producer.send(session.createTextMessage("activeMQ,你好!"+i));
19         }
20         //提交操作
21         session.commit();
22     }
23 }

ActiveMQ入门DEMO 以及和Spring的整合

 

熟悉ActiveMQ的API,根据API来发送消息,最后的commit不要忘了!!!

在创建一个消费者来对消息进行消费,消费者引用的队列名为之前创建的生产者队列名HelloActiveMQ

ActiveMQ入门DEMO 以及和Spring的整合

 1 public class ActiveMQConsumer {
 2     @Test
 3     public void testConsumeMsg() throws Exception{
 4         // 连接工厂
 5         // 使用默认用户名、密码、路径
 6         // 路径 tcp://host:61616
 7         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
 8         // 获取一个连接
 9         Connection connection = connectionFactory.createConnection();
10         //开启连接
11         connection.start();
12         //建立会话,第一个参数是否开启事务,为true时,最后需要session.conmit()的提交
13         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
14         // 创建队列或者话题对象
15         Queue queue = session.createQueue("HelloActiveMQ");
16         // 创建消费者
17         MessageConsumer messageConsumer = session.createConsumer(queue);
18         
19         while (true) {
20             TextMessage message = (TextMessage) messageConsumer.receive(5000);
21             if (message != null) {
22                 System.out.println(message.getText());
23             } else {
24                 break;
25             }
26         }
27     }
28 }

ActiveMQ入门DEMO 以及和Spring的整合

生产者和消费者都已经创建好,现在就可以开始愉快的测试了~~~

哦,还没开启呢...

安装好的ActiveMQ在本地,进入bin选择win64(我电脑64的),activemq.bat开启

ActiveMQ入门DEMO 以及和Spring的整合

开启后

ActiveMQ入门DEMO 以及和Spring的整合

进入Activemq管理页面,地址http://服务器ip:8161,用户名admin,密码admin,如图

ActiveMQ入门DEMO 以及和Spring的整合

 

 

这个消息管理页面非常好用,用的很多,后面说~

现在执行一次生产者testProduceMsg(),生产了十条消息,可以在管理页面看到(queues队列)

ActiveMQ入门DEMO 以及和Spring的整合

 

显然有十条消息生产了~

现在调用消费者testConsumeMsg(),去消费这十条消息!

ActiveMQ入门DEMO 以及和Spring的整合

 

控制台打印出十条消息,再去看看消息管理页面>

ActiveMQ入门DEMO 以及和Spring的整合

 

十条消息已经消费了~~~ok

 

然而然而业务场景中用的最多的是监听机制,对生产者的消息进行监听,生产者一生产出消息,消费者立马进行消费掉!!!

这里我再进行监听测试>>

在消费者测试类里添加第二个方法(监听消费的方法),线程得一直开着.

ActiveMQ入门DEMO 以及和Spring的整合

 1 @Test
 2     // 使用监听器消费
 3     public void testCosumeMQ2() throws Exception {
 4         // 连接工厂
 5         // 使用默认用户名、密码、路径
 6         // 路径 tcp://host:61616
 7         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
 8         // 获取一个连接
 9         Connection connection = connectionFactory.createConnection();
10         // 开启连接
11         connection.start();
12         // 建立会话
13         // 第一个参数,是否使用事务,如果设置true,操作消息队列后,必须使用 session.commit();
14         Session session = connection.createSession(false,
15                 Session.AUTO_ACKNOWLEDGE);
16         // 创建队列或者话题对象
17         Queue queue = session.createQueue("HelloActiveMQ");
18         // 创建消费者
19         MessageConsumer messageConsumer = session.createConsumer(queue);
20         messageConsumer.setMessageListener(new MessageListener() {
21             // 每次接收消息,自动调用 onMessage
22             public void onMessage(Message message) {
23                 TextMessage textMessage = (TextMessage)message;
24                 try {
25                     System.out.println(textMessage.getText());
26                 } catch (JMSException e) {
27                     e.printStackTrace();
28                 }
29             }
30         });
31         //不能关闭线程
32         while(true){
33             
34         }
35     }

ActiveMQ入门DEMO 以及和Spring的整合

 

先执行这个方法使线程一直开启监听,再去执行生产者生产十条消息,可以发现>>>

消息一生产出来立马被监听到消费掉!

ActiveMQ入门DEMO 以及和Spring的整合

简单的入门案例就写到这里,Active整合Spring的简单使用下面开写~~~

注意了!!!  开始整合Spring了...

这次分别用Queue和Topic演示

创建maven工程activeMQ_spring

ActiveMQ入门DEMO 以及和Spring的整合

pom的依赖

 

ActiveMQ入门DEMO 以及和Spring的整合

 1 <dependencies>
 2       <dependency>
 3           <groupId>org.springframework</groupId>
 4           <artifactId>spring-context</artifactId>
 5           <version>4.1.7.RELEASE</version>
 6       </dependency>
 7       <dependency>
 8           <groupId>org.springframework</groupId>
 9           <artifactId>spring-test</artifactId>
10           <version>4.1.7.RELEASE</version>
11       </dependency>
12       <dependency>
13           <groupId>junit</groupId>
14           <artifactId>junit</artifactId>
15           <version>4.12</version>
16       </dependency>
17       <dependency>
18           <groupId>org.apache.activemq</groupId>
19           <artifactId>activemq-all</artifactId>
20           <version>5.14.0</version>
21       </dependency>
22       <dependency>
23           <groupId>org.springframework</groupId>
24           <artifactId>spring-jms</artifactId>
25           <version>4.1.7.RELEASE</version>
26       </dependency>
27   </dependencies>

ActiveMQ入门DEMO 以及和Spring的整合

 

如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2,5.14.2

此时用到spring-jms消息服务,jms模版和jms的监听处理

在consumer包下创建两个Queue消费者(队列消费者)

QueueConsumer1:

ActiveMQ入门DEMO 以及和Spring的整合

 1 package cn.bowen.activemq.consume;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.MessageListener;
 6 import javax.jms.TextMessage;
 7 
 8 import org.springframework.stereotype.Service;
 9 @Service
10 public class QueueConsumer1 implements MessageListener{
11 
12     public void onMessage(Message message) {
13         TextMessage textMessage = (TextMessage)message;
14         try {
15             System.out.println("消费的QueueConsumer1获取消息:"+textMessage.getText());
16         } catch (JMSException e) {
17             e.printStackTrace();
18         }
19     }
20 
21 }

ActiveMQ入门DEMO 以及和Spring的整合

QueueConsumer2:

ActiveMQ入门DEMO 以及和Spring的整合

 1 package cn.bowen.activemq.consume;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.MessageListener;
 6 import javax.jms.TextMessage;
 7 
 8 import org.springframework.stereotype.Service;
 9 @Service
10 public class QueueConsumer2 implements MessageListener{
11 
12     public void onMessage(Message message) {
13         TextMessage textMessage = (TextMessage)message;
14         try {
15             System.out.println("消费的QueueConsumer2获取消息:"+textMessage.getText());
16         } catch (JMSException e) {
17             e.printStackTrace();
18         }
19     }
20 
21 }

ActiveMQ入门DEMO 以及和Spring的整合

 

创建两个Topic消费者(话题/广播消费者)

TopicConsumer1:

ActiveMQ入门DEMO 以及和Spring的整合

 1 package cn.bowen.activemq.consume;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.MessageListener;
 6 import javax.jms.TextMessage;
 7 
 8 import org.springframework.stereotype.Service;
 9 @Service
10 public class TopicConsumer1 implements MessageListener{
11 
12     public void onMessage(Message message) {
13         TextMessage textMessage = (TextMessage)message;
14         try {
15             System.out.println("消费的TopicConsumer1获取消息:"+textMessage.getText());
16         } catch (JMSException e) {
17             e.printStackTrace();
18         }
19     }
20 
21 }

ActiveMQ入门DEMO 以及和Spring的整合

TopicConsumer2:

ActiveMQ入门DEMO 以及和Spring的整合

 1 package cn.bowen.activemq.consume;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.MessageListener;
 6 import javax.jms.TextMessage;
 7 
 8 import org.springframework.stereotype.Service;
 9 @Service
10 public class TopicConsumer2 implements MessageListener{
11 
12     public void onMessage(Message message) {
13         TextMessage textMessage = (TextMessage)message;
14         try {
15             System.out.println("消费的TopicConsumer2获取消息:"+textMessage.getText());
16         } catch (JMSException e) {
17             e.printStackTrace();
18         }
19     }
20 
21 }

ActiveMQ入门DEMO 以及和Spring的整合

配置applicationContext-mq-consumer.xml,注释说明配置信息~~~

ActiveMQ入门DEMO 以及和Spring的整合

 1     <!-- 扫描包 -->
 2     <context:component-scan base-package="cn.bowen.activemq.consume" />
 3     
 4     <!-- ActiveMQ 连接工厂 -->
 5     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
 6     <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
 7     <amq:connectionFactory id="amqConnectionFactory"
 8         brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />
 9 
10     <!-- Spring Caching连接工厂 -->
11     <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
12     <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
13         <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
14         <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
15         <!-- 同上,同理 -->
16         <!-- <constructor-arg ref="amqConnectionFactory" /> -->
17         <!-- Session缓存数量 -->
18         <property name="sessionCacheSize" value="100" />
19     </bean>
20     
21      <!-- 消息消费者 start-->
22 
23     <!-- 定义Queue监听器 -->
24     <jms:listener-container destination-type="queue" container-type="default" 
25         connection-factory="connectionFactory" acknowledge="auto">
26         <!-- destination是队列或话题名称 -->
27         <!-- 默认注册bean名称,应该是类名首字母小写  -->
28         <jms:listener destination="springQueue" ref="queueConsumer1"/>
29         <jms:listener destination="springQueue" ref="queueConsumer2"/>
30     </jms:listener-container>
31     
32     <!-- 定义Topic监听器 -->
33     <jms:listener-container destination-type="topic" container-type="default" 
34         connection-factory="connectionFactory" acknowledge="auto">
35         <jms:listener destination="springTopic" ref="topicConsumer1"/>
36         <jms:listener destination="springTopic" ref="topicConsumer2"/>
37     </jms:listener-container>
38 
39     <!-- 消息消费者 end -->

ActiveMQ入门DEMO 以及和Spring的整合

配置applicationContext-mq.xml,注释说明配置信息

ActiveMQ入门DEMO 以及和Spring的整合

 1     <!-- 扫描包 -->
 2     <context:component-scan base-package="cn.bowen.activemq.produce" />
 3     
 4     <!-- ActiveMQ 连接工厂 -->
 5     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
 6     <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
 7     <amq:connectionFactory id="amqConnectionFactory"
 8         brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />
 9 
10     <!-- Spring Caching连接工厂 -->
11     <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
12     <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
13         <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
14         <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
15         <!-- 同上,同理 -->
16         <!-- <constructor-arg ref="amqConnectionFactory" /> -->
17         <!-- Session缓存数量 -->
18         <property name="sessionCacheSize" value="100" />
19     </bean>
20     
21      <!-- Spring JmsTemplate 的消息生产者 start-->
22 
23     <!-- 定义JmsTemplate的Queue类型 -->
24     <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
25         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
26         <constructor-arg ref="connectionFactory" />
27         <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
28         <property name="pubSubDomain" value="false" />
29     </bean>
30 
31     <!-- 定义JmsTemplate的Topic类型 -->
32     <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
33          <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
34         <constructor-arg ref="connectionFactory" />
35         <!-- pub/sub模型(发布/订阅) -->
36         <property name="pubSubDomain" value="true" />
37     </bean>
38 
39     <!--Spring JmsTemplate 的消息生产者 end-->

ActiveMQ入门DEMO 以及和Spring的整合

在produce包下创建QueueProducer生产者,引用模版的JmsTemplate的Queue类型

ActiveMQ入门DEMO 以及和Spring的整合

 1 package cn.bowen.activemq.produce;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.Session;
 6 
 7 import org.springframework.beans.factory.annotation.Autowired;
 8 import org.springframework.beans.factory.annotation.Qualifier;
 9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Service;
12 
13 @Service
14 public class QueueProducer {
15     @Autowired
16     @Qualifier("jmsQueueTemplate")
17     private JmsTemplate jmsTemplate;
18     
19     public void send(String queueName,final String msg){
20         jmsTemplate.send(queueName, new MessageCreator() {
21             
22             public Message createMessage(Session session) throws JMSException {
23                 return session.createTextMessage(msg);
24             }
25         });
26     }
27 }

ActiveMQ入门DEMO 以及和Spring的整合

在produce包下创建TopicProducer生产者,引用模版的JmsTemplate的Topic类型

ActiveMQ入门DEMO 以及和Spring的整合

 1 package cn.bowen.activemq.produce;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.Session;
 6 
 7 import org.springframework.beans.factory.annotation.Autowired;
 8 import org.springframework.beans.factory.annotation.Qualifier;
 9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Service;
12 
13 @Service
14 public class TopicProducer {
15     @Autowired
16     @Qualifier("jmsTopicTemplate")
17     private JmsTemplate jmsTemplate;
18     
19     public void send(String topicName,final String msg){
20         jmsTemplate.send(topicName, new MessageCreator() {
21             
22             public Message createMessage(Session session) throws JMSException {
23                 return session.createTextMessage(msg);
24             }
25         });
26     }
27 }

ActiveMQ入门DEMO 以及和Spring的整合

最后生产者和消费者的Queue和Topic俩种类型都准备好了~~~

准备测试>>>

测试我使用的是spring的JUnit4来进行注解测试

在test包下创建ConsumerTest(消费者监听)

ActiveMQ入门DEMO 以及和Spring的整合

 1 package cn.bowen.activemq;
 2 
 3 import org.junit.Test;
 4 import org.junit.runner.RunWith;
 5 import org.springframework.test.context.ContextConfiguration;
 6 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 7 
 8 @RunWith(SpringJUnit4ClassRunner.class)
 9 @ContextConfiguration(locations="classpath:applicationContext-mq-consumer.xml")
10 public class ConsumerTest {
11 
12     @Test
13     public void testProduce(){
14         //线程不能关闭
15         while(true){}
16     }
17 }

ActiveMQ入门DEMO 以及和Spring的整合

创建生产者ProducerTest生产

ActiveMQ入门DEMO 以及和Spring的整合

 1 package cn.bowen.activemq;
 2 
 3 import org.junit.Test;
 4 import org.junit.runner.RunWith;
 5 import org.springframework.beans.factory.annotation.Autowired;
 6 import org.springframework.test.context.ContextConfiguration;
 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 8 
 9 import cn.bowen.activemq.produce.QueueProducer;
10 import cn.bowen.activemq.produce.TopicProducer;
11 
12 @RunWith(SpringJUnit4ClassRunner.class)
13 @ContextConfiguration(locations="classpath:applicationContext-mq.xml")
14 public class ProducerTest {
15     @Autowired
16     private QueueProducer queueProducer;
17     
18     @Autowired
19     private TopicProducer topicProducer;
20     
21     @Test
22     public void testProduce(){
23         queueProducer.send("springQueue", "这是一个队列消息!");
24         topicProducer.send("springTopic", "这是一个广播/话题消息!");
25     }
26 }

ActiveMQ入门DEMO 以及和Spring的整合

先执行消费者进行监听>>>

在通过生产者生产第一次消息发现>>

ActiveMQ入门DEMO 以及和Spring的整合

在通过生产者生产第二次消息发现>>

ActiveMQ入门DEMO 以及和Spring的整合

在通过生产者生产第三次消息发现>>

ActiveMQ入门DEMO 以及和Spring的整合

 

不难发现Queue和Topic的区别???

发送消息类型为Topic时,是以广播的形式,每一个消费者都能消费到~~~

而发送消息Queue类型时,是作为一对一队列形式的消费,一条消息只能一个消费者消费~~~(两个消费者又好像是轮流消费哈)

两种类型应用的业务场景不一样!

相关标签: activeMQ