欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

消息中间件-RabbitMQ的使用

程序员文章站 2022-06-21 14:54:48
初学消息中间件RabbitMQ一 . 环境安装1. 安装Erlang2. .安装RabbitMQ3. 配置RabbitMQ二 . java连接rabbieMQ一.简单实现消息队列1.创建项目,并添加rabbieMQ依赖2. 创建生产者类3. 创建消费者类4. 运行程序二 . 功能升级1. 一个生产者,两个消费者一 . 环境安装1. 安装Erlanghttp://www.erlang.org/downloads2. .安装RabbitMQhttp://www.rabbitmq.com/downloa...

一 . 环境安装

1. 安装Erlang

http://www.erlang.org/downloads

2. .安装RabbitMQ

http://www.rabbitmq.com/download.html
默认安装的RabbitMQ 监听端口是5672

RabbitMQ版本 Erlang最低要求 Erlang最高要求
3.7.7 - 3.7.12 20.3.x 21.x
3.7.0 - 3.7.6 19.3 20.3.x

3. 配置RabbitMQ

  1. 激活 RabbitMQ
    在RabbitMQ的安装目录的sbin下.打开cmd,执行rabbitmq-plugins.bat" enable rabbitmq_management
D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
  1. 启动RabbitMQ服务
net start RabbitMQ
  1. 查看用户列表
rabbitmqctl.bat list_user
  1. 打开网址 http://localhost:15672
    使用默认的用户名和密码登录网址

  2. 创建新用户
    rabbitmqctl.bat add_user 用户名 密码

  3. 为用户分配权限

rabbitmqctl.bat set_user_tags 用户名 权限
  • rabbitmq用户角色可分为五类:超级管理员, 监控者, 策略制定者, 普通管理者以及其他。

    (1) 超级管理员(administrator)

    可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。

    (2) 监控者(monitoring)

    可登陆管理控制台(启用management
    plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

    (3) 策略制定者(policymaker)

    可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。

    (4) 普通管理者(management)

    仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。

    (5) 其他的

    无法登陆管理控制台,通常就是普通的生产者和消费者。

二 . java连接rabbieMQ

一 . 简单实现消息队列

写一个简单的程序来使用rabbitMQ,创建一个生产者类,一个消费者类.生产者写入消息到队列,消费者从队列消费消息.

消息中间件-RabbitMQ的使用
消息中间件-RabbitMQ的使用

1.创建项目,并添加rabbieMQ依赖


<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.8.0</version>
</dependency>

2. 创建生产者类

package com.gw.test1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 *   生产者
 * */
public class Producer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.  创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 2. 创建连接
        Connection connection = factory.newConnection();
       // 3. 创建信道
        Channel channel = connection.createChannel();
 
		 /*
         * 4. 声明一个消息队列:
         * 参数一: 指定队列的名称,必须是已经在rabbitMq注册上的
         * 参数二: 是否需要持久化
         * 参数三: 设置当前队列是否只能被一个消费者消费
         * 参数四: 在没有消费者时是否自动删除这个队列
         * 参数五: 指定这个队列的其他消息
         * */
        channel.queueDeclare("hello", false, false, false, null);

        String msg = "my first rabbitMQ msg!!!";
        // 5. 发送消息到指定的队列
        /*
         *   参数1:交换机
         *   参数2:路由规则routingKey
         *   参数3:指定传递的消息锁携带的propertites
         *   参数4:要发布的消息,类型必须为bytes[]
         * */
        channel.basicPublish("", "hello", null, msg.getBytes());
        System.out.println("生产者发送消息成功");
        //6. 关闭资源
        channel.close();
        connection.close();
    }
}

3. 创建消费者类

package com.gw.test1;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * 消费者
 * */
public class Consumer1 {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 获取连接对象
        Connection connection = factory.newConnection();
        // 3. 根据连接获取信道
        Channel channel = connection.createChannel();
 
