RabbitMQ与spring结合-topic消息-延时队列
程序员文章站
2022-05-17 08:33:49
...
RabbitMQ与spring结合-topic消息-延时队列
https://docs.spring.io/spring-amqp/docs/2.1.6.RELEASE/reference/html/#_introduction
spring官网
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
项目结构
env.properties中存着配置
#mq config
mq.host=127.0.0.1
mq.username=root
mq.password=root123
mq.port=5672
mq.vhost=/
pom文件
<!-- Maven build for the spring-mq demo: Spring AMQP (RabbitMQ) plus logback and lombok, with per-environment property filtering. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ghgcn</groupId>
<artifactId>spring-mq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Shared by the tools.jar dependency version and the compiler source/target level below -->
<version.jdk>1.8</version.jdk>
</properties>
<dependencies>
<!-- JDK tools.jar, referenced from the local JDK install via system scope -->
<!-- NOTE(review): Maven normally exposes environment variables as ${env.JAVA_HOME}; confirm that the unprefixed ${JAVA_HOME} resolves in the target build environment -->
<dependency>
<groupId>com.sun</groupId>
<artifactId>tools</artifactId>
<version>${version.jdk}</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<!-- Logging -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
<scope>provided</scope>
</dependency>
<!-- The only dependency that has to be added for the Spring / RabbitMQ integration -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
</dependencies>
<!-- Each profile sets the "env" property, which selects the filter file under src/env; "dev" is active by default -->
<profiles>
<profile>
<id>dev</id>
<properties>
<env>dev</env>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
</profile>
<profile>
<id>test</id>
<properties>
<env>test</env>
</properties>
</profile>
<profile>
<id>prod</id>
<properties>
<env>prod</env>
</properties>
</profile>
<profile>
<id>prep</id>
<properties>
<env>prep</env>
</properties>
</profile>
</profiles>
<build>
<plugins>
<!-- Compile with the JDK level and source encoding declared in <properties> -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>${version.jdk}</source>
<target>${version.jdk}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
<!-- Property file for the active environment, used as the filter source for resources -->
<filters>
<filter>src/env/${env}.properties</filter>
</filters>
<!-- All files under src/main/resources are included with filtering enabled -->
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
</build>
</project>
spring.xml配置direct.xml
监听器
package com.ghgcn.springmq.topic.listener;
import java.util.Date;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
/**
 * Message listener for the topic queue: logs every delivery together with the
 * channel it arrived on, then acknowledges the delivery manually.
 */
@Slf4j
public class TopicListener implements ChannelAwareMessageListener {

    /**
     * Logs the received message and the receive time, logs the channel, and
     * acknowledges the delivery on that channel.
     *
     * @param message the delivered message; its delivery tag is used for the ack
     * @param channel the channel the message was delivered on
     * @throws Exception if the acknowledgement fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Message and timestamp are passed as two separate arguments so that
        // each "{}" placeholder is filled. Concatenating them into a single
        // argument left a literal "{}" at the end of the log line.
        log.info("TopicListener 接收 -message {} --- 接收时间 {}", message, new Date());
        log.info("TopicListener 接收 -channel {} ", channel);
        // Manual acknowledgement; the second argument (multiple=false) limits
        // the ack to this delivery tag only.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
生产者
package com.ghgcn.springmq.topic;
import java.util.Date;
import java.util.UUID;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.rabbitmq.client.MessageProperties;
/**
 * Demo producer: loads the Spring context from {@code topic.xml}, builds a
 * plain-text message and publishes it to the topic exchange.
 */
public class TopicProducer {

    /** Exchange the message is published to. */
    private static final String exchange = "topic_spring_exchange";

    /** Routing key used when publishing. */
    private static final String rountKey = "rounter.topic.key";

    /** Queue name; kept for reference, not used by this class. */
    private static final String queue = "topic_spring_queue";

    /**
     * Publishes one test message and prints progress markers to stdout.
     *
     * @param args ignored
     */
    // NOTE(review): the context is intentionally not closed here, presumably so
    // that listeners declared in the same context keep running - confirm.
    @SuppressWarnings("resource")
    public static void main(String[] args) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("topic.xml");
        AmqpTemplate amqpTemplate = applicationContext.getBean("amqpTemplate", AmqpTemplate.class);
        System.out.println("============topic message==============");

