消息中间件之RabbitMQ基础
消息中间件之RabbitMQ
基础
一、RabbirMQ
介绍
RabbitMQ
使用Erlang
语言开发,支持的并发量不大,适用于中小企业使用,并发量不是很大。
RabbitMQ
是一个在AMQP
基础上实现的,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。
支持多种开发语言支持,Java
、Python
、Ruby
、PHP
、C/C++
等。
RabbitMQ
支持的工作队列
二、RabbitMQ
安装
想吐槽,这个RabbitMQ
和Erlang
安装实在是不好用,可能还是自己太菜,反正挺麻烦的。但是我们还是用Docker
这个好用的,哈哈!
官方也是用的:https://www.rabbitmq.com/download.html
docker pull rabbitmq:3-management # 拉取rabbitmq
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3-management # 启动容器
如果开启的防火墙,需要先开放端口5672
和15672
,然后在更新规则。
[aaa@qq.com ~]# firewall-cmd --zone=public --add-port=15672/tcp --permanent
[aaa@qq.com ~]# firewall-cmd --zone=public --add-port=5672/tcp --permanent
[aaa@qq.com ~]# firewall-cmd --reload
访问:http://IP:15672/#/
用户名和密码:guest
和guest
,输入登录之后就可以进入RabbitMQ
管理页面。
RabbitMQ
支持这几种端口号:
- 5672:消息中间内部通讯的端口
- 15672:管理平台端口号
- 25672:集群的端口号
三、管理界面的使用
3.1. 管理界面介绍
3.2. 创建用户
首先看一下用户角色
角色 | 代码 | 描述 |
---|---|---|
超级管理员 | administrator | 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy )进行操作 |
监控者 | monitoring | 可登陆管理控制台,同时可以查看rabbitmq 节点的相关信息(进程数,内存使用情况,磁盘使用情况等) |
策略制定者 | policymaker | 可登陆管理控制台, 同时可以对policy 进行管理。但无法查看节点的相关信息(上图红框标识的部分) |
普通管理者 | management | 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理 |
其他 | 无法登陆管理控制台,通常就是普通的生产者和消费者 |
创建用户
3.3. 添加Virtual Hosts
3.4. 为用户添加Virtual Hosts
四、五种队列
常用的只有:点对点的简单队列、工作队列、发布订阅、路由、通配符这五种,下面我们详细介绍下
4.1. 点对点模式(简单队列)
4.1.1. 介绍
一个生产者P发送消息到队列Q,一个消费者C接收。有多个消费者会使用轮询方法进行消费队列中信息。
4.1.2. 代码演示
引入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
工具类:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 墨龙吟
* @version 1.0.0
* @ClassName RabitConnection.java
* @Description tabbitmq 连接工具
* @createTime 2020年01月29日 - 18:53
*/
public class RabbitConnection {
public static Connection connection(){
Connection connection = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
// step 设置IP
connectionFactory.setHost("192.168.252.132");
// step 设置端口
connectionFactory.setPort(5672);
// step 设置用户名和密码
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
// step 设置 Virtual Host
connectionFactory.setVirtualHost("/long");
connection = connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
}
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 墨龙吟
* @version 1.0.0
* @ClassName MQProducer.java
* @Description 生产者
* @createTime 2020年01月29日 - 18:58
*/
public class MQProducer {
public static void main(String[] args) {
try {
Connection connection = RabbitConnection.connection();
// step 创建通道
Channel channel = connection.createChannel();
String msg = "中国加油!";
channel.basicPublish("", "hello", null, msg.getBytes());
System.out.println("生产者发送消息:" + msg);
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消费者
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author 墨龙吟
* @version 1.0.0
* @ClassName MQConsumer.java
* @Email aaa@qq.com
* @Description 消费者
* @createTime 2020年01月29日 - 19:02
*/
public class MQConsumer {
public static void main(String[] args) {
try {
Connection connection = RabbitConnection.connection();
Channel channel = connection.createChannel();
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者接受的消息:" + msg);
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
4.2. 工作队列
4.2.1. 介绍
默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。
4.2.2. 图例
采用工作队列,在通道中只需要设置basicQos
为1即可,表示MQ
服务器每次只会给消费者推送1条消息必须手动ack
确认之后才会继续发送。channel.basicQos(1)
。
4.2.3. 实例
生产者:
public class MQProducer {
public static void main(String[] args) {
try {
Connection connection = RabbitConnection.connection();
// step 创建通道
Channel channel = connection.createChannel();
for (int i = 0; i < 10; i++) {
String msg = "第" + i + "条,中国加油!";
channel.basicPublish("", "hello", null, msg.getBytes());
System.out.println("生产者发送消息:" + msg);
}
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消费者:
public class MQConsumer {
public static void main(String[] args) {
final int time = 2000;
System.out.println("消费者:" + time);
try {
Connection connection = RabbitConnection.connection();
final Channel channel = connection.createChannel();
// MQ每次只能给消费者发送一条消息,必须返回ack之后才可以继续发送消息给消费者
channel.basicQos(1);
// auto Ack 默认自动签收(true), false(必须手动Ack)
channel.basicConsume("hello", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者接受的消息:" + msg);
// 手动告诉MQ从队列中删除这条消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
结果:
4.2.4. 队列存在的问题:消失丢失
消息丢失可能存在三方面:
- 生产者丢失数据:生产者将数据发送到
RabbitMQ
的时候,可能数据就在半路丢失; -
RabbitMQ
丢失数据:MQ
还没有持久化就自己丢失了 (MQ
挂了、MQ
拒绝接受消息 (队列满了)); - 消费端丢失数据:刚消费到,还没处理,结果进程挂了(重启消费端)
- 其他情况下: 硬盘坏了、持久化的过程断电了 ; 最好通过表记录每次生产者投递消息,如果长期没有被消费,手动的补偿消费。
4.2.5. 消息不丢失解决方法:
-
生产者方面:
-
方案一: 开启
RabbitMQ
事务(同步方法,不推荐)// 开启事务 channel.txSelect try { // 这里发送消息 } catch (Exception e) { channel.txRollback // 这里再次重发这条消息 } // 提交事务 channel.txCommit
-
方案二:开启
confirm
模式(异步,推荐)channel.confirmSelect(); String msg = "第" + i + "条,中国加油!"; channel.basicPublish("", "hello", null, msg.getBytes()); if (channel.waitForConfirms()) { System.out.println("生产者发送消息:" + msg + "成功"); } else { System.out.println("生产者发送消息:" + msg + "失败"); }
-
-
MQ
方面:-
开启
RabbitMQ
的持久化(默认的情况下MQ
服务器端创建队列和交换机都是持久化的) -
通过代码设置持久化。
-
-
消费者方面:
- 关闭
RabbitMQ
的自动ACK
- 关闭
4.3. 发布订阅模式
RabbitMQ
支持的后面几种模式都是依赖于交换机。交换机支持一下几种模式:
- Direct exchange(直连交换机)
- Fanout exchange(扇型交换机)
- Topic exchange(主题交换机)
- Headers exchange(头交换机)
4.3.1. 原理介绍
简单解释就是,可以将消息发送给不同类型的消费者。做到发布一次,消费多个。(使用扇形交换机)
4.3.2. 创建交换机
4.3.3. 实例
先创建对应virtual host
的交换机。
生产者:
public class PsProducer {
private final static String EXCHANGE_NAME = "test_long";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("发布订阅模式中生产者启动...");
// 创建新的连接
Connection connection = RabbitConnection.connection();
// 创建通道
Channel channel = connection.createChannel();
// 绑定的交换机 参数1交互机名称 参数2 exchange类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
String msg = "这是生产者发送的一个消息...";
// 发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
channel.close();
connection.close();
}
}
短信消费者:
public class SmsConsumer {
private final static String EXCHANGE_NAME = "test_long";
private final static String QUEUE_NAME = "sms_queue";
public static void main(String[] args) throws IOException {
System.out.println("短信消费者启动...");
// 创建新的连接
Connection connection = RabbitConnection.connection();
// 创建通道
Channel channel = connection.createChannel();
// 消费者关联队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("短信消费者接受的消息:" + msg);
}
});
}
}
邮件消费者:
public class EmailConsumer {
private final static String EXCHANGE_NAME = "test_long";
private final static String QUEUE_NAME = "email_queue";
public static void main(String[] args) throws IOException {
System.out.println("邮件消费者启动...");
Connection connection = RabbitConnection.connection();
Channel channel = connection.createChannel();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("邮件消费者接受的消息:" + msg);
}
});
}
}
结果:
4.4. 路由模式
4.4.1. 简介
当交换机类型为direct类型时,根据队列绑定的路由建转发到具体的队列中存放消息。
4.4.2. 实例代码
生产者
public class Producer {
private final static String EXCHANGE_NAME = "long_direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("发布订阅模式中生产者启动...");
Connection connection = RabbitConnection.connection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
String msg = "这是生产者" + i + "发送邮件的一个消息...";
System.out.println("邮件消息: " + msg);
channel.basicPublish(EXCHANGE_NAME, "email", null, msg.getBytes());
} else {
String msg = "这是生产者" + i + "发送短信的一个消息...";
System.out.println("短信消息: " + msg);
channel.basicPublish(EXCHANGE_NAME, "sms", null, msg.getBytes());
}
}
channel.close();
connection.close();
}
}
短信消费者
public class SmsConsumer {
private final static String EXCHANGE_NAME = "long_direct_exchange";
private final static String QUEUE_NAME = "sms_queue";
public static void main(String[] args) throws IOException {
System.out.println("短信消费者启动...");
Connection connection = RabbitConnection.connection();
Channel channel = connection.createChannel();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("短信消费者接受的消息:" + msg);
}
});
}
}
邮件消费者
public class EmailConsumer {
private final static String EXCHANGE_NAME = "long_direct_exchange";
private final static String QUEUE_NAME = "email_queue";
public static void main(String[] args) throws IOException {
System.out.println("邮件消费者启动...");
Connection connection = RabbitConnection.connection();
Channel channel = connection.createChannel();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("邮件消费者接受的消息:" + msg);
}
});
}
}
消费结果:
4.5. 通配符模式
4.5.1. 简介
当交换机类型为topic类型时,根据队列绑定的路由键模糊转发到具体的队列中存放。
#
号表示支持匹配多个词;*
号表示只能匹配一个词。
4.5.2. 实例代码
生产者:
public class Producer {
private final static String EXCHANGE_NAME = "long_topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("发布订阅模式中生产者启动...");
Connection connection = RabbitConnection.connection();
Channel channel = connection.createChannel();
// 修改为topic类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
String msg = "这是生产者" + i + "发送邮件的一个消息...";
System.out.println("邮件消息: " + msg);
// 路由键 为 topic.email.long
channel.basicPublish(EXCHANGE_NAME, "topic.email.long", null, msg.getBytes());
} else {
String msg = "这是生产者" + i + "发送短信的一个消息...";
System.out.println("短信消息: " + msg);
// 路由键 为 topic.sms
channel.basicPublish(EXCHANGE_NAME, "topic.sms", null, msg.getBytes());
}
}
channel.close();
connection.close();
}
}
短信消费者:使用#
可以匹配所有topic开头
的消息。
public class SmsConsumer {
private final static String EXCHANGE_NAME = "long_topic_exchange";
private final static String QUEUE_NAME = "sms_queue";
public static void main(String[] args) throws IOException {
System.out.println("短信消费者启动...");
Connection connection = RabbitConnection.connection();
Channel channel = connection.createChannel();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic.#");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("短信消费者接受的消息:" + msg);
}
});
}
}
邮件消费者:使用*
可以匹配所有topic.*.*
(*为替代字符)的消息。
public class EmailConsumer {
private final static String EXCHANGE_NAME = "long_topic_exchange";
private final static String QUEUE_NAME = "email_queue";
public static void main(String[] args) throws IOException {
System.out.println("邮件消费者启动...");
Connection connection = RabbitConnection.connection();
Channel channel = connection.createChannel();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic.email.*");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("邮件消费者接受的消息:" + msg);
}
});
}
}
运行结果
五、Sptingboot
整合RabbitMQ
5.1. 添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
5.2. 配置文件
spring:
rabbitmq:
# 连接地址
host: 192.168.252.132
# 端口号
port: 5672
# 账号
username: admin
# 密码
password: admin
# 地址
virtual-host: /long
5.3. 配置类:注册队列和交换机
主要三步:
- 创建队列
- 创建交换机
- 将队列绑定到交换机
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @author 墨龙吟
* @version 1.0.0
* @ClassName RabbitConfig.java
* @Description RabbitMQ 配置类
* @createTime 2020年02月17日 - 22:10
*/
@Component
public class RabbitConfig {
/** 交换机名称 */
private final static String EXCHANGE_NAME = "spring_boot_exchange";
/** 短信队列名称 */
private final static String FANOUT_SMS_QUEUE = "fanout_sms_queue";
/** 邮件队列名称 */
private final static String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
/** 创建短信队列 */
@Bean
public Queue smsQueue() {
return new Queue(FANOUT_SMS_QUEUE);
}
/** 创建邮件队列 */
@Bean
public Queue emailQueue() {
return new Queue(FANOUT_EMAIL_QUEUE);
}
/** 创建交换机 */
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME);
}
/** 将短信队列绑定到交换机 */
@Bean
public Binding smsBindingExchange(Queue smsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(smsQueue).to(fanoutExchange);
}
/** 将邮件队列绑定到交换机 */
@Bean
public Binding emailBindingExchange(Queue emailQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(emailQueue).to(fanoutExchange);
}
}
5.4. 创建生产者和消费者
生产者:
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 墨龙吟
* @version 1.0.0
* @ClassName HomeController.java
* @Description 消费者
* @createTime 2020年02月17日 - 22:47
*/
@RestController
public class HomeController {
private final static String EXCHANGE_NAME = "spring_boot_exchange";
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 投递消息,客户端不会马上知道消费者是否被消费,但是能够确认知道我们是否投递消息到中间件
* @return
*/
@GetMapping("/send_msg")
public String sendMsg() {
// 参数1 交换机名称 、参数2路由key 参数3 消息
amqpTemplate.convertAndSend(EXCHANGE_NAME, "", "这个是一条消息");
return "success";
}
}
消费者:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 墨龙吟
* @version 1.0.0
* @ClassName FanoutEmailConsumer.java
* @Description 邮件消费者
* @createTime 2020年02月17日 - 22:52
*/
@Component
@RabbitListener(queues = "fanout_email_queue")
public class FanoutEmailConsumer {
@RabbitHandler
public void process(String msg) {
System.out.println("邮件消费者收到消息:" + msg);
}
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 墨龙吟
* @version 1.0.0
* @ClassName FanoutEmailConsumer.java
* @Description 邮件消费者
* @createTime 2020年02月17日 - 22:52
*/
@Component
@RabbitListener(queues = "fanout_sms_queue")
public class FanoutSmsConsumer {
@RabbitHandler
public void process(String msg) {
System.out.println("短信消费者收到消息:" + msg);
}
}
springboot
会自动创建交换机和队列,不需要我们手动创建。
5.5. 结果
六、欢迎关注个人微信公众号
上一篇: (四)修改webpart并在SharePoint Online中调试
下一篇: C 蛇形数组