		 /*
         * 4. 声明一个消息队列:
         * 参数一: 指定队列的名称,必须是已经在rabbitMq注册上的
         * 参数二: 是否需要持久化
         * 参数三: 设置当前队列是否只能被一个消费者消费
         * 参数四: 在没有消费者时是否自动删除这个队列
         * 参数五: 指定这个队列的其他消息
         * */
        channel.queueDeclare("hello", false, false, false, null);
        // 6. 开启监听队列
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者收到消息: " + new String(body, "UTF-8"));
            }
        };
        System.out.println("消费者正在等待消息...");
        // 5. 消费者消费队列的设置
        /*
         *   参数1:指定消费队列的名字
         *   参数2:是否自动ACK.--收到消息后是否马上回馈RabbitMQ
         *   参数3: 指定消费回馈,--调用哪个方法来监听队列
         * */
        channel.basicConsume("hello", true, consumer);
        //7. 阻塞程序,防止程序结束
        System.in.read();
        //8. 环比资源
        channel.close();
        connection.close();
    }
}

4. 运行程序

消息中间件-RabbitMQ的使用
消息中间件-RabbitMQ的使用

二 . 功能升级

1. 公平分发

消息中间件-RabbitMQ的使用
使用工作队列将工作分配给不同的消费者来提高效率是很有效的,rabbitMq在分配工作时是使用的轮询机制,就是平均分配给每一个消费者.
但是实际情况中,很多时候不同工作所用的时间不等,不同消费者的效率也不相同,轮询可能会造成部分消费者繁忙,剩余消费在空闲的情况.
为了使得每个消费者都按照自己的能力来消费,我们采用能者多劳的策略来分配任务.
实现这个功能只需要在消费者端为其增加Qos能力,并更改为手动ack,就可以让消费者根据自己的能力去消费,达到物尽其用.

  1. 生产者使用for循环发送20次消息到队列.其他不变
  2. 将其中一个消费者进行如下配置
 001 配置消费者每次消费1条消息
 002 手动回馈,表示已经完成当前工作,delivery.getEnvelope().getDeliveryTag()收到消息
 003 设置ack为手动
package com.gw.demo2;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer21 {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 获取连接对象
        Connection connection = factory.newConnection();
        // 3. 根据连接获取信道
        Channel channel = connection.createChannel();
        // 001 配置消费者每次消费1条消息
        channel.basicQos(1);
		 /*
         * 4. 声明一个消息队列:
         * 参数一: 指定队列的名称,必须是已经在rabbitMq注册上的
         * 参数二: 是否需要持久化
         * 参数三: 设置当前队列是否只能被一个消费者消费
         * 参数四: 在没有消费者时是否自动删除这个队列
         * 参数五: 指定这个队列的其他消息
         * */
        channel.queueDeclare("hello", false, false, false, null);
        // 5. 开启监听队列
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1接受msg:" + new String(body));
                // 002 手动回馈,表示已经完成当前工作delivery.getEnvelope().getDeliveryTag()收到消息的唯一标识
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        System.out.println("消费者1正在等待消息...");
        // 6. 消费者消费队列的设置
        /*
         *   参数1:指定消费队列的名字
         *   参数2:是否自动ACK.--收到消息后是否马上回馈RabbitMQ
         *   参数3: 指定消费回馈,--调用哪个方法来监听队列
         * */
        //003 设置ack为手动
        channel.basicConsume("hello", false, consumer);
        //7 . 阻塞程序,防止程序结束
        System.in.read();
        //8 . 释放资源
        channel.close();
        connection.close();
    }
}

两个消费者,其中一个进行了公平分发的配置,并且每次接受任务都会进行1毫秒的休眠,而另一个消费者采用默认的分发方式分配任务.由于只是发送打印消息所用时间很少,所以在消费者1休眠的1毫秒内,消费者2会消费大多数任务.
消息中间件-RabbitMQ的使用
消息中间件-RabbitMQ的使用

2. 队列的持久化

