activemq实现队列的独有消费
程序员文章站
2022-07-13 16:53:09
...
在我们实际的开发中可能存在这么一种情况,应用程序要向一个队列名为queue的队列中发送3条消息,需要保证这3条消息按顺序消费。必须是第一条消费完,在消费第二条然后是第三条。而我们的程序中可能有时候存在多个consumer对这个队列进行消费,那么可能出现消息时按1,2,3进行消费的,但第二条可能比较耗时,就会导致第2条消息没有消费完,第三条消息就已经消费完了,这个时候可能就会出现问题。
解决方案:1、使用独有消费者就可以解决这个问题。(在创建队列的时候,向队列后面增加参数 ?consumer.exclusive=true)
2、使用消息分组也可以进行解决(比上面一种方案好)
3、本博文使用的是独有消费者进行解决
场景模拟:向队列中发送3条消费,且在消费第二条消费时,线程沉睡5秒中,程序中启动2个监听对消息进行消费,看消息的消费结果
队列是属于点对点的模式,且队列中的一条消费只可能被一个消费者进行消费
1、pom文件的配置
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.12.1</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-oxm</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>4.3.8.RELEASE</version> </dependency> <dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> <version>1.6</version> </dependency>
2、消息监听器一 (消费者一)
public class ExclusiveConsumerListener implements SessionAwareMessageListener<TextMessage> { @Override public void onMessage(TextMessage message, Session session) throws JMSException { String msg = message.getText(); if (msg.startsWith("2")) { try { System.out.println(Thread.currentThread().getName() + " 开始沉睡5秒."); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " : " + this.getClass().getName() + " :开始消费消息:" + msg); } }
3、消息监听器二(消费者二)
public class ExclusiveConsumerListener2 implements SessionAwareMessageListener<TextMessage> { @Override public void onMessage(TextMessage message, Session session) throws JMSException { String msg = message.getText(); if (msg.startsWith("2")) { try { System.out.println(Thread.currentThread().getName() + " 开始沉睡5秒."); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " : " + this.getClass().getName() + " :开始消费消息:" + msg); } }
4、配置文件(没有实现消息的独有消费)
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="alwaysSyncSend" value="true" /> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> </property> <property name="maxConnections" value="10" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="queue" /> <property name="pubSubDomain" value="false" /> </bean> <bean id="exclusiveConsumerListener" class="com.huan.activemq.独有消费者.ExclusiveConsumerListener" /> <bean id="exclusiveConsumerListener2" class="com.huan.activemq.独有消费者.ExclusiveConsumerListener2" /> <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg name="name" value="spring-queue" /> </bean> <jms:listener-container acknowledge="auto" connection-factory="connectionFactory" container-type="default" destination-type="queue"> <jms:listener destination="spring-queue" ref="exclusiveConsumerListener" /> <jms:listener destination="spring-queue" ref="exclusiveConsumerListener2" /> </jms:listener-container>
5、测试
public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:application-exclusive-consumer.xml"); JmsTemplate jmsTemplate = ctx.getBean(JmsTemplate.class); String msg1 = "1-这是第一条消息"; String msg2 = "2-这是第二条消息"; String msg3 = "3-这是第三条消息"; jmsTemplate.convertAndSend(msg1); jmsTemplate.convertAndSend(msg2); jmsTemplate.convertAndSend(msg3); }
6、测试结果(可以看到消息是乱的,并没有按照1消费完后消费2,2消费完后消费3)
7、修改配置文件(在队列的字符串后面加上?consumer.exclusive=true)实现消费的独有消费
8、重新测试(结果为按照顺序消费的)