欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MQ(消息队列)的安装和使用

程序员文章站 2022-05-18 10:21:57
...

概念
1、MQ(消息队列),遵循先进先出的原则
2、提高系统稳定性,操作内容放到消息队列,能有效避免访问服务压力大,导致服务挂掉到的问题
3、解耦:mq也是相当于一个中介,和redis类似,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

执行流程:
用户访问服务时,他会先通过交换器来访问指定的MQ(队列),然后用户会得到一个反馈(类似于排队等待时反馈的消息:您的请求正在处理),然后队列会把这个请求给服务,等到服务处理完成后,队列就会在返回处理完请求的消息给用户。

1、RabbitMQ的下载和安装

1.1 安装erlang语言

RabbitMQ由Erlang语言开发,使用RabbitMQ前提是要安装Erlang。

注意:本人使用的是 Erlang/OTP 20.3版本和RabbitMQ3.7.3版本

下载地址:
http://erlang.org/download/otp_win64_20.3.exe

erlang安装完成需要配置erlang环境变量:

  • ERLANG_HOME(环境变量名)=D:\Program Files\erl9.3(安装的路径)

  • 然后在path中添 加 %ERLANG_HOME%\bin

1.2 安装RabbitMQ

安装后
1)运行服务
rabbitmq-service.bat install 安装服务
rabbitmq-service.bat stop 停止服务
rabbitmq-service.bat start 启动服务
2)安装管理插件 安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ;这个目录下
MQ(消息队列)的安装和使用
管理员身份运行cmd,然后执行 rabbitmq-plugins.bat enable rabbitmq_management
重新启动
3、启动成功 登录RabbitMQ
进入浏览器,输入:http://localhost:15672
MQ(消息队列)的安装和使用

初始账号和密码:guest/guest

2、RabbitMQ操做java的helloworld的使用

2.1 导包

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <!--和springboot2.0.5对应-->
      <version>5.4.1</version>
    </dependency>

工具类

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    /**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);   //java端的端口
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

2.1 普通使用

生产者(用户)

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer01 {
    //队列名称
    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = null;
        Channel channel = null;
        try {
            //获取连接
            connection = ConnectionUtil.getConnection();
            //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
            channel = connection.createChannel();
            /**
             * 声明队列,如果Rabbit中没有此队列将自动创建
             * param1:队列名称
             * param2:是否持久化
             * param3:队列是否独占此连接
             * param4:队列不再使用时是否自动删除此队列
             * param5:队列参数
             */
            channel.queueDeclare(QUEUE, true, false, false, null);
            //发送的消息
            String message = "helloworld小明" + System.currentTimeMillis();
            /**
             * 消息发布方法
             * param1:Exchange的名称,如果没有指定,则使用Default Exchange
             * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
             * param3:消息包含的属性
             * param4:消息体
             */
            /**
             * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显
             示绑定或解除绑定
             * 默认的交换机,routingKey等于队列名称
             */
            channel.basicPublish("", QUEUE, null, message.getBytes());
            System.out.println("Send Message is:'" + message + "'");
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
} 

消费者(服务1)

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer01 {
    //从这个消息队列名里面去获取
    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE, true, false, false, null);

        //让这个一次只能处理(接收)一条信息(能者多劳,实现另一个消费者网络有问题时,下一次就这个来处理,让有延迟的处理完这个后才处理一下个消息)
        channel.basicQos(1);

        //定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {


            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
            (收到消息失败后是否需要重新发送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {



                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body, "utf8");
                System.out.println("receive message.." + msg);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE, true, consumer);
    }
} 

消费者(服务2)

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer02 {
    //从这个消息队列名里面去获取
    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE, true, false, false, null);


        //让这个一次只能处理(接收)一条信息(能者多劳,实现另一个消费者网络有问题时,下一次就这个来处理,不会让消息都堵在有延迟的里面)
        channel.basicQos(1);

        //定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
            (收到消息失败后是否需要重新发送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {

//                int i = 1/0;

                //模仿网络延迟
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body, "utf8");
                System.out.println("receive message.." + msg);

                //手动签收消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE, false, consumer);
    }
} 

2.2三种订阅模型

2.2.1 广播模式(FANOUT)

广播适合用来做群发短信等等(只要上头一发消息,下面的人就能收到一样的消息)

  • 生产者(发送消息的一方)
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 *  广播适合用来做群发短信等等(只要上头一发消息,下面的人就能收到一样的消息)
 */
public class Producer01 {
    //交换机的名字(消费者根据交换机的名称来绑定队列(MQ))
    public static final String NAME_EXCHANGE_FANOUT = "name_exchange_fanout";

    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
        Channel channel = connection.createChannel();

        /*
          创建交换机
            参数一:交换机名
            参数二:交换机类型(广播类型)
         */
        channel.exchangeDeclare(NAME_EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT);

        //发送的消息
        String message = "helloworld小明" + System.currentTimeMillis();