当RabbitMQ奔溃时,或者当服务重启时,前面所声明创建的队列都会消失,队列中的消息也会消失.所以需要对队列进行持久化设置.
在生产者和消费者声明队列的时候,将其第二个参数设置为true.

		 /*
         * 4. 声明一个消息队列:
         * 参数一: 指定队列的名称,必须是已经在rabbitMq注册上的
         * 参数二: 是否需要持久化
         * 参数三: 设置当前队列是否只能被一个消费者消费
         * 参数四: 在没有消费者时是否自动删除这个队列
         * 参数五: 指定这个队列的其他消息
         * */
        channel.queueDeclare("hello", false, false, false, null);

因为消息队列的声明创建是在按照运行顺序来进行的,而我们不确定每次运行是生产者在前还是消费者在前,所以两者都需要这只持久化.

三 . spring-boot整合RabbitMQ

1. 环境搭建

  1. 创建一个maven项目,删除其src文件夹,
  2. 在maven中创建两个Spring-boot的模型,生产者和消费者
  3. 在两个项目中的pop文件中导入RabbitMQ的包
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 参数配置
    生产者参数配置
# 应用名称
spring.application.name=j2005_producer
# 应用服务 WEB 访问端口
server.port=8080
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

消费者参数配置

# 应用名称
spring.application.name=j2005_consumer
# 应用服务 WEB 访问端口
server.port=8081

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

2. 简单实现消息队列

简单实现一个消息队列,生产者发送消息,消费者接受消息:

  1. 消费者代码

创建一个service包,创建一个MQacceptImpl1类用来接听对应的消息队列

package com.gw.j2005_consumer.service.impl;

import com.gw.j2005_consumer.service.MQacceptService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component//将其对象的创建交给Spring管理
@RabbitListener(queues = "hello")//指定的消息队列名
@Slf4j
public class MQacceptImpl1{

    @RabbitHandler
    public void acceptMsg(Object object) {
    //将接收到的消息打印
        log.info("hello:acceptMsg:" + object);
    }
}
  1. 生产者代码
    生产者在向消息队列发送消息之前需要先创建一个消息队列,因此使用配置类MQConfig.class来对其配置:
@Configuration
public class MqConfig {
    //  创建hello消息队列
    @Bean
    public Queue topicQueue() {
        return new Queue("hello");
    }
}

创建一个controller来发送消息:

package com.gw.j2005_producer.controller;

import com.gw.j2005_producer.service.MQsendService;
import com.gw.po.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class MsgController {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @RequestMapping("/sendMsg")
    public String sendMsg(String msg) {
        try {
            log.info("发送消息: " + msg);
            amqpTemplate.convertAndSend("hello", msg);
            return "发送成功";
        } catch (Exception e) {
        }
        return "发送失败";
    }
}

在浏览器访问此接口
消息中间件-RabbitMQ的使用
然后去消费者的控制台查看是否收到消息.
消息中间件-RabbitMQ的使用

由于队列实在生产者中配置的,所以需要先启动生产者的项目,创建队列,否则消费者监听不存在的队列会报错.

3 . 交换机

生产者发送消息到队列,实际上并不是直接发送到队列的.而是先发送到交换机,然后再由路由规则将消息转发到不同的消息队列.

RabbitMQ交换机有很多种,topic 是RabbitMQ中最灵活的一种方式,可以根据binding_key*的绑定不同的队列:

消息中间件-RabbitMQ的使用
如果我们不对交换机进行配置,RabbitMq会有一个默认的交换机来转发消息;

在spring-boot中配置交换机很简单:

  • 生产者:
    在MQConfig.class配置文件中配置交换机,并将交换机和消息队列绑定
package com.gw.j2005_producer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
    //  创建消息队列
    @Bean
    public Queue topicQueue() {
        return new Queue("topic.one");
    }


    // 创建一个topic类型的交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    // 设置交换机和队列的绑定
    @Bean
    public Binding bindingBuilder(Queue topicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.one");
    }

}

  • 创建一个接口通过交换机发送消息:
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @RequestMapping("/sendMsg2")
    public String sendMsg2(String msg) {
        try {
            log.info("发送消息: " + msg);
            //参数分别对应:交换机名,路由规则,消息内容
            rabbitTemplate.convertAndSend("topicExchange", "topic.one", msg);
            log.info("发送消息:" + msg + "到交换机:topic.one ");
            return "发送成功";
        } catch (Exception e) {
        }
        return "发送失败";
    }
  • 消费者

