欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ入门Demo,基于springboot

程序员文章站 2022-07-12 12:28:22
...

缘起

前面几章我们基本了解了RabbitMQ的基本概念,以及RabbitMQ是如何保证消息的可靠性的,那么本章开始,将真正用java代码去连接使用一些RabbitMQ,通过阅读本章内容,你会明白如何在java springboot的项目中使用RabbitMQ。

阅读人群

项目采用springboot搭建,所以你对springboot需要有一个基本的了解,并且我们假设已经在你的服务器或本机安装了RabbitMQ,所以本章不会涉及关于如何安装RabbitMQ知识。

前置条件

在前面我们说到,对于Exchanges来说,他的路由规则有以下三种

  • Direct exchange:完全根据key进行投递的叫做Direct交换机。如果Routing key匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。例如,绑定时设置了Routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
  • Fanout exchange:不需要key的叫做Fanout交换机。它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
  • Topic exchange:对key进行模式匹配后进行投递的叫做Topic交换机。比如符号”#”匹配一个或多个词,符号””匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。

项目采用springboot搭建,基础配置文件信息为

spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=192.168.23.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

对于这三种方式,我们的代码实现也会有所不同,我们一一讲解。

Direct exchange

Config

@Configuration
public class RabbitConfig {
    @Bean
    public Queue helloQueue(){
        return new Queue("hello");
    }
}

producer

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(){
        String context = "hello " + new Date();
        System.out.println("Sender : "+ context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }
}

consumer

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello){
        System.out.println("Receiver : " + hello);
    }
}

测试类

@SpringBootTest
public class Test {
    @Autowired
    private HelloSender helloSender;

    @org.junit.jupiter.api.Test
    void contextLoads(){
        helloSender.send();
    }
}

运行结果

RabbitMQ入门Demo,基于springboot

Topic exchange

Config

@Configuration
public class TopicRabbitConfig {
    final static String message = "topic.message";
    final static String messages = "topic.messages";

    /**
     * 创建队列
     * @return
     */
    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    /**
     * 创建队列
     * @return
     */
    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

    /**
     * 将对列绑定到Topic交换器
     * @return
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    /**
     * 将queueMessage队列绑定到Topic交换器并监听为topic.message的routingKey
     * 也就是说,当你发送消息时的key为topic.message的话,他就会被投递到queueMessage队列中,这个队列名字需要与上面声明的方法名相同
     *
     *     public Queue queueMessage() {
     *         return new Queue(TopicRabbitConfig.message);
     *     }
     *
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    /**
     * 将队列绑定到Topic交换器 采用#的方式,与上述方法类似,只是本处采用通配符。也就是说,所有以topic.开头的消息都会被投递到queueMessages队列中
     * @param queueMessages
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

producer

@Component
public class TopicSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.ffsda", context);
    }
}

Consumer

@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver {

    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver1  : " + message);
    }

}

Consumer2

@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver2  : " + message);
    }

}

测试类

@SpringBootTest
public class Test {

    @Autowired
    private TopicSender topicSender;

    @org.junit.jupiter.api.Test
    public void send1() {
        topicSender.send1();
    }
    @org.junit.jupiter.api.Test
    public void send2() {
        topicSender.send2();
    }
}

运行结果

send1()
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EvuqRoyK-1587697889265)(http://www.bxoon.com/upload/2020/3/image-2ba6a6604c534527be8b117f477aa41a.png)]
send1()的key为topic.message,所以queueMessage队列会被放入,queueMessages队列监听的key为通配符topic.#,所以也会被放入,所以消费者两个都会消费这条消息。

send2()
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-L89jrplH-1587697889266)(http://www.bxoon.com/upload/2020/3/image-eb0bd4aa34534539a6e25b032662fb71.png)]
send2()的key为topic.ffsda,由于queueMessage队列监听的key为topic.message,所以不会被放入。queueMessages队列监听的key为通配符topic.#,所以会被放入,所以消费者两只有一个会消费这条消息。

Fanout exchange

相关标签: RabbitMQ