消息中间件-RabbitMQ的使用
初学消息中间件RabbitMQ
一 . 环境安装
1. 安装Erlang
http://www.erlang.org/downloads
2. .安装RabbitMQ
http://www.rabbitmq.com/download.html
默认安装的RabbitMQ 监听端口是5672
RabbitMQ版本 | Erlang最低要求 | Erlang最高要求 |
---|---|---|
3.7.7 - 3.7.12 | 20.3.x | 21.x |
3.7.0 - 3.7.6 | 19.3 | 20.3.x |
3. 配置RabbitMQ
- 激活 RabbitMQ
在RabbitMQ的安装目录的sbin下.打开cmd,执行rabbitmq-plugins.bat" enable rabbitmq_management
D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
- 启动RabbitMQ服务
net start RabbitMQ
- 查看用户列表
rabbitmqctl.bat list_user
-
打开网址 http://localhost:15672
使用默认的用户名和密码登录网址 -
创建新用户
rabbitmqctl.bat add_user 用户名 密码 -
为用户分配权限
rabbitmqctl.bat set_user_tags 用户名 权限
-
rabbitmq用户角色可分为五类:超级管理员, 监控者, 策略制定者, 普通管理者以及其他。
(1) 超级管理员(administrator)
可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。
(2) 监控者(monitoring)
可登陆管理控制台(启用management
plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)(3) 策略制定者(policymaker)
可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。
(4) 普通管理者(management)
仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。
(5) 其他的
无法登陆管理控制台,通常就是普通的生产者和消费者。
二 . java连接rabbieMQ
一 . 简单实现消息队列
写一个简单的程序来使用rabbitMQ,创建一个生产者类,一个消费者类.生产者写入消息到队列,消费者从队列消费消息.
1.创建项目,并添加rabbieMQ依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
2. 创建生产者类
package com.gw.test1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
* 生产者
* */
public class Producer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 2. 创建连接
Connection connection = factory.newConnection();
// 3. 创建信道
Channel channel = connection.createChannel();
/*
* 4. 声明一个消息队列:
* 参数一: 指定队列的名称,必须是已经在rabbitMq注册上的
* 参数二: 是否需要持久化
* 参数三: 设置当前队列是否只能被一个消费者消费
* 参数四: 在没有消费者时是否自动删除这个队列
* 参数五: 指定这个队列的其他消息
* */
channel.queueDeclare("hello", false, false, false, null);
String msg = "my first rabbitMQ msg!!!";
// 5. 发送消息到指定的队列
/*
* 参数1:交换机
* 参数2:路由规则routingKey
* 参数3:指定传递的消息锁携带的propertites
* 参数4:要发布的消息,类型必须为bytes[]
* */
channel.basicPublish("", "hello", null, msg.getBytes());
System.out.println("生产者发送消息成功");
//6. 关闭资源
channel.close();
connection.close();
}
}
3. 创建消费者类
package com.gw.test1;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
* 消费者
* */
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 获取连接对象
Connection connection = factory.newConnection();
// 3. 根据连接获取信道
Channel channel = connection.createChannel();
/*
* 4. 声明一个消息队列:
* 参数一: 指定队列的名称,必须是已经在rabbitMq注册上的
* 参数二: 是否需要持久化
* 参数三: 设置当前队列是否只能被一个消费者消费
* 参数四: 在没有消费者时是否自动删除这个队列
* 参数五: 指定这个队列的其他消息
* */
channel.queueDeclare("hello", false, false, false, null);
// 6. 开启监听队列
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者收到消息: " + new String(body, "UTF-8"));
}
};
System.out.println("消费者正在等待消息...");
// 5. 消费者消费队列的设置
/*
* 参数1:指定消费队列的名字
* 参数2:是否自动ACK.--收到消息后是否马上回馈RabbitMQ
* 参数3: 指定消费回馈,--调用哪个方法来监听队列
* */
channel.basicConsume("hello", true, consumer);
//7. 阻塞程序,防止程序结束
System.in.read();
//8. 环比资源
channel.close();
connection.close();
}
}
4. 运行程序
二 . 功能升级
1. 公平分发
使用工作队列将工作分配给不同的消费者来提高效率是很有效的,rabbitMq在分配工作时是使用的轮询机制,就是平均分配给每一个消费者.
但是实际情况中,很多时候不同工作所用的时间不等,不同消费者的效率也不相同,轮询可能会造成部分消费者繁忙,剩余消费在空闲的情况.
为了使得每个消费者都按照自己的能力来消费,我们采用能者多劳的策略来分配任务.
实现这个功能只需要在消费者端为其增加Qos能力,并更改为手动ack,就可以让消费者根据自己的能力去消费,达到物尽其用.
- 生产者使用for循环发送20次消息到队列.其他不变
- 将其中一个消费者进行如下配置
001 配置消费者每次消费1条消息
002 手动回馈,表示已经完成当前工作,delivery.getEnvelope().getDeliveryTag()收到消息
003 设置ack为手动
package com.gw.demo2;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer21 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 获取连接对象
Connection connection = factory.newConnection();
// 3. 根据连接获取信道
Channel channel = connection.createChannel();
// 001 配置消费者每次消费1条消息
channel.basicQos(1);
/*
* 4. 声明一个消息队列:
* 参数一: 指定队列的名称,必须是已经在rabbitMq注册上的
* 参数二: 是否需要持久化
* 参数三: 设置当前队列是否只能被一个消费者消费
* 参数四: 在没有消费者时是否自动删除这个队列
* 参数五: 指定这个队列的其他消息
* */
channel.queueDeclare("hello", false, false, false, null);
// 5. 开启监听队列
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1接受msg:" + new String(body));
// 002 手动回馈,表示已经完成当前工作delivery.getEnvelope().getDeliveryTag()收到消息的唯一标识
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
System.out.println("消费者1正在等待消息...");
// 6. 消费者消费队列的设置
/*
* 参数1:指定消费队列的名字
* 参数2:是否自动ACK.--收到消息后是否马上回馈RabbitMQ
* 参数3: 指定消费回馈,--调用哪个方法来监听队列
* */
//003 设置ack为手动
channel.basicConsume("hello", false, consumer);
//7 . 阻塞程序,防止程序结束
System.in.read();
//8 . 释放资源
channel.close();
connection.close();
}
}
两个消费者,其中一个进行了公平分发的配置,并且每次接受任务都会进行1毫秒的休眠,而另一个消费者采用默认的分发方式分配任务.由于只是发送打印消息所用时间很少,所以在消费者1休眠的1毫秒内,消费者2会消费大多数任务.
2. 队列的持久化
当RabbitMQ奔溃时,或者当服务重启时,前面所声明创建的队列都会消失,队列中的消息也会消失.所以需要对队列进行持久化设置.
在生产者和消费者声明队列的时候,将其第二个参数设置为true.
/*
* 4. 声明一个消息队列:
* 参数一: 指定队列的名称,必须是已经在rabbitMq注册上的
* 参数二: 是否需要持久化
* 参数三: 设置当前队列是否只能被一个消费者消费
* 参数四: 在没有消费者时是否自动删除这个队列
* 参数五: 指定这个队列的其他消息
* */
channel.queueDeclare("hello", false, false, false, null);
因为消息队列的声明创建是在按照运行顺序来进行的,而我们不确定每次运行是生产者在前还是消费者在前,所以两者都需要这只持久化.
三 . spring-boot整合RabbitMQ
1. 环境搭建
- 创建一个maven项目,删除其src文件夹,
- 在maven中创建两个Spring-boot的模型,生产者和消费者
- 在两个项目中的pop文件中导入RabbitMQ的包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 参数配置
生产者参数配置
# 应用名称
spring.application.name=j2005_producer
# 应用服务 WEB 访问端口
server.port=8080
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
消费者参数配置
# 应用名称
spring.application.name=j2005_consumer
# 应用服务 WEB 访问端口
server.port=8081
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
2. 简单实现消息队列
简单实现一个消息队列,生产者发送消息,消费者接受消息:
- 消费者代码
创建一个service包,创建一个MQacceptImpl1类用来接听对应的消息队列
package com.gw.j2005_consumer.service.impl;
import com.gw.j2005_consumer.service.MQacceptService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component//将其对象的创建交给Spring管理
@RabbitListener(queues = "hello")//指定的消息队列名
@Slf4j
public class MQacceptImpl1{
@RabbitHandler
public void acceptMsg(Object object) {
//将接收到的消息打印
log.info("hello:acceptMsg:" + object);
}
}
- 生产者代码
生产者在向消息队列发送消息之前需要先创建一个消息队列,因此使用配置类MQConfig.class来对其配置:
@Configuration
public class MqConfig {
// 创建hello消息队列
@Bean
public Queue topicQueue() {
return new Queue("hello");
}
}
创建一个controller来发送消息:
package com.gw.j2005_producer.controller;
import com.gw.j2005_producer.service.MQsendService;
import com.gw.po.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class MsgController {
@Autowired
private AmqpTemplate amqpTemplate;
@RequestMapping("/sendMsg")
public String sendMsg(String msg) {
try {
log.info("发送消息: " + msg);
amqpTemplate.convertAndSend("hello", msg);
return "发送成功";
} catch (Exception e) {
}
return "发送失败";
}
}
在浏览器访问此接口
然后去消费者的控制台查看是否收到消息.
由于队列实在生产者中配置的,所以需要先启动生产者的项目,创建队列,否则消费者监听不存在的队列会报错.
3 . 交换机
生产者发送消息到队列,实际上并不是直接发送到队列的.而是先发送到交换机,然后再由路由规则将消息转发到不同的消息队列.
RabbitMQ交换机有很多种,topic 是RabbitMQ中最灵活的一种方式,可以根据binding_key*的绑定不同的队列:
如果我们不对交换机进行配置,RabbitMq会有一个默认的交换机来转发消息;
在spring-boot中配置交换机很简单:
- 生产者:
在MQConfig.class配置文件中配置交换机,并将交换机和消息队列绑定
package com.gw.j2005_producer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
// 创建消息队列
@Bean
public Queue topicQueue() {
return new Queue("topic.one");
}
// 创建一个topic类型的交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
// 设置交换机和队列的绑定
@Bean
public Binding bindingBuilder(Queue topicQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.one");
}
}
- 创建一个接口通过交换机发送消息:
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMsg2")
public String sendMsg2(String msg) {
try {
log.info("发送消息: " + msg);
//参数分别对应:交换机名,路由规则,消息内容
rabbitTemplate.convertAndSend("topicExchange", "topic.one", msg);
log.info("发送消息:" + msg + "到交换机:topic.one ");
return "发送成功";
} catch (Exception e) {
}
return "发送失败";
}
- 消费者
创建一个方法用来监听topic.one消息队列
package com.gw.j2005_consumer.service.impl;
import com.gw.j2005_consumer.service.MQacceptService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "topic.one")
@Slf4j
public class MQacceptImpl2 implements MQacceptService {
@Override
@RabbitHandler
public void acceptMsg(Object object) {
log.info("one : acceptMsg : " + object);
}
}
启动项目后,去浏览器发送请求:
4. topic交换机路由规则
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
1、这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
2、这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
3、在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
4、“#”表示0个或若干个关键字,表示一个关键字。如“log.”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
5、同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
首先对topic规则配置,这里使用两个队列来测试(也就是在MqConfig类中创建和绑定的topic.one和topic.two.queue两个队列),其中
- 生产者配置类MQConfig.class
package com.gw.j2005_producer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
// 创建topic消息队列
@Bean
public Queue topicQueue() {
return new Queue("topic.one");
}
@Bean
public Queue topicQueue2() {
return new Queue("topic.two.queue");
}
// 创建一个topic类型的交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
// 设置交换机和队列1的绑定
@Bean
public Binding bindingBuilder(Queue topicQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.one");
}
// 设置交换机和队列2的绑定
@Bean
public Binding bindingBuilder2(Queue topicQueue2, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("topic.#");
}
}
- 生产者写一个发送消息的接口:
package com.gw.j2005_producer.controller;
import com.gw.j2005_producer.service.MQsendService;
import com.gw.po.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class MsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMsg3")
public String sendMsg3(String msg) {
log.info("发送消息: " + msg);
try {
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
rabbitTemplate.convertAndSend("topicExchange", "topic.one", msg + i);
log.info("发送消息:" + msg + i + " --交换机topicExchange:topic.one ");
} else {
rabbitTemplate.convertAndSend("topicExchange", "topic.abcde", msg + i);
log.info("发送消息:" + msg + i + "--交换机topicExchange:topic.abcde");
}
}
return "发送成功";
} catch (Exception e) {
}
return "发送失败";
}
}
消费者增加一个方法监听topic.two.queue
package com.gw.j2005_consumer.service.impl;
import com.gw.j2005_consumer.service.MQacceptService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "topic.two.queue")
@Slf4j
public class MQacceptImpl3 implements MQacceptService {
@Override
@RabbitHandler
public void acceptMsg(Object object) {
log.info("two. : acceptMsg : " + object);
}
}
可以见得,在按照topic.one的routingKey来想交换机发送消息时,交换机的两个路由规则都可以匹配到topic.one,所以02468的消息被发送到两个消息队列被消费打印.而tipic.abcde,只可以匹配到topic.#,所以13579只能转发到topic.two.queue的队列.
topic交换机的消息发送流程图如下:
在发送消息到交换机的时候,交换机会根据传递过来的routingKey1来模糊匹配自己所有绑定的队列中定义的routingKey2,将能与routingKey1匹配的routingKey2都发送一遍消息.如果没有与之匹配的队列,则丢弃消息.
本文地址:https://blog.csdn.net/gwgw0621/article/details/109751553
上一篇: AMD RX 6500显卡曝光:与RX 6400区别不大
下一篇: MyBatis-plus自动填充