欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SpringBoot整合RabbitMQ之主题交换机模式

程序员文章站 2022-12-20 19:36:10
SpringBoot整合RabbitMQ之主题交换机模式先聊一会RabbitMQ的交换机模式似乎有五六种,这里我只拿出了直连交换机和主题交换机两种常用的来说,而且配置模式也并不是用@RabbitListens来配置的,如果有想了解的朋友可以多百度一下,我这里不做赘述,怎么配置都是看个人习惯的,达到目的才是我们共同追求的!生产者配置TopicRabbitConfig.javapackage com.chunlei.provider.config;import org.springframewor...

SpringBoot整合RabbitMQ之主题交换机模式

先聊一会

RabbitMQ的交换机模式似乎有五六种,这里我只拿出了直连交换机和主题交换机两种常用的来说,而且配置模式也并不是用@RabbitListens来配置的,如果有想了解的朋友可以多百度一下,我这里不做赘述,怎么配置都是看个人习惯的,达到目的才是我们共同追求的!

生产者配置

TopicRabbitConfig.java

package com.chunlei.provider.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 朝花不迟暮
 * @version 1.0
 * @date 2020/11/12 8:42
 */
@Configuration
public class TopicRabbitConfig
{
    //绑定键
    public final static String MAN = "topic.man";
    public final static String WOMAN = "topic.woman";

    @Bean
    Queue firstQueue()
    {
        return new Queue(TopicRabbitConfig.MAN);
    }

    @Bean
    Queue secondQueue()
    {
        return new Queue(TopicRabbitConfig.WOMAN);
    }

    @Bean
    TopicExchange exchange()
    {
        return new TopicExchange("topicExchange");
    }

    //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
    //这样只要是消息携带的路由键是topic.man,才会分发到该队列
    @Bean
    Binding bindingExchangeMessage()
    {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(MAN);
    }

    //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
    //这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
    @Bean
    Binding bindingExchangeMessage2()
    {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }
}

不做详细描述了,代码很直观了!
控制层多添加两个接口,用于投递消息!

    @GetMapping("/sendTopicMessage1")
    public String sendTopicMessage1()
    {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: M A N ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> manMap = new HashMap<>();
        manMap.put("messageId", messageId);
        manMap.put("messageData", messageData);
        manMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
        return "ok";
    }

    @GetMapping("/sendTopicMessage2")
    public String sendTopicMessage2()
    {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: woman is all ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> womanMap = new HashMap<>();
        womanMap.put("messageId", messageId);
        womanMap.put("messageData", messageData);
        womanMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
        return "ok";
    }

这里有关convertAndSend前文已经简单描述过了,这里不加叙述!

消费者配置

TopicManReceiver.java

package com.chunlei.consumer.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author 朝花不迟暮
 * @version 1.0
 * @date 2020/11/12 8:48
 */
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver
{
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @RabbitHandler
    public void process(Map testMessage)
    {
        log.info("【消息队列监听者】 DirectReceiver消费者收到消息:{}", testMessage.toString());
    }
}

TopicTotalReceiver.java

package com.chunlei.consumer.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author 朝花不迟暮
 * @version 1.0
 * @date 2020/11/12 8:50
 */
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver
{
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    @RabbitHandler
    public void process(Map testMessage) {
        log.info("【消息队列监听者】 DirectReceiver消费者收到消息:{}", testMessage.toString());    }
}

普通监听代码,不解释!

TopicRabbitConfig.java

package com.chunlei.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 朝花不迟暮
 * @version 1.0
 * @date 2020/11/12 8:54
 */
@Configuration
public class TopicRabbitConfig
{
    //绑定键
    public final static String man = "topic.man";
    public final static String woman = "topic.woman";

    @Bean
    public Queue firstQueue() {
        return new Queue(TopicRabbitConfig.man);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(TopicRabbitConfig.woman);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }


    //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
    //这样只要是消息携带的路由键是topic.man,才会分发到该队列
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
    }

    //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
    // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
    @Bean
    Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }
}

这里我们注意一下bindingExchangeMessage2这个bean,他在绑定routingkey的时候,是以topic.#的模式来匹配的,具体可以看注释,前文也讲过topic的匹配规则!

同时启动两个项目,我们开始测试接口:
访问:http://localhost:8021/sendTopicMessage1

2020-11-12 10:17:13.941  INFO 5596 --- [ntContainer#1-1] c.c.consumer.config.TopicManReceiver     : 【消息队列监听者】 DirectReceiver消费者收到消息:{createTime=2020-11-12 10:17:13, messageId=fe10bbf5-b3b2-44e4-a481-15f22889ca44, messageData=message: M A N }
2020-11-12 10:17:13.941  INFO 5596 --- [ntContainer#2-1] c.c.consumer.config.TopicTotalReceiver   : 【消息队列监听者】 DirectReceiver消费者收到消息:{createTime=2020-11-12 10:17:13, messageId=fe10bbf5-b3b2-44e4-a481-15f22889ca44, messageData=message: M A N }

可以看出,他被bindingExchangeMessagebindingExchangeMessage2同时监听到了,因为他符合这个topic.#规则!

访问:http://localhost:8021/sendTopicMessage2

2020-11-12 10:19:05.516  INFO 5596 --- [ntContainer#2-1] c.c.consumer.config.TopicTotalReceiver   : 【消息队列监听者】 DirectReceiver消费者收到消息:{createTime=2020-11-12 10:19:05, messageId=d0f6625d-f67e-4fe8-8f0c-c18c12b3c7e6, messageData=message: woman is all }

这里的话,就只有一条打印,很显然只有bindingExchangeMessage2监听到了!

到此为止

主题交换机模式的简单使用就介绍完了,rabbitmq系列的文章还会继续更新,后期会同步到公众号,欢迎大家订阅我的公众号:
SpringBoot整合RabbitMQ之主题交换机模式

本文地址:https://blog.csdn.net/Curtisjia/article/details/109637665