创建一个方法用来监听topic.one消息队列

package com.gw.j2005_consumer.service.impl;

import com.gw.j2005_consumer.service.MQacceptService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.one")
@Slf4j
public class MQacceptImpl2 implements MQacceptService {

    @Override
    @RabbitHandler
    public void acceptMsg(Object object) {
        log.info("one  : acceptMsg : " + object);
    }
}

启动项目后,去浏览器发送请求:

消息中间件-RabbitMQ的使用
消息中间件-RabbitMQ的使用

消息中间件-RabbitMQ的使用

4. topic交换机路由规则

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上

1、这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。

2、这种模式需要RouteKey,也许要提前绑定Exchange与Queue。

3、在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。

4、“#”表示0个或若干个关键字,表示一个关键字。如“log.”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

5、同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。

首先对topic规则配置,这里使用两个队列来测试(也就是在MqConfig类中创建和绑定的topic.one和topic.two.queue两个队列),其中

  • 生产者配置类MQConfig.class
package com.gw.j2005_producer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class MqConfig {
    //  创建topic消息队列
    @Bean
    public Queue topicQueue() {
        return new Queue("topic.one");
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue("topic.two.queue");
    }

    //    创建一个topic类型的交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    //    设置交换机和队列1的绑定
    @Bean
    public Binding bindingBuilder(Queue topicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.one");
    }

    //    设置交换机和队列2的绑定
    @Bean
    public Binding bindingBuilder2(Queue topicQueue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("topic.#");
    }
}
  • 生产者写一个发送消息的接口:
package com.gw.j2005_producer.controller;

import com.gw.j2005_producer.service.MQsendService;
import com.gw.po.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class MsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMsg3")
    public String sendMsg3(String msg) {
        log.info("发送消息: " + msg);
        try {
            for (int i = 0; i < 10; i++) {
                if (i % 2 == 0) {
                    rabbitTemplate.convertAndSend("topicExchange", "topic.one", msg + i);
                    log.info("发送消息:" + msg + i + " --交换机topicExchange:topic.one ");
                } else {
                    rabbitTemplate.convertAndSend("topicExchange", "topic.abcde", msg + i);
                    log.info("发送消息:" + msg + i + "--交换机topicExchange:topic.abcde");
                }
            }
            return "发送成功";
        } catch (Exception e) {
        }
        return "发送失败";
    }
}

消费者增加一个方法监听topic.two.queue

package com.gw.j2005_consumer.service.impl;

import com.gw.j2005_consumer.service.MQacceptService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.two.queue")
@Slf4j
public class MQacceptImpl3 implements MQacceptService {

    @Override
    @RabbitHandler
    public void acceptMsg(Object object) {
        log.info("two.    : acceptMsg : " + object);
    }
}

消息中间件-RabbitMQ的使用
消息中间件-RabbitMQ的使用
消息中间件-RabbitMQ的使用
消息中间件-RabbitMQ的使用
可以见得,在按照topic.one的routingKey来想交换机发送消息时,交换机的两个路由规则都可以匹配到topic.one,所以02468的消息被发送到两个消息队列被消费打印.而tipic.abcde,只可以匹配到topic.#,所以13579只能转发到topic.two.queue的队列.

topic交换机的消息发送流程图如下:

消息中间件-RabbitMQ的使用

在发送消息到交换机的时候,交换机会根据传递过来的routingKey1来模糊匹配自己所有绑定的队列中定义的routingKey2,将能与routingKey1匹配的routingKey2都发送一遍消息.如果没有与之匹配的队列,则丢弃消息.

本文地址:https://blog.csdn.net/gwgw0621/article/details/109751553