rabbitMQ基础应用
1、安装erlang
[root@localhost ~]#yum -y install erlang
2、安装rabbitmq
[root@localhost ~]#yum -y install rabbitmq-server
3、添加用户
[root@localhost ~]# rabbitmqctl add_user rabbit_user 123.com // 添加admin的用户密码为123.com
4、将角色添加到管理员组
[root@localhost ~]# rabbitmqctl set_user_tags rabbit_user administrator
5、设置用户权限
[root@localhost ~]# rabbitmqctl set_permissions -p "/" rabbit_user ".*" ".*" ".*" //set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
6、启用web插件
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_management
7、启动rabbitmq服务
[root@localhost ~]# systemctl start rabbitmq-server
8、访问,如果一切顺利的话你会看到如下界面
9、输入用户名密码进入rabbitmq后台,你会看到像下面这个样子。
到此rabbitmq已经可以正常运行了,下面我们使用python来操作队列。
1、安装pika模块
[root@localhost ~]# pip3 install pika
2、创建生产者模型
[root@localhost rabbitmq]# vim producer.py #!/usr/bin/env python import pika # 创建凭证,使用rabbitmq用户密码登录 credentials = pika.plaincredentials("rabbit_user","123.com") # 新建连接,这里localhost可以更换为服务器ip connection = pika.blockingconnection(pika.connectionparameters('192.168.10.10',credentials=credentials)) # 创建通道 channel = connection.channel() # 在通道内声明一个队列,用于接收消息,队列名字叫“test_queue” channel.queue_declare(queue='test_queue') # 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),我们暂且将(exchange=''), # 它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据 while true: content = input("生产者输入数据>>>") if content.upper() == "q": break channel.basic_publish(exchange='', routing_key='test_queue', body=content) print("已经发送了消息") # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接 connection.close()
3、运行生产者模型
[root@localhost rabbitmq]# python3 producer.py
4、创建消费者模型
[root@localhost rabbitmq]# vim consumer.py #!/usr/bin/env python import pika # 建立与rabbitmq的连接 credentials = pika.plaincredentials("rabbit_user","123.com") connection = pika.blockingconnection(pika.connectionparameters('192.168.10.10',credentials=credentials)) # 创建通道 channel = connection.channel() # 在通道中创建队列 channel.queue_declare(queue="test_queue") def callbak(ch,method,properties,body): print("消费者接收了消息:%r"%body.decode("utf8")) # 有消息来时,立即执行callbak。 channel.basic_consume("test_queue",callbak,auto_ack=true) # 等待接收消息 channel.start_consuming()
auto_ack=true:表示不确认机制也就是说每次消费者接收到数据后,不管是否处理完毕,rabbitmq-server都会把这个消息标记完成,从队列中删除,这样会有个缺陷,就是当我们从队列中获取到消息后,碰巧程序崩溃,或者什么其它原因导致程序终,此时消息已经从消息队列中删除,造成数据的不安全。
5、运行消费者模型
[root@localhost rabbitmq]# python3 consumer.py
6、验证消息队列的轮询机制
7、下面演示auto_ack=true的不安全机制,我们在消费者模型中主动抛出异常模拟程序非正常终止,然后查看消息队列。
7.1修改消费者模型(主动抛出异常)如下:
[root@localhost rabbitmq]# vim consumer.py #!/usr/bin/env python import pika # 建立与rabbitmq的连接 credentials = pika.plaincredentials("rabbit_user","123.com") connection = pika.blockingconnection(pika.connectionparameters('192.168.10.10',credentials=credentials)) # 创建通道 channel = connection.channel() # 在通道中创建队列 channel.queue_declare(queue="test_queue") def callbak(ch,method,properties,body): # 主动抛出异常 raise typeerror print("消费者接收了消息:%r"%body.decode("utf8")) # 有消息来时,立即执行callbak。 channel.basic_consume("test_queue",callbak,auto_ack=true) # 等待接收消息 channel.start_consuming()
7.2运行生产者模型
[root@localhost rabbitmq]# python3 producer.py
7.3运行消费之模型,查看队列变化
8、使用相对可靠的消息机制确认来保证数据安全
修改消费者模型的配置文件如下;
[root@localhost rabbitmq]# vim consumer.py #!/usr/bin/env python import pika # 建立与rabbitmq的连接 credentials = pika.plaincredentials("rabbit_user","123.com") connection = pika.blockingconnection(pika.connectionparameters('192.168.10.10',credentials=credentials)) # 创建通道 channel = connection.channel() # 在通道中创建队列 channel.queue_declare(queue="test_queue") def callbak(ch,method,properties,body): # 主动抛出异常 raise typeerror print("消费者接收了消息:%r"%body.decode("utf8")) # 告诉消息队列,我已经确认收到消息了 ch.basic_ack(delivery_tag=method.delivery_tag) # 有消息来时,立即执行callbak。 channel.basic_consume("test_queue",callbak,auto_ack=false) # 等待接收消息 channel.start_consuming()
这样在消费者没有确认的情况下,消息队列中的消息是不会被删除的。如下;
至于是选择auto_ack确认机制还是使用auto_ack不确认机制,还需要你根据实际情况来定。
9、消息队列的持久化
我们上面的配置如果rabbitmq重启服务,或者系统重启都会导致队列里的消息被清除掉。下面我们修改生产者的配置文件使消息队列持久化,即使重启服务,消息队列里的消息也不会被清除。
[root@localhost rabbitmq]# vim producer.py
#!/usr/bin/env python import pika # 创建凭证,使用rabbitmq用户密码登录 credentials = pika.plaincredentials("rabbit_user","123.com") # 新建连接,这里localhost可以更换为服务器ip connection = pika.blockingconnection(pika.connectionparameters('192.168.10.10',credentials=credentials)) # 创建通道 channel = connection.channel() # 在通道内声明一个队列,用于接收消息,队列名字叫“test_queue”,durable=true表示持久化队列 channel.queue_declare(queue='delivery_queue',durable=true) # 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),我们暂且将(exchange=''), # 它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据 while true: content = input("生产者输入数据>>>") if content.upper() == "q": break channel.basic_publish(exchange='', routing_key='delivery_queue', body=content, properties=pika.basicproperties(delivery_mode=2)) # 2代表的是持久化队列 print("已经发送了消息") # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接 connection.close()
常用命令
// 新建用户 rabbitmqctl add_user {用户名} {密码} // 设置权限 rabbitmqctl set_user_tags {用户名} {权限} // 查看用户列表 rabbitmqctl list_users // 为用户授权 添加 virtual hosts : rabbitmqctl add_vhost <vhost> // 删除用户 rabbitmqctl delete_user username // 修改用户的密码 rabbitmqctl change_password username newpassword // 删除 virtual hosts : rabbitmqctl delete_vhost <vhost> // 添加 users : rabbitmqctl add_user <username> <password> rabbitmqctl set_user_tags <username> <tag> ... rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read> // 删除 users : delete_user <username> // 使用户user1具有vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源 rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*' // 查看权限 rabbitmqctl list_user_permissions user1 rabbitmqctl list_permissions -p vhost1 // 清除权限 rabbitmqctl clear_permissions [-p vhostpath] user rabbitmqctl reset // 重启应用 rabbitmqctl stop_app // 关闭应用 rabbitmqctl start_app // 启动应用 rabbitmqctl list_queues // 查看队列 rabbitmqctl list_exchanges // 查看exchangelist rabbitmqctl list_queues // 查看所有queue rabbitmqctl list_users // 查看所有用户 rabbitmqctl list_bindings // 查看所有绑定exchange和queued 消息 rabbitmqctl list_queues name messages_ready messages_unacknowledged // 查看消息确认 rabbitmqctl status // 查看rabbitmq的状态信息。
上一篇: tcp_tw_recycle参数引发的数据库连接异常
下一篇: SQL语句性能调整原则