欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SpringBoot整合RabbitMQ

程序员文章站 2022-04-30 13:58:34
...

SpringBoot整合RabbitMQ

前言

RabbitMQ 官网 文档 提供7种消息队列模式,如下图所示

SpringBoot整合RabbitMQ
SpringBoot整合RabbitMQ

依赖导入

    <dependency>
            <groupId>com.github.luues</groupId>
            <artifactId>spring-boot-starter-rabbitmq</artifactId>
            <version>1.2.9.1.RELEASE</version>
        </dependency>

普通消息队列

也就是一个生产者和一个消费者模式

SpringBoot整合RabbitMQ

生产者

public class Send {

    public static final String QUEUE_NAME = "test_queue";


    public static void main(String[] args) {

        ConnectionFactory factory = new ConnectionFactory();
        Channel channel = null;
        Connection connection = null;
        factory.setHost("localhost");
        //factory.setPort(5671);

        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            //创建生命队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World! 111 ";
            //发送消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            factory.clone();
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }


    }

}

发送消息
SpringBoot整合RabbitMQ

消费者

public class Receiving {

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");



        while (true){
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            //true 监听队列 有消息就获取 没有就阻塞
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

        }

    }



}

接受消息
SpringBoot整合RabbitMQ
可以看到接受完消息后main方法没有停止,而是在监听消息处于阻塞状态。

work模式

一个生产者 多个消费者,只能有一个消费者接受到一个消息。
SpringBoot整合RabbitMQ

