欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ 介绍与测试与使用与基本命令

程序员文章站 2022-06-04 09:59:05
...


RabbitMQ 介绍与使用

RabbitMq是一个消息代理,接收消息和转发消息,类似一个邮局,当你把邮件放在邮箱时候,你可以确定邮差最终会吧邮件发送给你的收件人。

先看RabbitMQ架构:

                    RabbitMQ 介绍与测试与使用与基本命令

可看到RabbitMQ 有如下几个组件:


生产者

   Producer 一个消息发送的程序就是一个生产者。

RabbitMQ 介绍与测试与使用与基本命令

队列:

   Queue相当于一个邮箱,本质上是一个消息缓存器,生产者往队列上生产消息,消费者从队列上接收消息

RabbitMQ 介绍与测试与使用与基本命令

消费者: 

 Consumer 也生产者定义类似,也是一个程序,来接收队列上的消息。

 RabbitMQ 介绍与测试与使用与基本命令

交换机 Exchange

  Producer 不会直接把消息发送到Queue,会先发送到Exchange,然后Exchange按照一定规则发送到MQ

      RabbitMQ 介绍与测试与使用与基本命令

上面的X就是Exchange


名词解释:

Routing Key : 生产者将消息发送给Exchange时,一般会指定一个RoutingKey

    <bean id="bgateQp1021RequestProducer" class="cpcn.payment.tool.middleware.mq.SmartMQProducer">
        <property name="messageConverter" ref="messageConverter" />
        <property name="connectionFactory" ref="jyMqConnectionFactory" />
        <property name="retryTemplate" ref="retryTemplate" />
        <property name="exchange" value="PaytExchange" />
        <property name="queue" value="Payt_Bgate1021RequestRetryVO_Payt" />
        <property name="routingKey" value="Payt_Bgate1021RequestRetryVO_Payt_key" />
    </bean>

通过这个RoutingKey 可以知道消息流向哪,需要和Exchange  Type 和 Binding key 一起使用。

  <!--  queue binging key 绑定 -->
    <!-- durable=true,交换机持久化,rabbitmq服务重启交换机依然存在,保证不丢失; durable=false,相反 -->
    <!-- auto-delete=true:无消费者时,队列自动删除; auto-delete=false:无消费者时,队列不会自动删除 -->
    <rabbit:direct-exchange name="PaytExchange" durable="true" auto-delete="false" id="PaytExchange">
        <rabbit:bindings>
            <rabbit:binding queue="Payt_Bgate1001Request_Payt" key="Payt_Bgate1001Request_Payt_key" />
            <rabbit:binding queue="Payt_Bgate1021RequestRetryVO_Payt" key="Payt_Bgate1021RequestRetryVO_Payt_key" />
            <rabbit:binding queue="Payt_Bgate1022RequestRetryVO_Payt" key="Payt_Bgate1022RequestRetryVO_Payt_key" />
            <rabbit:binding queue="Payt_Bgate3001Request_Payt" key="Payt_Bgate3001Request_Payt_key" />
            <rabbit:binding queue="Payt_Bgate3002Request_Payt" key="Payt_Bgate3002Request_Payt_key" />
            <rabbit:binding queue="Payt_Bgate3003Request_Payt" key="Payt_Bgate3003Request_Payt_key" />
            <rabbit:binding queue="Bgate_Bgate1001Response_Payt" key="Bgate_Bgate1001Response_Payt_key" />
            <rabbit:binding queue="Bgate_Bgate1021Response_Payt" key="Bgate_Bgate1021Response_Payt_key" />
            <rabbit:binding queue="Bgate_Bgate1022Response_Payt" key="Bgate_Bgate1022Response_Payt_key" />
            <rabbit:binding queue="Bgate_Bgate3001Response_Payt" key="Bgate_Bgate3001Response_Payt_key" />
            <rabbit:binding queue="Bgate_Bgate3002Response_Payt" key="Bgate_Bgate3002Response_Payt_key" />
            <rabbit:binding queue="Bgate_Bgate3003Response_Payt" key="Bgate_Bgate3003Response_Payt_key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