        channel.basicPublish(NAME_EXCHANGE_FANOUT, "", null, message.getBytes());
        System.out.println("Send Message is:'" + message + "'");
    }
} 
  • 消费者1
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer01 {
    //从这个消息队列名里面去获取
    private static final String NAME_QUEUE1_FANOUT = "name_queue1_fanout";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(NAME_QUEUE1_FANOUT, true, false, false, null);

        //广播的核心
        //将队列绑定到交换机(消息发布的时候,队列直接从交换机里面去获取)
        channel.queueBind(NAME_QUEUE1_FANOUT, Producer01.NAME_EXCHANGE_FANOUT, "");


        //让这个一次只能处理(接收)一条信息(能者多劳,实现另一个消费者网络有问题时,下一次就这个来处理,让有延迟的处理完这个后才处理一下个消息)
        channel.basicQos(1);

        //定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {


            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
            (收到消息失败后是否需要重新发送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {



                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body, "utf8");
                System.out.println("receive message.." + msg);

                //手动签收消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(NAME_QUEUE1_FANOUT, false, consumer);
    }
} 
  • 消费者2
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer02 {
    //消费者的队列(到时候根据这个队列到交换机里面去获取)
    private static final String NAME_QEUEUE2_FANOUT = "name_qeueue2_fanout";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(NAME_QEUEUE2_FANOUT, true, false, false, null);

        //广播的核心
        //将队列绑定到交换机(消息发布的时候,队列直接从交换机里面去获取)
        channel.queueBind(NAME_QEUEUE2_FANOUT, Producer01.NAME_EXCHANGE_FANOUT, "");


        //让这个一次只能处理(接收)一条信息(能者多劳,实现另一个消费者网络有问题时,下一次就这个来处理,让有延迟的处理完这个后才处理一下个消息)
        channel.basicQos(1);

        //定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {


            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
            (收到消息失败后是否需要重新发送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {



                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body, "utf8");
                System.out.println("receive message.." + msg);

                //手动签收消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(NAME_QEUEUE2_FANOUT, false, consumer);
    }
} 

2.2.2 定向模式(Direct)

定向模式,将消息发布给指定的消费者(1或n个)

  •  生成者发布一个消息给交换机(给交换机时会指定一个路由的名称,到时候交换机就会根据这个路由的名称去匹配合适的队列),
    

生产者

/**
 *  定向模式,将消息发布给指定的消费者(1或n个)
 *      生成者发布一个消息给交换机(给交换机时会指定一个路由的名称,到时候交换机就会根据这个路由的名称去匹配合适的队列),
 */
public class Producer01 {
    //交换机名称
    public static final String EXCHANGE_NAME_direct = "direct_exchange_test";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = null;
        Channel channel = null;
        try {
            //获取连接
            connection = ConnectionUtil.getConnection();
            //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
            channel = connection.createChannel();

            //指定交换接的类型(定向类型)
            channel.exchangeDeclare(EXCHANGE_NAME_direct, BuiltinExchangeType.DIRECT);

            //发送的消息
            String message = "helloworld小明" + System.currentTimeMillis();

            channel.basicPublish(EXCHANGE_NAME_direct,"query" , null, message.getBytes());
            System.out.println("Send Message is:'" + message + "'");
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
} 

消费者1

public class Consumer01 {
    //消息队列名
    private static final String NAME_QUEUE1_DIRECT = "name_queue1_direct";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(NAME_QUEUE1_DIRECT, true, false, false, null);

        //定向的核心
        // 绑定队列到交换机,同时指定需要订阅的routing key(路由)。假设此处需要update和delete消息(接收两种路由的消息)
        channel.queueBind(NAME_QUEUE1_DIRECT,Producer01.EXCHANGE_NAME_direct , "insert");
        channel.queueBind(NAME_QUEUE1_DIRECT,Producer01.EXCHANGE_NAME_direct , "update");


        //让这个一次只能处理(接收)一条信息(能者多劳,实现另一个消费者网络有问题时,下一次就这个来处理,让有延迟的处理完这个后才处理一下个消息)
        channel.basicQos(1);

        //定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {


            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
            (收到消息失败后是否需要重新发送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {



                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body, "utf8");
                System.out.println("receive message.." + msg);

                //手动签收消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(NAME_QUEUE1_DIRECT, false, consumer);
    }
} 

消费者2

public class Consumer02 {
    //消息队列名
    private static final String NAME_QUEUE2_DIRECT = "name_queue2_direct";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(NAME_QUEUE2_DIRECT, true, false, false, null);

        //定向的核心(到时候直接从交换机里面去获取值到队列里面,但要根据路由名)
        // 绑定队列到交换机,同时指定需要订阅的routing key(路由)。假设此处需要update和delete消息(接收两种路由的消息)
        channel.queueBind(NAME_QUEUE2_DIRECT,Producer01.EXCHANGE_NAME_direct , "insert");
        channel.queueBind(NAME_QUEUE2_DIRECT,Producer01.EXCHANGE_NAME_direct , "update");
        channel.queueBind(NAME_QUEUE2_DIRECT,Producer01.EXCHANGE_NAME_direct , "query");


        //让这个一次只能处理(接收)一条信息(能者多劳,实现另一个消费者网络有问题时,下一次就这个来处理,让有延迟的处理完这个后才处理一下个消息)
        channel.basicQos(1);

        //定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {


            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
            (收到消息失败后是否需要重新发送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {



                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body, "utf8");
                System.out.println("receive message.." + msg);

                //手动签收消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(NAME_QUEUE2_DIRECT, false, consumer);
    }
} 

2.2.3 通配符模式(Topics)

通配符模式(和定向模式一样,都是发送到指定的routingKey(路由)里面,但是这个消费者可以使用*来匹配)

修改生产方的模式

//指定交换机的模式(通配符)
channel.exchangeDeclare(EXCHANGE_NAME_TOPIC, BuiltinExchangeType.TOPIC);

消费方指定通配符

//通配符的核心
// 将队列绑定到交换机,同时指定需要订阅的routing key(路由)。假设此处需要update和delete消息(接收两种路由的消息)
//只要前缀匹配上,后面的都接收
channel.queueBind(NAME_QUEUE2_TOPIC,Producer01.EXCHANGE_NAME_TOPIC , "item.*");