生产者


        public static String QUEUE_NAME = "test_queue";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setVirtualHost("/test");
            factory.setUsername("test");
            factory.setPassword("test");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(
                    QUEUE_NAME, false, false,
                    false, null
            );

            //发送消息
            for (int i = 0; i < 50; i++) {
                String message = "message: " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(message);
                try {
                    Thread.sleep(i * 10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }


            channel.close();
            connection.close();
        }

消费者1 消费者2 代码一样 这里只写一个


        public static String QUEUE_NAME = "test_queue";

        public static void main(String[] args) throws IOException, TimeoutException {

            ConnectionFactory factory = new ConnectionFactory();

            factory.setHost("localhost");
            factory.setVirtualHost("/test");
            factory.setUsername("test");
            factory.setPassword("test");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            //同一时刻只能发送一个消息给消费者
            channel.basicQos(1);

            channel.queueDeclare(
                    QUEUE_NAME, false, false,
                    false, null
            );
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                    try {
                        Thread.sleep(2 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }

需要注意的是

//同一时刻只能发送一个消息给消费者  那个消费者早消费完 早可以拿消息进行消费  能者多劳
        channel.basicQos(1);

消息的确认模式

消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

  • 模式1:自动确认

只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。

  channel.basicConsume(QUEUE_NAME, true, consumer);

只需再监听的时候设置为true就可以了,只要消费者拿到消息就会确认。

  • 模式2:手动确认

消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

  channel.basicConsume(QUEUE_NAME, false, consumer);

false 表示不自动确认消息

手动提交确认消息


            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                    try {
                        Thread.sleep(2 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //手动进行确认
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };


			//监听队列 不自动提交确认
            channel.basicConsume(QUEUE_NAME, false, consumer);

订阅模式

一个生产者 多个消费者订阅 然后同时接受到消息
SpringBoot整合RabbitMQ

X 表示交换机

红色两个队列绑定到X交换机

订阅者模式:

  • 1、1个生产者,多个消费者
  • 2、每一个消费者都有自己的一一个队列
  • 3、生产者没有将消息直接发送到队列,而是发送到了交换机
  • 4、每个队列都要绑定到交换机
  • 5、生产者发送的消息,经过交换机,到达队列,实现,-一个消息被多个消费者获取的目的

发布者

//交换机
    private static final String EXCHANGE_NAME = "exchange_logs";


    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");


        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //消息
        String msg = "订单更新..... ";
        channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
        System.out.println("发送的消息 : " + msg);

        channel.close();
        connection.close();
    }

两个订阅者

队列为 test_queue1_name 的订阅者

package com.example.demo.publish;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @description:
 * @author: Administrator
 * @create: 2020-07-12 17:55
 **/
public class Rece1 {

    //交换机
    private static final String EXCHANGE_NAME = "exchange_logs";
    //队列
    public static final String QUEUE_NAME = "test_queue1_name";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received1  '" + message + "'");
        };

        //自动确认消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}

队列为 test_queue2_name 的订阅者

package com.example.demo.publish;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @description:
 * @author: Administrator
 * @create: 2020-07-12 17:55
 **/
public class Rece2 {

    //交换机
    private static final String EXCHANGE_NAME = "exchange_logs";
    //队列
    public static final String QUEUE_NAME = "test_queue2_name";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received2  '" + message + "'");
        };

        //自动确认消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}


控制台中可以看到绑定的两个队列

SpringBoot整合RabbitMQ

发布的消息
SpringBoot整合RabbitMQ
订阅的消息
SpringBoot整合RabbitMQ

SpringBoot整合RabbitMQ

路由模式

SpringBoot整合RabbitMQ

x表示交换器

type表示交换器要发送消息的类型key

不同的key发送到不同的队列中去,可以将数据类型分开进行数据订阅,从而进行不同的数据处理。

路由流程图

根据不同的key进行订阅
SpringBoot整合RabbitMQ

如:前台新增一个商品数据,此时我们不需要写到redis中
前台删除一个商品数据,我们需要从redis中删除这条数据,也需要从数据库删除这条数据。

在发布订阅的基础代码上更改

但是必须设置事件类型为
SpringBoot整合RabbitMQ

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

路由发布者

  //消息
        String msg = "新增商品..... ";
        channel.basicPublish(EXCHANGE_NAME,"add",null,msg.getBytes());
        System.out.println("发送的消息 : " + msg);

删除redis缓存发布者

 
  //消息
        String msg = "删除商品..... ";
        channel.basicPublish(EXCHANGE_NAME,"del",null,msg.getBytes());
        System.out.println("发送的消息 : " + msg);

路由订阅者

增加订阅者

 //绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "add"); 
        //同时订阅多个路由类型可以多写一行
         //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); 

删除订阅者



        //绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "del");

通配符匹配模式(Topic)

通配符匹配模式是对路由模式的

大白话来说:路由模式是全匹配key必须一样,通配符是模糊匹配。

通配符匹配模式图

SpringBoot整合RabbitMQ

匹配符
SpringBoot整合RabbitMQ

* 表示匹配一个
# 表示匹配多个

例如:

设置事件类型

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

接收iteme.add消息

   
    //交换机
    private static final String EXCHANGE_NAME = "exchange_topic";
    //队列
    public static final String QUEUE_NAME = "router_queue_topic_add";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        //绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "iteme.add");

        //同一时刻只接受一条消息
        channel.basicQos(1);



        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 新增商品  '" + message + "'");
        };

        //自动确认消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }

接收iteme.del删除消息


    //交换机
    private static final String EXCHANGE_NAME = "exchange_topic";
    //队列
    public static final String QUEUE_NAME = "router_queue_topic_del";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");


        //绑定队列到交换机  绑定del路由 key
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "iteme.del");


        //同一时刻只接受一条消息
        channel.basicQos(1);


        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 删除redis缓存  '" + message + "'");
        };

        //自动确认消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }

接收所有消息

 
    //交换机
    private static final String EXCHANGE_NAME = "exchange_topic";
    //队列
    public static final String QUEUE_NAME = "router_queue_topic_all";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");


        //绑定队列到交换机  绑定del路由 key
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "iteme.#");


        //同一时刻只接受一条消息
        channel.basicQos(1);


        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 获取所有消息  '" + message + "'");
        };

        //自动确认消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }

发送消息

  //交换机
    private static final String EXCHANGE_NAME = "exchange_topic";


    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");


        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        //消息
        String msg = "新增消息接收..... ";
        channel.basicPublish(EXCHANGE_NAME,"iteme.add",null,msg.getBytes());


        //String msg = "删除消息接收..... ";
        //channel.basicPublish(EXCHANGE_NAME,"iteme.del",null,msg.getBytes());
        System.out.println("发送的消息 : " + msg);

        channel.close();
        connection.close();
    }

topic消息队列功能更强大,可以兼容路由和发布订阅模式的消息功能。

代码地址

官网文档地址 https://www.rabbitmq.com/getstarted.html
环境安装 https://blog.csdn.net/weixin_38361347/article/details/107292227