互联网技术体验 - RabbitMQ
RabbitMQ (https://www.rabbitmq.com/)
是款使用Erlang
语言构建的消息队列服务器(Message Queue Server
),该类服务也称为消息中间件是对AMQP
协议的应用实现,用于各应用程序之间的异步消息传递。
Installation
因为RabbitMQ
基于Erlang
开发,因此Erlang
环境需要提前部署到服务器,可参考官网页面:https://www.rabbitmq.com/download.html,因对软件版本没有特定要求,所以个人实践中直接使用发行版Repo
提供的Erlang
和RabbitMQ
。
apt-get install erlang
which erl
apt-get install rabbitmq-server
service rabbitmq-server status
RabbitMQ
配置文件为/etc/rabbitmq/rabbitmq-env.conf
,可用来定义监听IP
和端口,一般无需修改。
Command Line Tools + Management Plugin
RabbitMQ
提供的命令管理工具可参考https://www.rabbitmq.com/cli.html,主要有以下三个命令:
rabbitmqctl for general administrative/operator tasks
rabbitmq-plugins for plugin management
rabbitmqadmin for operator tasks over HTTP API
其中rabbitmqadmin
命令是对RabbitMQ
提供的管理插件基于HTTP
协议接口的封装以便于编程控制,实际中主要使用该插件提供的web
界面管理和监控MQ
服务。
1. rabbitmqctl
rabbitmqctl status
rabbitmqctl list_queues
rabbitmqctl list_users
rabbitmqctl add_user root 123
rabbitmqctl change_password root 456
rabbitmqctl set_user_tags root administrator
rabbitmqctl list_vhosts
rabbitmqctl list_permissions
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
2. rabbitmq-plugins
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins list
3. rabbitmq_management HTTP GUI/API
参考https://www.rabbitmq.com/management.html
http://<IP>:15672/
http://<IP>:15672/api
http://<IP>:15672/cli
Note:
默认guest/guest
口令因安全问题只能通过本机127.0.0.1
接口访问,因此如果网络访问服务需要添加其他账户并设置VHOST
权限,一般添加admin
用户访问GUI
和使用库编程。
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
Client Library
RabbitMQ
提供了多种语言编写的API
库,并且官方提供了详细的程序实例参考https://www.rabbitmq.com/getstarted.html,如果选用Java
库通过网络访问RabbitMQ
服务需要在ConnectionFactory
对象中设置用户名密码覆盖默认的guest/guest
。
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
factory.setHost("192.168.19.131");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
官方提供的简单实例:
export CLASSPATH=.:/home/ce/Desktop/jar/amqp-client-4.0.2.jar:/home/ce/Desktop/jar/slf4j-api-1.7.21.jar:/home/ce/Desktop/jar/slf4j-simple-1.7.22.jar
#send.java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class send {
private final static String queue_name = "test";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.19.131");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queue_name, false, false, false, null);
String message = "Test Message @#$%^&";
channel.basicPublish("", queue_name, null, message.getBytes());
System.out.println("[x] sent '" + message + "'");
channel.close();
connection.close();
}
}
#recv.java
import com.rabbitmq.client.*;
import java.io.IOException;
public class recv {
private final static String queue_name = "test";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.19.131");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queue_name, false, false, false, null);
System.out.println("Message Receiving. if exit CTRL + C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("[x] Received '" + message + "'");
}
};
channel.basicConsume(queue_name, true, consumer);
}
}
执行结果如下:
aaa@qq.com:/home/ce/Desktop# java send
[x] sent 'Test Message @#$%^&'
aaa@qq.com:/home/ce/Desktop# java recv
Message Receiving. if exit CTRL + C
[x] Received 'Test Message @#$%^&'
.........
对于服务器上的消息状态可以通过rabbitmqctl list_queues
查看也可以通过网页上对应Queue_Name
页面里的Get Message(s)
查看。