欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

消息中间件之RabbitMQ专题二:RabbitMQ介绍

程序员文章站 2024-03-16 08:22:28
...

一、消息中间件介绍

我们用java来举例子, 打个比方我们客户端发送一个下单请求给订单系统(order)订单系统发送了
一个请求给我们的库存系统告诉他需要更改库存了, 我已经下单了, 这里, 每一个请求我们都可以看作一条消息。
但是 我们客户端需要等待订单系统告诉我这条消息的处理结果(我到底有没有下单成功) 但是订单系统不需要知道库存系统这条消息的处理情况 因为无论你库存有没有改动成功, 我订单还是下了, 因为是先下完了订单(下成功了) 才去更改库存, 库存如果更改出BUG了 那是库存系统的问题, 这个BUG不会影响订单系统。如果这里你能理解的话, 那么我们就能发现 我们用户发送的这条消息(下订单), 是需要同步的(我需要知道结果), 订单发送给库存的消息,是可以异步的(我不想知道你库存到底改了没, 我只是通知你我这边成功下了一个订单)
那么如果我们还按原来的方式去实现这个需求的话, 那么结果会是这样:
消息中间件之RabbitMQ专题二:RabbitMQ介绍
那可能有同学说了, 我们订单系统开辟线程去访问库存系统不就好了吗?
消息中间件之RabbitMQ专题二:RabbitMQ介绍使用线程池解决 确实可以, 但是也有他的缺点, 那么 到底怎么来完美解决这个问题呢?
消息中间件之RabbitMQ专题二:RabbitMQ介绍如果这张图能理解的话, 那么 这个消息系统, 就是我们的消息中间件。
而且如果有其他业务需要处理这些订单信息,比如统计系统、日志系统啥的,你可以直接在消息系统后面添加相应的模块来接收消息进行处理。
消息中间件之RabbitMQ专题二:RabbitMQ介绍

二、RabbitMq介绍&AMQP介绍

导语:我们刚刚介绍了什么是消息中间件, 那么RabbitMq就是对于消息中间件的一种实现,市面上还有很多很多实现, 比如RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ等等 我们这节主要讲RabbitMq。

(一)AMQP

消息中间件之RabbitMQ专题二:RabbitMQ介绍这里引用百度的一句话 再加以我的理解: AMQP 其实和Http一样 都是一种协议, 只不过 Http是针对网络传输的, 而AMQP是基于消息队列的。
模型图:
消息中间件之RabbitMQ专题二:RabbitMQ介绍

(二)AMQP 协议中的基本概念

1)Broker: 接收和分发消息的应用,我们在介绍消息中间件的时候所说的消息系统就是Message Broker。

2)Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。

3)Connection: publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。

4)Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

5)Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。
常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。

6)Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。

7)Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。

(三)Exchange的类型

  1. fanout:
    fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
    消息中间件之RabbitMQ专题二:RabbitMQ介绍

  2. direct :
    direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
    消息中间件之RabbitMQ专题二:RabbitMQ介绍当生产者(P)发送消息时Rotuing key=booking时,这时候将消息传送给Exchange,Exchange获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的Queue,这时发现Queue1和Queue2都符合,就会将消息传送给这两个队列,如果我们以Rotuing key=create和Rotuing key=confirm发送消息时,这时消息只会被推送到Queue2队列中,其他Routing Key的消息将会被丢弃。

  3. topic
    前面提到的direct规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue,那么topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。它的约定是:
    (1)routing key为一个句点号"."分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
    (2)binding key与routing key一样也是句点号“. ”分隔的字符串
    (3)binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
    消息中间件之RabbitMQ专题二:RabbitMQ介绍当生产者发送消息Routing Key=F.C.E的时候,这时候只满足Queue1,所以会被路由到Queue中,如果Routing Key=A.C.E这时候会被同是路由到Queue1和Queue2中,如果Routing Key=A.F.B时,这里只会发送一条消息到Queue2中。

  4. headers
    这个类型的交换机很少用到,他的路由规则 与routingKey无关 而是通过判断header参数来识别的,在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
    基本上没有应用场景,因为上面的三种类型已经能应付了。

这里在对其进行简要的表格整理:
消息中间件之RabbitMQ专题二:RabbitMQ介绍

(四)队列Queue

