欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rabbitMQ

程序员文章站 2022-07-12 12:48:57
...

RabbitMQ安装

我的系统版本:

[[email protected] ~]#cat /etc/issue
CentOS release 6.5 (Final)
  1. 安装epel库:EPEL 是yum的一个软件源,里面包含了许多基本源里没有的软件。
wget  http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
rpm -ivh epel-release-6-8.noarch.rpm
yum repolist    #看到epel,说明安装成功了

2.安装erlang,rabbitmq

yum install erlang -y    #rabbitmq是erlang语言开发的
yum install rabbitmq-server -y
  1. service rabbitmq-server start 默认端口5672

  2. 启用维护插件:rabbitmq-plugins enable rabbitmq_management
    界面 http://ip:15672/ 用户名密码 guest
    无法登陆解决方法: vi /etc/rabbitmq.config 写入信息,
    [{rabbit, [{loopback_users, []}]}]. 注意 . 一定要有,保存
    service rabbitmq-server restart(如果重启出现 错误 请把楼上的配置文件保存Ansi 编码)

Python操作RabbitMQ

基本用法

发布端:

import pika

#创建一个基本的socket连接对象
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='192.168.1.200')
)

channel = connection.channel()  #创建一个管道对象

#声明queue
channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')

connection.close()

接收端:

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.200'))
channel = connection.channel()

#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print("Received %r" %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  #客户端主动确认


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

channel.start_consuming()
no-ack
  • no-ack=False 表示消费完以后不主动把状态通知rabbitmq
  • no-ack=True 当程序断开将丢掉消息

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed,
or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

  • 回调函数中的ch.basic_ack(delivery_tag=method.delivery_tag)
  • basic_comsume中的no_ack=False
消息持久化

We have learned how to make sure that even if the consumer dies, the task isn't lost(by default, if wanna disable use no_ack=True). But our tasks will still be lost if RabbitMQ server stops.

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

channel.queue_declare(queue='hello', durable=True)

This queue_declare change needs to be applied to both the producer and consumer code.

At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                      delivery_mode = 2,       # make message persistent
                      ))

生产者和消费者端:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.200"))
channel = connection.channel()
channel.queue_declare(queue='cc', durable=True)  #如果有cc的队列,略过;如果没有,创建cc的队列(持久化队列)

channel.basic_publish(exchange='',
                      routing_key='cc',
                      body='hello world!!!',
                      properties=pika.BasicProperties(delivery_mode=2))  #消息持久化
connection.close()
import pika

connection =pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.200"))
channel = connection.channel()
channel.queue_declare(queue='cc', durable=True)

def callback(ch, method, properties, body):
    print('Received %r' %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback, queue='cc')
channel.start_consuming()

查看当前队列: #rabbitmqctl list_queues (usr/sbin/rabbitmqctl)

上一篇: RabbitMq简介

下一篇: 安装rabbitmq