        // Encode with an explicit charset: the no-arg getBytes() depends on the
        // platform default, so the payload could differ between machines.
        // The fully qualified name is used to avoid touching the file's imports.
        byte[] msgBytes = ("topic message test " + new Date())
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message messager = MessageBuilder.withBody(msgBytes)
                .setContentType(MessageProperties.TEXT_PLAIN.getContentType())
                .setMessageId(UUID.randomUUID().toString())
                .build();

        // To request a 5000 ms delay for this message, enable the next line.
        //messager.getMessageProperties().setDelay(5000);

        amqpTemplate.convertAndSend(exchange, rountKey, messager);
        System.out.println("===发送结束===");
    }
}
运行
============topic message==============
11:28:45.324 [pool-1-thread-3] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - ConsumeOK: aaa@qq.com: tags=[[amq.ctag-92Md5IvnwegxJ7XhZZtSHQ]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 53789], acknowledgeMode=MANUAL local queue size=0
11:28:45.384 [main] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2)
11:28:45.385 [main] DEBUG org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl - Added listener aaa@qq.com
11:28:45.385 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 53789] to map, size now 1
11:28:45.385 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$98/1299327689 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 53789]
11:28:45.385 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Publishing message (Body:'topic message test Fri May 31 11:28:45 CST 2019' MessageProperties [headers={x-delay=5000}, messageId=8e009836-65da-4799-9b9e-493ea6cfa601, contentType=text/plain, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [topic_spring_exchange], routingKey = [rounter.topic.key]
===发送结束===
11:28:45.394 [pool-1-thread-4] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-92Md5IvnwegxJ7XhZZtSHQ' with deliveryTag: '1' in aaa@qq.com: tags=[[amq.ctag-92Md5IvnwegxJ7XhZZtSHQ]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 53789], acknowledgeMode=MANUAL local queue size=0
11:28:45.395 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'topic message test Fri May 31 11:28:45 CST 2019' MessageProperties [headers={}, messageId=8e009836-65da-4799-9b9e-493ea6cfa601, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_spring_exchange, receivedRoutingKey=rounter.topic.key, receivedDelay=5000, deliveryTag=1, consumerTag=amq.ctag-92Md5IvnwegxJ7XhZZtSHQ, consumerQueue=topic_spring_queue])
11:28:45.396 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] INFO com.ghgcn.springmq.topic.listener.TopicListener - TopicListener 接收 -message (Body:'topic message test Fri May 31 11:28:45 CST 2019' MessageProperties [headers={}, messageId=8e009836-65da-4799-9b9e-493ea6cfa601, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_spring_exchange, receivedRoutingKey=rounter.topic.key, receivedDelay=5000, deliveryTag=1, consumerTag=amq.ctag-92Md5IvnwegxJ7XhZZtSHQ, consumerQueue=topic_spring_queue]) 接收时间 Fri May 31 11:28:45 CST 2019 ---{}
11:28:45.426 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2) PC:Ack:1:false
11:28:46.746 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] INFO com.ghgcn.springmq.topic.listener.TopicListener - TopicListener 接收 -channel Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 53789]
延时
在交换器声明时添加 `delayed="true"` 属性
<!-- Declare the topic exchange; delayed="true" marks it as a delayed exchange -->
<!-- NOTE(review): delayed exchanges presumably need the delayed-message plugin enabled on the broker - confirm -->
<rabbit:topic-exchange
name="topic_spring_exchange" durable="true" auto-delete="false"
auto-declare="true" declared-by="mqadmin" delayed="true">
<!-- Topic bindings may use wildcard patterns for the routing key -->
<rabbit:bindings>
<rabbit:binding pattern="#.topic.#"
queue="topic_spring_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
发送者
设置消息的延时属性，单位为毫秒
messager.getMessageProperties().setDelay(5000);
// Excerpt of the producer's main method: applicationContext, exchange and
// rountKey are declared outside this excerpt.
AmqpTemplate amqpTemplate = applicationContext.getBean("amqpTemplate", AmqpTemplate.class);
System.out.println("============topic message==============");
// Build the message
byte [] msgBytes = ("topic message test "+ new Date()).getBytes();
Message messager = MessageBuilder.withBody(msgBytes)
.setContentType(MessageProperties.TEXT_PLAIN.getContentType())
.setMessageId(UUID.randomUUID().toString())
.build();
// Set a delay of 5000 ms on the message
messager.getMessageProperties().setDelay(5000);
// Send the message
amqpTemplate.convertAndSend(exchange, rountKey, messager);
System.out.println("===发送结束===");
上一篇: Spring Boot入门-- 写一个hello world
下一篇: kail教程(四)—配置