RabbitMQ的安装及使用
什么是RabbitMQ?
RabbitMQ是一个消息代理:它的主要功能是接收和转发消息。 你可以把它想象成一个邮局:当你把你想要邮寄的邮件放在邮箱后,邮递员最终将邮件发送给你的收件人。在这个例子中,RabbitMQ充当的是邮政信箱,邮局和邮递员的角色。
RabbitMQ和邮局的主要区别在于它处理的不是实物邮件,而是接收,存储和转发二进制数据(我们称之为消息)。
在RabbitMQ中,我们有一些常用的术语:
- Producing(消息生产者):在RabbitMQ的使用场景中,Producing表示消息的生产者,它是用来发送消息的。
- Queue(队列):RabbitMQ接收到消息后,会将其存放在队列中。一个队列受到主机内存和磁盘限制的约束,它本质上是一个很大的消息缓冲区。 许多生产者可以发送消息至同一个队列,许多消费者可以从一个队列中接收数据。
- Consuming(消息消费者):消费者又称为接收者,实际就是消息的接收者,
Ps:需要说明的是,在RabbitMQ的使用场景中,Producing、Queue和Consuming无需部署在同一机器上,仅仅需要互相之间网络联通即可。
什么场景会使用RabbitMQ?
了解了什么是RabbitMQ以后,我们来思考一下,什么场景需要使用RabbitMQ?
RabbitMQ其中一个常用场景是分布式异步任务处理功能。
想象一下,在一个分布式系统中,模块A通过HTTP请求调用模块B的接口,而传递给模块B处理的任务又相对复杂,有一定的耗时。
如果是同步请求,所有的请求都等到模块B完整处理完成后再返回模块A的话,一个直接导致的后果是会在模块A和模块B之间阻塞大量的任务。一方面可能会由于给模块B的压力过大导致调度失败,同时也可能由于模块B连接打满无法接收任务。
此时,一种较好的解决方案如下:
模块A将任务信息发送给RabbitMQ即可,然后RabbitMQ将消息发送给模块B进行处理。一方面,避免了长连接导致的模块B无法访问,另一方面,可以通过调度任务处理机制以及部署多个模块B来进行分布式处理从而缓解服务压力。
RabbitMQ的安装
了解了什么是RabbitMQ,什么场景可以使用RabbitMQ以后,我们来看一下如何安装RabbitMQ。
我们以CentOS系统为例,分别讲解如何使用标准安装方式及Docker安装方式来安装RabbitMQ。
标准安装方式
对于CentOS系统而言,我们可以直接使用yum命令来安装RabbitMQ。
需要注意的是,在安装RabbitMQ之前,我们首先需要安装Erlang。
Ps:RabbitMQ本身是基于Erlang开发的。
# 安装Erlang
yum install erlang
# 安装rabbitmq-server
yum install rabbitmq-server
非常简单,执行上述两条命令后就已经快速安装好了rabbitmq。
那么我们应该如何启动呢?
默认情况下,安装rabbitmq后没有将其设置为守护进程(即开机自启动)。
我们可以以root用户执行如下命令来将其设置为自启动服务:
chkconfig rabbitmq-server on
启动rabbitmq的命令如下:
/sbin/service rabbitmq-server start
停止rabbitmq的命令如下:
/sbin/service rabbitmq-server stop
启动后,我们可以使用如下命令来检测5672端口是否已经被rabbitmq占用来判断服务是否已经启动:
lsof -i:5672
Docker安装方式
标准的安装方式中,我们安装rabbitmq前需要安装erlang。而且可能由于环境依赖等原因导致安装失败,下面,我们来讲解一下如何使用Docker来安装RabbitMQ。
首先,我们需要安装Docker服务,安装的方式如下文:
https://www.missshi.cn/api/view/blog/5a6327650a745f6335000002
安装完成Docker后,我们可以执行如下命令来拉取最新的rabbitmq镜像:
sudo docker pull rabbitmq
然后执行如下命令启动即可:
sudo docker run -di --name=rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 8439:15672 rabbitmq:management
RabbitMQ的使用:HelloWorld
下面,我们来用一个入门级的Demo程序来了解一下RabbitMQ的使用。
首先,我们需要实现一个Producing,来向RabbitMQ发送消息。
完整的示例代码send.py
如下:
#!/usr/bin/env python
import pika # pika是Python连接RabbitMQ的工具
# 第一步是用于连接RabbitMQ
# Demo程序中,send.py运行机器与RabbitMQ位于同一台机器中,因此使用localhost表示本机,如果部署在不同机器中,可以指定IP地址。
# Demo程序中,RabbitMQ默认启动占用的是5672端口,因此无需指定,如果RabbitMQ使用其他端口,需要使用port=****来指定端口信息。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 第二步,在发送消息之前,我们需要确保收件人队列存在。
# 如果我们发送消息到不存在的位置,RabbitMQ将只删除该消息。
# 下面,我们来创建一个将传递消息的hello队列
channel.queue_declare(queue='hello')
# 创建完成收件人队列后,我们就可以发送消息了。
# 第一条消息此处是一个字符串Hello World
# 在RabbitMQ中,消息永远不会直接发送到队列,它总是需要经过交换来实现的。
# 交换的功能细节我们会在后续进行讲解,现在只需要知道的使用由空字符串标识的默认交换。
# 我们可以使用routing_key参数中指定消息发送至哪个收件人队列。
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
# 在退出程序之前,我们需要确保网络缓冲区被刷新,并且我们的消息被实际传送到RabbitMQ。
# 因此,我们需要执行断开连接功能。
connection.close()
Producing代码完成后,我们来继续学习Consuming部分的代码receiver.py
。
在这个Demo中Consuming实现的功能就是接收到消息后将其打印出来(标准输出)。
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 同样进行验证指定队列存在(如果不存在则创建该队列,如果已经存在则忽略)
channel.queue_declare(queue='hello')
# 下面,我们定义一个回调函数。
# 每当我们收到一条消息,这个回调函数就在收到消息后调用。
# 在我们的例子中,这个函数会在屏幕上打印消息的内容。
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 接下来,我们需要告诉RabbitMQ这个特定的回调函数应该从我们的hello队列接收消息
channel.basic_consume(callback,
queue='hello',
no_ack=True)
# 最后,我们进入一个永无止境的循环,等待数据在必要时运行回调。
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
有可能出现:basic_consume() got multiple values for argument 'queue'
原因:参数位置发生改变,改成到如下代码
channel.basic_consume(‘celery’,callback, False)
下面,我们来运行一下看看吧~
首先,我们需要启动consumer来等待消息发送:
python receiver.py
下面,我们可以用sender.py来发送消息,每次执行后会发送一条消息:
python sender.py
现在,我们可以看到每次发送消息后,在receiver.py运行的终端中,会打印出sender.py中发送的消息HelloWorld。
管理工具
下面,我们来介绍两个rabbitmq的命令行工具:rabbitmqctl和rabbitmqadmin。
rabbitmqctl
rabbitmqctl是rabbitmq默认支持的命令行工具。
rabbitmqadmin
rabbitmqadmin是rabbitmq-management plugin支持的命令行工具。
相比rabbitmqctl而言,其功能更加强大。
rabbitmq-management插件提供了一个基于HTTP的API,用于管理和监控您的RabbitMQ服务器,以及基于浏览器的用户界面和命令行工具rabbitmqadmin。
具体的功能包括:
- 声明,列出和删除交换,队列,绑定,用户,虚拟主机和权限。
- 监控队列长度,全局和每个通道的消息速率,每个连接的数据速率等。
- 监视资源使用情况,如文件描述符,内存使用情况,可用磁盘空间。
- 管理用户(提供当前用户的管理权限)。
- 将对象定义(虚拟主机,用户,权限,队列,交换,绑定,参数,策略)导出和导入JSON。
- 强制关闭连接,清除队列。
- 发送和接收消息(在开发环境和故障排除中很有用)。
在使用前,我们首先需要启用rabbitmq_management。
rabbitmq-plugins enable rabbitmq_management
启动完成后,我们需要重启rabbitmq来使其生效:
/sbin/service rabbitmq-server restart
接下来,我们需要加载rabbitmqadmin。
wget http://127.0.0.1:15672/cli/rabbitmqadmin
下载完成后,我们就可以使用rabbitmqadmin了。
docker安装的话,可以打开 http://192.168.232.134:8439 (端口看自己的定义),默认账号密码:guest guest
Web管理页面
Web管理页面的启动方式同rabbitmqadmin,同样是需要启动rabbitmq_management。
启动完成后,可以访问http://180.76.58.75:15672/来访问
上一篇: 【RabbitMQ】-RabbitMQ安装及简单操作
下一篇: DHCP服务器安装