欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

消息中间件MQ之RabbitMQ

程序员文章站 2024-03-15 23:34:18
...

消息中间件MQ之RabbitMQ

MQ

  • Message Queue(消息队列),是在消息的的传输过程中保存消息的容器,多用于分布式系统之间进行通信。

  • 优势:

    • 应用解耦:提高系统容错性和可维护性
    • 异步提速:提升用户体验和系统吞吐量
    • 削峰填谷:提高系统稳定性
  • 劣势:

    • 系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响
    • 系统复杂度提高:以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用
    • 一致性问题:A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,BCD系统可能处理失败

RabbitMQ简介

  • 是一个基于AMQP协议实现的一款消息中间件

  • AMQP:Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制

  • 基础架构图

消息中间件MQ之RabbitMQ

  • RabbitMQ 中的相关概念:

    • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
    • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
    • Connection:publisher/consumer 和 broker 之间的 TCP 连接
    • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
    • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
    • Queue:消息最终被送到这里等待 consumer 取走
    • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

RabbitMQ的7种工作模式与JMS

  • 简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing路由模式、Topics 主题模式、RPC 远程调用模式、Publisher Confirms

  • JMS:即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API,是 JavaEE 规范中的一种,类比JDBC

  • 导入依赖:

    <dependencies>
        <!--rabbitmq java 客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
    </dependencies>
    

RabbitMQ之简单模式

