SpringBoot整合RabbitMQ之主题交换机模式
SpringBoot整合RabbitMQ之主题交换机模式
先聊一会
RabbitMQ
的交换机模式似乎有五六种,这里我只拿出了直连交换机和主题交换机两种常用的来说,而且配置模式也并不是用@RabbitListens
来配置的,如果有想了解的朋友可以多百度一下,我这里不做赘述,怎么配置都是看个人习惯的,达到目的才是我们共同追求的!
生产者配置
TopicRabbitConfig.java
package com.chunlei.provider.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 朝花不迟暮
* @version 1.0
* @date 2020/11/12 8:42
*/
@Configuration
public class TopicRabbitConfig
{
//绑定键
public final static String MAN = "topic.man";
public final static String WOMAN = "topic.woman";
@Bean
Queue firstQueue()
{
return new Queue(TopicRabbitConfig.MAN);
}
@Bean
Queue secondQueue()
{
return new Queue(TopicRabbitConfig.WOMAN);
}
@Bean
TopicExchange exchange()
{
return new TopicExchange("topicExchange");
}
//将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
//这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage()
{
return BindingBuilder.bind(firstQueue()).to(exchange()).with(MAN);
}
//将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
//这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2()
{
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
不做详细描述了,代码很直观了!
控制层多添加两个接口,用于投递消息!
@GetMapping("/sendTopicMessage1")
public String sendTopicMessage1()
{
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: M A N ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
return "ok";
}
@GetMapping("/sendTopicMessage2")
public String sendTopicMessage2()
{
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
return "ok";
}
这里有关convertAndSend
前文已经简单描述过了,这里不加叙述!
消费者配置
TopicManReceiver.java
package com.chunlei.consumer.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author 朝花不迟暮
* @version 1.0
* @date 2020/11/12 8:48
*/
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver
{
private final Logger log = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void process(Map testMessage)
{
log.info("【消息队列监听者】 DirectReceiver消费者收到消息:{}", testMessage.toString());
}
}
TopicTotalReceiver.java
package com.chunlei.consumer.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author 朝花不迟暮
* @version 1.0
* @date 2020/11/12 8:50
*/
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver
{
private final Logger log = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void process(Map testMessage) {
log.info("【消息队列监听者】 DirectReceiver消费者收到消息:{}", testMessage.toString()); }
}
普通监听代码,不解释!
TopicRabbitConfig.java
package com.chunlei.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 朝花不迟暮
* @version 1.0
* @date 2020/11/12 8:54
*/
@Configuration
public class TopicRabbitConfig
{
//绑定键
public final static String man = "topic.man";
public final static String woman = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
//将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
//这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
//将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
这里我们注意一下
bindingExchangeMessage2
这个bean,他在绑定routingkey的时候,是以topic.#
的模式来匹配的,具体可以看注释,前文也讲过topic的匹配规则!
同时启动两个项目,我们开始测试接口:
访问:http://localhost:8021/sendTopicMessage1
2020-11-12 10:17:13.941 INFO 5596 --- [ntContainer#1-1] c.c.consumer.config.TopicManReceiver : 【消息队列监听者】 DirectReceiver消费者收到消息:{createTime=2020-11-12 10:17:13, messageId=fe10bbf5-b3b2-44e4-a481-15f22889ca44, messageData=message: M A N }
2020-11-12 10:17:13.941 INFO 5596 --- [ntContainer#2-1] c.c.consumer.config.TopicTotalReceiver : 【消息队列监听者】 DirectReceiver消费者收到消息:{createTime=2020-11-12 10:17:13, messageId=fe10bbf5-b3b2-44e4-a481-15f22889ca44, messageData=message: M A N }
可以看出,他被bindingExchangeMessage
和bindingExchangeMessage2
同时监听到了,因为他符合这个topic.#
规则!
访问:http://localhost:8021/sendTopicMessage2
2020-11-12 10:19:05.516 INFO 5596 --- [ntContainer#2-1] c.c.consumer.config.TopicTotalReceiver : 【消息队列监听者】 DirectReceiver消费者收到消息:{createTime=2020-11-12 10:19:05, messageId=d0f6625d-f67e-4fe8-8f0c-c18c12b3c7e6, messageData=message: woman is all }
这里的话,就只有一条打印,很显然只有bindingExchangeMessage2
监听到了!
到此为止
主题交换机模式的简单使用就介绍完了,rabbitmq系列的文章还会继续更新,后期会同步到公众号,欢迎大家订阅我的公众号:
本文地址:https://blog.csdn.net/Curtisjia/article/details/109637665
上一篇: IO流拆分以及合并文件
下一篇: synchronized
推荐阅读
-
SpringBoot整合RabbitMQ之主题交换机模式
-
springboot整合rabbitmq 五种模式及死信队列的demo代码
-
SpringBoot2.x系列教程63--SpringBoot整合消息队列之RabbitMQ详解
-
springboot2.5.6集成RabbitMq实现Topic主题模式(推荐)
-
RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合
-
RabbitMQ交换机与Springboot整合的简单实现
-
SpringBoot整合RabbitMQ之主题交换机模式
-
RabbitMQ交换机与Springboot整合的简单实现
-
RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合
-
springboot2.5.6集成RabbitMq实现Topic主题模式(推荐)