欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

activemq实现队列的独有消费

程序员文章站 2022-07-13 16:53:09
...

     在我们实际的开发中可能存在这么一种情况,应用程序要向一个队列名为queue的队列中发送3条消息,需要保证这3条消息按顺序消费。必须是第一条消费完,在消费第二条然后是第三条。而我们的程序中可能有时候存在多个consumer对这个队列进行消费,那么可能出现消息时按1,2,3进行消费的,但第二条可能比较耗时,就会导致第2条消息没有消费完,第三条消息就已经消费完了,这个时候可能就会出现问题。

     解决方案:1、使用独有消费者就可以解决这个问题。(在创建队列的时候,向队列后面增加参数 ?consumer.exclusive=true

                        2、使用消息分组也可以进行解决(比上面一种方案好

                        3、本博文使用的是独有消费者进行解决

     场景模拟:向队列中发送3条消费,且在消费第二条消费时,线程沉睡5秒中,程序中启动2个监听对消息进行消费,看消息的消费结果

     队列是属于点对点的模式,且队列中的一条消费只可能被一个消费者进行消费

1、pom文件的配置

<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>5.11.1</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>5.12.1</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-jms</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-aop</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-beans</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-context</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-context-support</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-core</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-messaging</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-oxm</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-tx</artifactId>
	<version>4.3.8.RELEASE</version>
</dependency>
<dependency>
	<groupId>commons-pool</groupId>
	<artifactId>commons-pool</artifactId>
	<version>1.6</version>
</dependency>

 2、消息监听器一 (消费者一)

public class ExclusiveConsumerListener implements SessionAwareMessageListener<TextMessage> {
	@Override
	public void onMessage(TextMessage message, Session session) throws JMSException {
		String msg = message.getText();
		if (msg.startsWith("2")) {
			try {
				System.out.println(Thread.currentThread().getName() + " 开始沉睡5秒.");
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println(Thread.currentThread().getName() + " : " + this.getClass().getName() + " :开始消费消息:" + msg);
	}
}

3、消息监听器二(消费者二)

public class ExclusiveConsumerListener2 implements SessionAwareMessageListener<TextMessage> {
	@Override
	public void onMessage(TextMessage message, Session session) throws JMSException {
		String msg = message.getText();
		if (msg.startsWith("2")) {
			try {
				System.out.println(Thread.currentThread().getName() + " 开始沉睡5秒.");
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println(Thread.currentThread().getName() + " : " + this.getClass().getName() + " :开始消费消息:" + msg);
	}
}

 4、配置文件(没有实现消息的独有消费

<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="alwaysSyncSend" value="true" />
				<property name="brokerURL" value="tcp://localhost:61616" />
			</bean>
		</property>
		<property name="maxConnections" value="10" />
	</bean>

	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="defaultDestination" ref="queue" />
		<property name="pubSubDomain" value="false" />
	</bean>

	<bean id="exclusiveConsumerListener" class="com.huan.activemq.独有消费者.ExclusiveConsumerListener" />
	<bean id="exclusiveConsumerListener2" class="com.huan.activemq.独有消费者.ExclusiveConsumerListener2" />
	<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg name="name" value="spring-queue" />
	</bean>
	<jms:listener-container acknowledge="auto"
		connection-factory="connectionFactory" container-type="default"
		destination-type="queue">
		<jms:listener destination="spring-queue" ref="exclusiveConsumerListener" />
		<jms:listener destination="spring-queue" ref="exclusiveConsumerListener2" />
	</jms:listener-container>

 5、测试

public static void main(String[] args) {
		ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:application-exclusive-consumer.xml");
		JmsTemplate jmsTemplate = ctx.getBean(JmsTemplate.class);
		String msg1 = "1-这是第一条消息";
		String msg2 = "2-这是第二条消息";
		String msg3 = "3-这是第三条消息";
		jmsTemplate.convertAndSend(msg1);
		jmsTemplate.convertAndSend(msg2);
		jmsTemplate.convertAndSend(msg3);
	}

 6、测试结果(可以看到消息是乱的,并没有按照1消费完后消费2,2消费完后消费3)


    activemq实现队列的独有消费
            
    
    博客分类: activemq 消息队列activemq独有消费者 

 

7、修改配置文件(在队列的字符串后面加上?consumer.exclusive=true实现消费的独有消费


   activemq实现队列的独有消费
            
    
    博客分类: activemq 消息队列activemq独有消费者 
 8、重新测试(结果为按照顺序消费的

 
activemq实现队列的独有消费
            
    
    博客分类: activemq 消息队列activemq独有消费者 
 

  • activemq实现队列的独有消费
            
    
    博客分类: activemq 消息队列activemq独有消费者 
  • 大小: 31.2 KB
  • activemq实现队列的独有消费
            
    
    博客分类: activemq 消息队列activemq独有消费者 
  • 大小: 129.1 KB
  • activemq实现队列的独有消费
            
    
    博客分类: activemq 消息队列activemq独有消费者 
  • 大小: 35.1 KB