消息中间件MQ之RabbitMQ

  • producer

    public class ProducerHello {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("192.168.33.129");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/it");//虚拟机 默认值/
            factory.setUsername("gmx");//用户名 默认 guest
            factory.setPassword("gmx");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
            //5. 创建队列Queue
            /*
            queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
            参数:
                1. queue:队列名称
                2. durable:是否持久化,当mq重启之后,还在
                3. exclusive:
                    * 是否独占。只能有一个消费者监听这队列
                    * 当Connection关闭时,是否删除队列
                4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
                5. arguments:参数。
             */
            //如果没有一个名字叫hello的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare("hello",true,false,false,null);
            /*
            basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
             */
            String body = "hello rabbitmq~~~";
            //6. 发送消息
            channel.basicPublish("","hello",null,body.getBytes());
            //7.释放资源
            channel.close();
            connection.close();
        }
    }
    
  • consumer

    public class ConsumerHello {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("192.168.33.129");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/it");//虚拟机 默认值/
            factory.setUsername("gmx");//用户名 默认 guest
            factory.setPassword("gmx");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
            //5. 创建队列Queue
            /*
            queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
            参数:
                1. queue:队列名称
                2. durable:是否持久化,当mq重启之后,还在
                3. exclusive:
                    * 是否独占。只能有一个消费者监听这队列
                    * 当Connection关闭时,是否删除队列
                4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
                5. arguments:参数。
             */
            //如果没有一个名字叫hello的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare("hello",true,false,false,null);
             /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
             */
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                /*
                    回调方法,当收到消息后,会自动执行该方法
                    1. consumerTag:标识
                    2. envelope:获取一些信息,交换机,路由key...
                    3. properties:配置信息
                    4. body:数据
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("consumerTag:"+consumerTag);
                    System.out.println("Exchange:"+envelope.getExchange());
                    System.out.println("RoutingKey:"+envelope.getRoutingKey());
                    System.out.println("properties:"+properties);
                    System.out.println("body:"+new String(body));
                }
            };
            //监听消息队列
            channel.basicConsume("hello",true,consumer);
            //关闭资源?不要
        }
    }
    

RabbitMQ之work queues模式

消息中间件MQ之RabbitMQ

  • 代码同上,需创建两个consumer来同时接收消息(默认是轮询机制)

RabbitMQ之Publish/Subscribe 发布与订阅模式

消息中间件MQ之RabbitMQ

  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

  • producer

    public class Producer_PubSub {
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置链接信息
            factory.setHost("192.168.33.129");
            factory.setPort(5672);
            factory.setVirtualHost("/it");
            factory.setUsername("gmx");
            factory.setPassword("gmx");
            //创建连接
            Connection connection = factory.newConnection();
            //创建channel
            Channel channel = connection.createChannel();
            //创建交换机
            String exchangeName = "test_fanout";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
            //创建队列
            channel.queueDeclare("fanout_queue1",true,false,false,null);
            channel.queueDeclare("fanout_queue2",true,false,false,null);
            //绑定队列和交换机
            channel.queueBind("fanout_queue1",exchangeName,"");
            channel.queueBind("fanout_queue2",exchangeName,"");
            //发送消息
            String body = "log.......";
            channel.basicPublish(exchangeName,"",null,body.getBytes());
            //释放消息
            channel.close();
            connection.close();
        }
    }
    
  • consumer 1 接受处理fanout_queue1的消息

    public class Counstumer_PubSub1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //2. 设置参数
            connectionFactory.setHost("192.168.33.129");//ip  默认值 localhost
            connectionFactory.setPort(5672);//端口  默认值 5672
            connectionFactory.setVirtualHost("/it");//虚拟机 默认值/
            connectionFactory.setUsername("gmx");
            connectionFactory.setPassword("gmx");
            //3. 创建连接 Connection
            Connection connection = connectionFactory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
            String queueName1 = "fanout_queue1";
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                    System.out.println(new String(bytes));
                }
            };
            channel.basicConsume(queueName1,true,consumer);
        }
    }
    
  • consumer 2 接受处理fanout_queue2的消息,将fanout_queue1改为fanout_queue2

RabbitMQ之Routing路由模式

消息中间件MQ之RabbitMQ

  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

  • producer

    public class Producer_Routing {
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置链接信息
            factory.setHost("192.168.33.129");
            factory.setPort(5672);
            factory.setVirtualHost("/it");
            factory.setUsername("gmx");
            factory.setPassword("gmx");
            //创建连接
            Connection connection = factory.newConnection();
            //创建channel
            Channel channel = connection.createChannel();
            //创建交换机
            String exchangeName = "test_direct";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
            //创建队列
            channel.queueDeclare("direct_queue1",true,false,false,null);
            channel.queueDeclare("direct_queue2",true,false,false,null);
            //绑定队列和交换机
            channel.queueBind("direct_queue1",exchangeName,"error");
            channel.queueBind("direct_queue2",exchangeName,"error");
            channel.queueBind("direct_queue2",exchangeName,"info");
            channel.queueBind("direct_queue2",exchangeName,"warning");
            //发送消息
            String body = "error.......";
            String body1 = "info.......";
            String body2 = "warning.......";
            channel.basicPublish(exchangeName,"error",null,body.getBytes());
            channel.basicPublish(exchangeName,"info",null,body1.getBytes());
            channel.basicPublish(exchangeName,"warning",null,body2.getBytes());
            //释放消息
            channel.close();
            connection.close();
        }
    }
    
  • consumer 通过队列名字获取值

RabbitMQ之Topics 主题模式

消息中间件MQ之RabbitMQ

  • 在配置routing key 的时候可以使用通配符

  • *表示一个词,#表示n个词,以.号为分隔符,如*.haha表示匹配a.haha、不匹配a.b.haha

  • producer

    public class Producer_Topic {
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置链接信息
            factory.setHost("192.168.33.129");
            factory.setPort(5672);
            factory.setVirtualHost("/it");
            factory.setUsername("gmx");
            factory.setPassword("gmx");
            //创建连接
            Connection connection = factory.newConnection();
            //创建channel
            Channel channel = connection.createChannel();
            //创建交换机
            String exchangeName = "test_TOPIC";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
            //创建队列
            channel.queueDeclare("TOPIC_queue1",true,false,false,null);
            channel.queueDeclare("TOPIC_queue2",true,false,false,null);
            //绑定队列和交换机
            channel.queueBind("TOPIC_queue1",exchangeName,"order.*");
            channel.queueBind("TOPIC_queue2",exchangeName,"#.error");
            channel.queueBind("TOPIC_queue2",exchangeName,"#.info.*");
            //发送消息
            String body = "error.......";
            String body1 = "order....... ";
            channel.basicPublish(exchangeName,"error",null,body.getBytes());
    //        channel.basicPublish(exchangeName,".error",null,body.getBytes());
            channel.basicPublish(exchangeName,"order.hah",null,body1.getBytes());
            //释放消息
            channel.close();
            connection.close();
        }
    }