RabbitMQ快速入门
简介
RabbitMQ是什么?
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
RabbitMQ是AMQP的一种实现,消息队列的一种,类似的实现还有:ActiveMQ,kafka等。
RabbitMQ可以做什么?
使用RabbitMQ可以将一些无需即时返回且耗时的操作抽离出去,进行异步操作,从而减少服务器的响应时间,提高系统的吞吐量。
例如:业务系统进行增删改查时,需要进行数据统计,就可以将统计数据的操作抽离出来,使用消息队列来异步统计。
AMQP是什么?
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
AMQP和Http类似,是一种协议、规范,而RabbitMQ是根据AMQP协议实现的一款产品。
AMQP的一些概念:
- Broker:接收和分发消息的应用,消息系统。
- Virtual Host:虚拟主机,一个消息服务可以划分多个Virtual Host,不同Virtual Host之间数据是隔离的互不影响,类似于命名空间。
- Connection:网络连接,如TCP/IP。
- Channel:通道,多路复用连接中的一条独立的双向数据流通道。如果每一次与消息服务通信都建立一次TCP连接,这个开销将是巨大的,Channel是建立在Connection内部的逻辑连接,减少了建立TCP连接的开销。
- Exchange:交换机,用来接收生产者发送的消息,并根据不同的规则将消息路由到不同的队列中。
- Queue:队列,存放由Exchange路由过来的消息,并发送给消费者。
- Binding:队列和交换机之间的关联。
服务安装部署
RabbitMQ分为服务端和客户端。
服务端由erlang语言编写,运行需要erlang环境,就如运行Java程序需要安装JDK一样。
所以安装RabbitMQ之前,需要先安装erlang环境。
erlang安装
下载erlang安装包,版本不能太久,具体参考官网版本号。
wget https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_22.2.1-1~centos~7_amd64.rpm
安装erlang
rpm -ivh esl-erlang_22.2.1-1~centos~7_amd64.rpm
如果安装出错,可能是缺少依赖。
yum install epel-release
yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl
执行erl
命令可以查看erlang是否安装成功。
RabbitMQ安装与启动
可以直接使用官网提供的rpm安装包,也可以使用二进制文件来构建安装。
安装之前先安装依赖。
yum install -y socat
安装完成后,就可以启动服务了,几个命令:
service rabbitmq-server start #启动
service rabbitmq-server stop #停止
service rabbitmq-server restart #重启
service rabbitmq-server status #查看状态
RabbitMQ开启Web可视化插件
rabbitmq-plugins enable rabbitmq_management
RabbitMQ默认只有一个账号:guest,且只允许本地登录,如果服务不是部署在本地是无法登录的,可以使用命令来创建账号:
# 添加账号 root root
rabbitmqctl add_user root root
# 添加标签 administrator(不代表真正的权限)
rabbitmqctl set_user_tags root administrator
RabbitMQ的Web服务默认端口为15672
,访问如下图所示:
使用刚刚创建的用户登录,即可看到管理页面。
Java操作RabbitMQ
RabbitMQ支持多种客户端,这里使用Java。
导入Maven依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
编写一个工具类
public class MQUtil {
//队列名
public static final String QUEUE_NAME = "queueTest";
//交换机名
public static final String EXCHANGE_NAME = "exchangeTest";
//连接工厂
private static final ConnectionFactory factory = new ConnectionFactory();
static {
factory.setHost("192.168.1.120");
factory.setPort(5672);
factory.setVirtualHost("test");
factory.setUsername("root");
factory.setPassword("root");
}
public static Connection getConnection(){
try {
return factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
创建生产者,发送消息
public class Producer {
private Connection connection = MQUtil.getConnection();
//发送消息
public void sendMessage(String message) throws Exception {
Channel channel = connection.createChannel();
/**
* 声明队列,各参数含义:
* 队列名称
* 是否持久化(队列持久化,不是消息持久化)
* 是否排他(只有当前连接可以操作队列)
* 是否自动删除
* 队列其他参数Map
*/
channel.queueDeclare(MQUtil.QUEUE_NAME, true, false, false, null);
//声明交换机 交换机名称、类型
channel.exchangeDeclare(MQUtil.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
/**
* 交换机和队列绑定
* 第三个参数:routingKey路由键,FANOUT类型的交换机无需路由键
*/
channel.queueBind(MQUtil.QUEUE_NAME, MQUtil.EXCHANGE_NAME, "");
channel.basicPublish(MQUtil.EXCHANGE_NAME, "", null, message.getBytes());
channel.close();
}
}
创建消费者,消费消息
public class Consumer {
Connection connection = MQUtil.getConnection();
public void consume() throws Exception {
Channel channel = connection.createChannel();
//消费消息
channel.basicConsume(MQUtil.QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println(message);
}
});
}
}
启动生产者,发送一个消息,Web界面如下图:
- Ready:等待被消费的消息。
- Unacked:消息被取走,但还未确认删除。
- Total:消息总量。
Exchange的类型
fanout
只要队列与交换机绑定了,交换机就会把消息路由到队列中。
与路由键routingKey没有关系。
direct
交换机与队列通过一个路由键routingKey绑定,发送消息时指定routingKey,只有routingKey匹配时,交换机才会将消息路由到队列。
topic
主题模式,将消息进行分类,队列只关心某一类的消息。
在direct的基础上进行了模糊匹配,单词通过.
隔开,*
表示匹配1个单词,#
表示匹配任意个单词。
例如:日志采集系统,将info
和error
的日志单独采集,就可以使用topic模式。
headers
应用场景比较少,不关心routingKey是否匹配,而是根据headers中的Key-Value来匹配。
只有headers中的所有键值对全部匹配时才会路由消息。
与SpringBoot集成
生产者
模拟商品新增时,利用消息队列来异步统计信息。
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
通过Java代码的方式来配置
@Configuration
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("192.168.1.120");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("test");
return factory;
}
//声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("goodsExchange");
}
//声明队列
@Bean
public Queue queue(){
return new Queue("goodsQueue", true);
}
//声明binding
@Bean
public Binding binding(Queue queue,FanoutExchange exchange){
//将队列绑定到指定交换机上
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
return template;
}
}
商品新增成功后,发送消息,异步统计商品信息
@RestController
@RequestMapping("goods")
public class GoodsController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("save")
public Object save(String name) {
//保存商品
String id = UUID.randomUUID().toString(true);
//写具体的业务逻辑....
System.out.println("商品新增成功.");
//发送消息 商品id
rabbitTemplate.convertAndSend("goodsExchange", "", id);
return id;
}
}
消费者
消费者服务与生产者服务最好是分开,两者不应该耦合在一起,所以创建一个消费项目。
和生产者一样,引入必须的依赖,编写配置类。
消费类
@Component
public class GoodsConsumer {
//监听的队列名称
@RabbitListener(queues = "goodsQueue")
public void get(String message) throws Exception{
System.out.println("消息消费,新增商品统计id:"+message);
//编写统计的业务逻辑...
}
}
启动两个服务,发出商品新增请求,效果如图:
总结
介绍了一下RabbitMq的服务搭建以及简单使用,实际应用还需要考虑到很多问题,例如:消息丢失,消息重复消费等等,进阶使用笔者将另起篇幅,敬请期待!