欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ与spring结合-topic消息-延时队列

程序员文章站 2022-05-17 08:33:49
...

RabbitMQ与spring结合-topic消息-延时队列

https://docs.spring.io/spring-amqp/docs/2.1.6.RELEASE/reference/html/#_introduction
spring官网

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>2.1.6.RELEASE</version>
</dependency>

项目结构

RabbitMQ与spring结合-topic消息-延时队列

env.properties中存着配置

#mq config
mq.host=127.0.0.1
mq.username=root
mq.password=root123
mq.port=5672
mq.vhost=/

pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.ghgcn</groupId>
	<artifactId>spring-mq</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<version.jdk>1.8</version.jdk>
	</properties>

	<dependencies>

		<dependency>
			<groupId>com.sun</groupId>
			<artifactId>tools</artifactId>
			<version>${version.jdk}</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
		<!--日志-->
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.2.3</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.2.3</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.8</version>
			<scope>provided</scope>
		</dependency>
		<!--只需要集成这个-->
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit</artifactId>
			<version>2.1.6.RELEASE</version>
		</dependency>

	</dependencies>
	<profiles>
		<profile>
			<id>dev</id>
			<properties>
				<env>dev</env>
			</properties>
			<activation>
				<activeByDefault>true</activeByDefault>
			</activation>
		</profile>
		<profile>
			<id>test</id>
			<properties>
				<env>test</env>
			</properties>
		</profile>
		<profile>
			<id>prod</id>
			<properties>
				<env>prod</env>
			</properties>
		</profile>
		<profile>
			<id>prep</id>
			<properties>
				<env>prep</env>
			</properties>
		</profile>
	</profiles>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.2</version>
				<configuration>
					<source>${version.jdk}</source>
					<target>${version.jdk}</target>
					<encoding>${project.build.sourceEncoding}</encoding>
				</configuration>
			</plugin>
		</plugins>
		<filters>
			<filter>src/env/${env}.properties</filter>
		</filters>
		<resources>
			<resource>
				<directory>src/main/resources</directory>
				<includes>
					<include>**/*</include>
				</includes>
				<filtering>true</filtering>
			</resource>
		</resources>
	</build>
</project>

spring.xml配置direct.xml


监听器

package com.ghgcn.springmq.topic.listener;

import java.util.Date;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TopicListener implements ChannelAwareMessageListener {

	@Override
	public void onMessage(Message message, Channel channel) throws Exception {

		log.info("TopicListener 接收 -message  {} ---{}",message+"    接收时间  "+ new Date());
		log.info("TopicListener 接收 -channel  {} ",channel);
		//消息确认
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
	}

}

生产者

package com.ghgcn.springmq.topic;

import java.util.Date;
import java.util.UUID;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.rabbitmq.client.MessageProperties;

public class TopicProducer {

	
	private static String exchange="topic_spring_exchange";
	private static String rountKey="rounter.topic.key";
	private static String queue="topic_spring_queue";
	@SuppressWarnings("resource")
	public static void main(String[] args) {

		//amqpTemplate
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("topic.xml");
		
		
		AmqpTemplate amqpTemplate = applicationContext.getBean("amqpTemplate", AmqpTemplate.class);
		
		System.out.println("============topic message==============");
		//构建消息
		byte [] msgBytes = ("topic message test "+ new Date()).getBytes();
		Message messager = MessageBuilder.withBody(msgBytes)
        .setContentType(MessageProperties.TEXT_PLAIN.getContentType())
        .setMessageId(UUID.randomUUID().toString())
        .build();
		//延时5秒后发送
		//messager.getMessageProperties().setDelay(5000);
		//发送消息
		amqpTemplate.convertAndSend(exchange, rountKey, messager);
		
		System.out.println("===发送结束===");
		
		
	}

}

运行

============topic message==============
11:28:45.324 [pool-1-thread-3] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - ConsumeOK: aaa@qq.com: tags=[[amq.ctag-92Md5IvnwegxJ7XhZZtSHQ]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 53789], acknowledgeMode=MANUAL local queue size=0
11:28:45.384 [main] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2)
11:28:45.385 [main] DEBUG org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl - Added listener aaa@qq.com
11:28:45.385 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 53789] to map, size now 1
11:28:45.385 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$98/1299327689 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 53789]
11:28:45.385 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Publishing message (Body:'topic message test Fri May 31 11:28:45 CST 2019' MessageProperties [headers={x-delay=5000}, messageId=8e009836-65da-4799-9b9e-493ea6cfa601, contentType=text/plain, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [topic_spring_exchange], routingKey = [rounter.topic.key]
===发送结束===
11:28:45.394 [pool-1-thread-4] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-92Md5IvnwegxJ7XhZZtSHQ' with deliveryTag: '1' in aaa@qq.com: tags=[[amq.ctag-92Md5IvnwegxJ7XhZZtSHQ]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 53789], acknowledgeMode=MANUAL local queue size=0
11:28:45.395 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'topic message test Fri May 31 11:28:45 CST 2019' MessageProperties [headers={}, messageId=8e009836-65da-4799-9b9e-493ea6cfa601, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_spring_exchange, receivedRoutingKey=rounter.topic.key, receivedDelay=5000, deliveryTag=1, consumerTag=amq.ctag-92Md5IvnwegxJ7XhZZtSHQ, consumerQueue=topic_spring_queue])
11:28:45.396 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] INFO com.ghgcn.springmq.topic.listener.TopicListener - TopicListener 接收 -message  (Body:'topic message test Fri May 31 11:28:45 CST 2019' MessageProperties [headers={}, messageId=8e009836-65da-4799-9b9e-493ea6cfa601, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_spring_exchange, receivedRoutingKey=rounter.topic.key, receivedDelay=5000, deliveryTag=1, consumerTag=amq.ctag-92Md5IvnwegxJ7XhZZtSHQ, consumerQueue=topic_spring_queue])    接收时间  Fri May 31 11:28:45 CST 2019 ---{}
11:28:45.426 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2) PC:Ack:1:false
11:28:46.746 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] INFO com.ghgcn.springmq.topic.listener.TopicListener - TopicListener 接收 -channel  Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 53789] 

延时

在交换器声明时添加 ==delayed=“true”==属性

<!--声明交换器 -->
	<rabbit:topic-exchange
		name="topic_spring_exchange" durable="true" auto-delete="false"
		auto-declare="true" declared-by="mqadmin" delayed="true">
		<!-- topic rounterKey绑定可以使用匹配符来绑定 -->
		<rabbit:bindings>
			<rabbit:binding pattern="#.topic.#"
				queue="topic_spring_queue"></rabbit:binding>
		</rabbit:bindings>
	</rabbit:topic-exchange>

发送者

设置延时属性 毫秒
messager.getMessageProperties().setDelay(5000);

AmqpTemplate amqpTemplate = applicationContext.getBean("amqpTemplate", AmqpTemplate.class);
		
		System.out.println("============topic message==============");
		//构建消息
		byte [] msgBytes = ("topic message test "+ new Date()).getBytes();
		Message messager = MessageBuilder.withBody(msgBytes)
        .setContentType(MessageProperties.TEXT_PLAIN.getContentType())
        .setMessageId(UUID.randomUUID().toString())
        .build();
		//延时5秒后发送
		messager.getMessageProperties().setDelay(5000);
		//发送消息
		amqpTemplate.convertAndSend(exchange, rountKey, messager);
		
		System.out.println("===发送结束===");

github:https://github.com/ln0491/rabbitmq-spring-demo

相关标签: RabbitMQ