RabbitMQ 介绍与测试与使用与基本命令
RabbitMQ 介绍与使用
RabbitMq是一个消息代理,接收消息和转发消息,类似一个邮局,当你把邮件放在邮箱时候,你可以确定邮差最终会吧邮件发送给你的收件人。
先看RabbitMQ架构:
可看到RabbitMQ 有如下几个组件:
生产者
Producer 一个消息发送的程序就是一个生产者。
队列:
Queue相当于一个邮箱,本质上是一个消息缓存器,生产者往队列上生产消息,消费者从队列上接收消息
消费者:
Consumer 也生产者定义类似,也是一个程序,来接收队列上的消息。
交换机 Exchange
Producer 不会直接把消息发送到Queue,会先发送到Exchange,然后Exchange按照一定规则发送到MQ
上面的X就是Exchange
名词解释:
Routing Key : 生产者将消息发送给Exchange时,一般会指定一个RoutingKey
<bean id="bgateQp1021RequestProducer" class="cpcn.payment.tool.middleware.mq.SmartMQProducer">
<property name="messageConverter" ref="messageConverter" />
<property name="connectionFactory" ref="jyMqConnectionFactory" />
<property name="retryTemplate" ref="retryTemplate" />
<property name="exchange" value="PaytExchange" />
<property name="queue" value="Payt_Bgate1021RequestRetryVO_Payt" />
<property name="routingKey" value="Payt_Bgate1021RequestRetryVO_Payt_key" />
</bean>
通过这个RoutingKey 可以知道消息流向哪,需要和Exchange Type 和 Binding key 一起使用。
<!-- queue binging key 绑定 -->
<!-- durable=true,交换机持久化,rabbitmq服务重启交换机依然存在,保证不丢失; durable=false,相反 -->
<!-- auto-delete=true:无消费者时,队列自动删除; auto-delete=false:无消费者时,队列不会自动删除 -->
<rabbit:direct-exchange name="PaytExchange" durable="true" auto-delete="false" id="PaytExchange">
<rabbit:bindings>
<rabbit:binding queue="Payt_Bgate1001Request_Payt" key="Payt_Bgate1001Request_Payt_key" />
<rabbit:binding queue="Payt_Bgate1021RequestRetryVO_Payt" key="Payt_Bgate1021RequestRetryVO_Payt_key" />
<rabbit:binding queue="Payt_Bgate1022RequestRetryVO_Payt" key="Payt_Bgate1022RequestRetryVO_Payt_key" />
<rabbit:binding queue="Payt_Bgate3001Request_Payt" key="Payt_Bgate3001Request_Payt_key" />
<rabbit:binding queue="Payt_Bgate3002Request_Payt" key="Payt_Bgate3002Request_Payt_key" />
<rabbit:binding queue="Payt_Bgate3003Request_Payt" key="Payt_Bgate3003Request_Payt_key" />
<rabbit:binding queue="Bgate_Bgate1001Response_Payt" key="Bgate_Bgate1001Response_Payt_key" />
<rabbit:binding queue="Bgate_Bgate1021Response_Payt" key="Bgate_Bgate1021Response_Payt_key" />
<rabbit:binding queue="Bgate_Bgate1022Response_Payt" key="Bgate_Bgate1022Response_Payt_key" />
<rabbit:binding queue="Bgate_Bgate3001Response_Payt" key="Bgate_Bgate3001Response_Payt_key" />
<rabbit:binding queue="Bgate_Bgate3002Response_Payt" key="Bgate_Bgate3002Response_Payt_key" />
<rabbit:binding queue="Bgate_Bgate3003Response_Payt" key="Bgate_Bgate3003Response_Payt_key" />
</rabbit:bindings>
</rabbit:direct-exchange>
上面的 direct-exchange就是 交换机的Exchange类型,binding 绑定的就是bindingKey .
Binding:
就是说将队列绑定到交换机上有一个Binding Key
exchange 有如下几种类型:
Direct :
处理路由建,将一个队列绑定到交换机上,改消息需要与一个特定的路由键(routingkey)完全匹配
是将消息发送到binding Key 与route key 一致的队列中。
Topic:与direct类似,功能更强,支持模糊绑定
* 表示通配一个词
# 表示通配0个或者多个词
DirectKey 是完全匹配,Topic 是模糊匹配:
fanout :
规则比较简单,是将发送到Exchange上面所有的消息都路由到他绑定的队列中。
headers
不依赖bindingKey和routeKey进行匹配,主要通过heads属性进行匹配。工作原理:
假设P1和C1同时注册了相同的brocker,Exchange
1.P1生产消息发送给Exchange
2. Exchange根据RoutingKey,找到匹配的队列Queue1
3.Queue1将消息发送给消费者C1
4. C1接收到消息后,发送ACk给队列Queue1确认消息
5.Queue1接收到ACK后,删除消息缓存。
RabbitMQ 使用代码:
发送者:
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws java.io.IOException {
try {
System.setProperty("USER_NAME","root");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.145.102");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个 exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = getMessage(argv);
// 向指定 exchange 发送一个消息。由于是 fanout 模式, routing key 为空
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
} catch (TimeoutException e) {
e.printStackTrace();
}finally{
}
}
// ...
/**
* @param argv
* @return
*/
private static String getMessage(String[] argv) {
return new String("test");
}
}
消费者:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.145.102");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明 exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 生命一个队列,名字随机
String queueName = channel.queueDeclare().getQueue();
// 将队列绑定到 exchange 上
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// consumer 简单的将接收到的消息打印出来
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
RabbitMQ基本使用命令:
1.查看用户权限
rabbitmqctl list_user_permissions root
2. 查看交换机 后面grep 是过滤的
rabbitmqctl list_exchanges |grep Sche
3. 查看用户
rabbitmqctl list_users
4. 新增用户
rabbitmqctl add_user Username Password
5. 删除用户
rabbitmqctl delete_user Username
6. 修改密码
rabbitmqctl change_password Username Newpassword
7.设置用户角色
rabbitmqctl set_user_tags User Tag
User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。
角色名:
(1) 超级管理员(administrator)
可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。
(2) 监控者(monitoring)
可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
(3) 策略制定者(policymaker)
可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
与administrator的对比,administrator能看到这些内容
(4) 普通管理者(management)
仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。
(5) 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
RabbitMQ遇到问题:
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'Sche' in vhost '/', class-id=60, method-id=40)
发现用户权限 全部是 / .* .* .*
权限应该正常
然后发现 其中一个队列的 name id 其中id被其他队列的id占用了
<rabbit:direct-exchange name="Sche" durable="true" auto-delete="false" id="Sche">
<rabbit:direct-exchange name="Stat" durable="true" auto-delete="false" id="Stat">
如果上面id='Stat' 改成Sche就会报错
上一篇: 部署开源blog——wordpress
下一篇: 沙虾营养价值和功效,这篇文章告诉你