上面的 direct-exchange就是 交换机的Exchange类型,binding 绑定的就是bindingKey .

Binding:

      就是说将队列绑定到交换机上有一个Binding Key 

            RabbitMQ 介绍与测试与使用与基本命令

exchange  有如下几种类型:

Direct

   处理路由建,将一个队列绑定到交换机上,改消息需要与一个特定的路由键(routingkey)完全匹配

是将消息发送到binding Key 与route key 一致的队列中。

              RabbitMQ 介绍与测试与使用与基本命令

Topic:与direct类似,功能更强,支持模糊绑定

* 表示通配一个词

# 表示通配0个或者多个词

DirectKey 是完全匹配,Topic 是模糊匹配:

   RabbitMQ 介绍与测试与使用与基本命令


fanout

   规则比较简单,是将发送到Exchange上面所有的消息都路由到他绑定的队列中。


                           RabbitMQ 介绍与测试与使用与基本命令

headers

   不依赖bindingKey和routeKey进行匹配,主要通过heads属性进行匹配。


工作原理:

假设P1和C1同时注册了相同的brocker,Exchange


1.P1生产消息发送给Exchange

2. Exchange根据RoutingKey,找到匹配的队列Queue1

3.Queue1将消息发送给消费者C1

4. C1接收到消息后,发送ACk给队列Queue1确认消息

5.Queue1接收到ACK后,删除消息缓存。



RabbitMQ 使用代码:

发送者:

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws java.io.IOException {
        
        try {
            System.setProperty("USER_NAME","root"); 
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.145.102");
            factory.setPort(5672);
            factory.setUsername("root");
            factory.setPassword("root");
            factory.setVirtualHost("/");
            
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // 声明一个 exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = getMessage(argv);

            // 向指定 exchange 发送一个消息。由于是 fanout 模式, routing key 为空
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            channel.close();
            connection.close();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally{
           
        }
     
    }

    // ...

    /**
     * @param argv
     * @return
     */
    private static String getMessage(String[] argv) {
        return new String("test");
    }
}

消费者:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.145.102");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setVirtualHost("/");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明 exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 生命一个队列,名字随机
        String queueName = channel.queueDeclare().getQueue();
        // 将队列绑定到 exchange 上
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // consumer 简单的将接收到的消息打印出来
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}


RabbitMQ基本使用命令:

1.查看用户权限

rabbitmqctl list_user_permissions root

2. 查看交换机 后面grep 是过滤的

rabbitmqctl list_exchanges |grep Sche
3. 查看用户

rabbitmqctl list_users

4. 新增用户

rabbitmqctl  add_user  Username  Password

5. 删除用户

rabbitmqctl  delete_user  Username

6. 修改密码

rabbitmqctl  change_password  Username  Newpassword
7.设置用户角色

rabbitmqctl  set_user_tags  User  Tag

User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。

角色名:

(1) 超级管理员(administrator)

可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。

(2) 监控者(monitoring)

可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

(3) 策略制定者(policymaker)

可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

与administrator的对比,administrator能看到这些内容

(4) 普通管理者(management)

仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。

(5) 其他

无法登陆管理控制台,通常就是普通的生产者和消费者。


RabbitMQ遇到问题:

Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'Sche' in vhost '/', class-id=60, method-id=40)

发现用户权限 全部是 / .* .* .* 

权限应该正常

然后发现 其中一个队列的 name  id  其中id被其他队列的id占用了


  <rabbit:direct-exchange name="Sche" durable="true" auto-delete="false" id="Sche">


<rabbit:direct-exchange name="Stat" durable="true" auto-delete="false" id="Stat">


如果上面id='Stat' 改成Sche就会报错





RabbitMQ 介绍与测试与使用与基本命令