Queue(队列)RabbitMQ的作用是存储消息,队列的特性是先进先出。上图可以清晰地看到Client A和Client B是生产者,生产者生产消息最终被送到RabbitMQ的内部对象Queue中去,而消费者则是从Queue队列中取出数据。可以简化成表示为:
消息中间件之RabbitMQ专题二:RabbitMQ介绍
生产者Send Message “A”被传送到Queue中,消费者发现消息队列Queue中有订阅的消息,就会将这条消息A读取出来进行一些列的业务操作。这里只是一个消费正对应一个队列Queue,也可以多个消费者订阅同一个队列Queue,当然这里就会将Queue里面的消息平分给其他的消费者,但是会存在一个一个问题就是如果每个消息的处理时间不同,就会导致某些消费者一直在忙碌中,而有的消费者处理完了消息后一直处于空闲状态,因为前面已经提及到了Queue会平分这些消息给相应的消费者。这里我们就可以使用prefetchCount来限制每次发送给消费者消息的个数。详情见下图所示:
消息中间件之RabbitMQ专题二:RabbitMQ介绍这里的prefetchCount=1是指每次从Queue中发送一条消息来。等消费者处理完这条消息后Queue会再发送一条消息给消费者。

(五)Virtual Hosts

在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。
一个broker里可以开设多个vhost。 borker简单来说就是消息队列服务器实体:
消息中间件之RabbitMQ专题二:RabbitMQ介绍

(六)基础对象

ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
  Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
  Connection就是建立一个TCP连接,生产者和消费者的都是通过TCP的连接到RabbitMQ Server中的,这个后续会再程序中体现出来。
  Channel虚拟连接,建立在上面TCP连接的基础上,数据流动都是通过Channel来进行的。为什么不是直接建立在TCP的基础上进行数据流动呢?如果建立在TCP的基础上进行数据流动,建立和关闭TCP连接有代价。频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。

三、RabbitMq快速开始

因为我们这里是用java来作为客户端, 我们首先引入maven依赖:

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.1.2</version>
        </dependency>

(注意的是, 我这里引入的是5.x的rabbitmq客户端版本, 那么我们jdk的版本最好在8以上,反之, 这里就建议使用4.x的版本,这里仅仅讨论jdk8 其他的版本不做讨论)
首先 我们编写一个连接的工具类:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    public static final String QUEUE_NAME = "testQueue";
    public static final String EXCHANGE_NAME = "exchange";
    public static Connection getConnection() throws Exception{
        //创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置rabbitmq 服务端所在地址 我这里在本地就是本地
        connectionFactory.setHost("111.322.232.94");
        //设置端口号,连接用户名,虚拟地址等
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("my_vhost");
        return connectionFactory.newConnection();
    }
}

然后我们编写一个消费者(consumer),和一个生产者(producer):
生产者:

public class Producer {
    public static void sendByExchange(String message) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
        // 声明exchange
        channel.exchangeDeclare(ConnectionUtil.EXCHANGE_NAME, "fanout");
        //交换机和队列绑定
        channel.queueBind(ConnectionUtil.QUEUE_NAME, ConnectionUtil.EXCHANGE_NAME, "");

        channel.basicPublish(ConnectionUtil.EXCHANGE_NAME, "", null,
                message.getBytes());
        System.out.println("发送的信息为:" + message);
        channel.close();
        connection.close();
    }
}

消费者:

public class Consumer {
    public static void getMessage() throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
        DefaultConsumer deliverCallback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                                                                throws IOException {
                System.out.println("消费消息:"+new String(body, "UTF-8"));
            }
        };
        channel.basicConsume(ConnectionUtil.QUEUE_NAME, deliverCallback);
    }

}

client:

public class TestClient {
    public static void main(String[] args) throws Exception {
        Producer.sendByExchange("hello");
        Consumer.getMessage();
    }
}

这里, 我们演示绑定fanout的类型的交换机, 所以不需要routingKey 就可以路由只需要绑定即可
(可能有同学要问了, 如果没有绑定交换机怎么办呢? 没有绑定交换机的话, 消息会发给rabbitmq默认的交换机里面 默认的交换机隐式的绑定了所有的队列,默认的交换机类型是direct 路由建就是队列的名字)
基本上这样子的话就已经进行一个快速入门了, 由于我们现在做项目基本上都是用spring boot(就算没用spring boot也用spring 吧) 所以后面博客直接基于spring boot来讲解(rabbitmq的特性,实战等)