消息中间件之RabbitMQ专题二:RabbitMQ介绍
文章目录
一、消息中间件介绍
我们用java来举例子, 打个比方我们客户端发送一个下单请求给订单系统(order)订单系统发送了
一个请求给我们的库存系统告诉他需要更改库存了, 我已经下单了, 这里, 每一个请求我们都可以看作一条消息。
但是 我们客户端需要等待订单系统告诉我这条消息的处理结果(我到底有没有下单成功) 但是订单系统不需要知道库存系统这条消息的处理情况 因为无论你库存有没有改动成功, 我订单还是下了, 因为是先下完了订单(下成功了) 才去更改库存, 库存如果更改出BUG了 那是库存系统的问题, 这个BUG不会影响订单系统。如果这里你能理解的话, 那么我们就能发现 我们用户发送的这条消息(下订单), 是需要同步的(我需要知道结果), 订单发送给库存的消息,是可以异步的(我不想知道你库存到底改了没, 我只是通知你我这边成功下了一个订单)
那么如果我们还按原来的方式去实现这个需求的话, 那么结果会是这样:
那可能有同学说了, 我们订单系统开辟线程去访问库存系统不就好了吗?
使用线程池解决 确实可以, 但是也有他的缺点, 那么 到底怎么来完美解决这个问题呢?
如果这张图能理解的话, 那么 这个消息系统, 就是我们的消息中间件。
而且如果有其他业务需要处理这些订单信息,比如统计系统、日志系统啥的,你可以直接在消息系统后面添加相应的模块来接收消息进行处理。
二、RabbitMq介绍&AMQP介绍
导语:我们刚刚介绍了什么是消息中间件, 那么RabbitMq就是对于消息中间件的一种实现,市面上还有很多很多实现, 比如RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ等等 我们这节主要讲RabbitMq。
(一)AMQP
这里引用百度的一句话 再加以我的理解: AMQP 其实和Http一样 都是一种协议, 只不过 Http是针对网络传输的, 而AMQP是基于消息队列的。
模型图:
(二)AMQP 协议中的基本概念
1)Broker: 接收和分发消息的应用,我们在介绍消息中间件的时候所说的消息系统就是Message Broker。
2)Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
3)Connection: publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
4)Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
5)Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。
常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
6)Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。
7)Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
(三)Exchange的类型
-
fanout:
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。 -
direct :
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
当生产者(P)发送消息时Rotuing key=booking时,这时候将消息传送给Exchange,Exchange获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的Queue,这时发现Queue1和Queue2都符合,就会将消息传送给这两个队列,如果我们以Rotuing key=create和Rotuing key=confirm发送消息时,这时消息只会被推送到Queue2队列中,其他Routing Key的消息将会被丢弃。 -
topic
前面提到的direct规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue,那么topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。它的约定是:
(1)routing key为一个句点号"."分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
(2)binding key与routing key一样也是句点号“. ”分隔的字符串
(3)binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
当生产者发送消息Routing Key=F.C.E的时候,这时候只满足Queue1,所以会被路由到Queue中,如果Routing Key=A.C.E这时候会被同是路由到Queue1和Queue2中,如果Routing Key=A.F.B时,这里只会发送一条消息到Queue2中。 -
headers
这个类型的交换机很少用到,他的路由规则 与routingKey无关 而是通过判断header参数来识别的,在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
基本上没有应用场景,因为上面的三种类型已经能应付了。
这里在对其进行简要的表格整理:
(四)队列Queue
Queue(队列)RabbitMQ的作用是存储消息,队列的特性是先进先出。上图可以清晰地看到Client A和Client B是生产者,生产者生产消息最终被送到RabbitMQ的内部对象Queue中去,而消费者则是从Queue队列中取出数据。可以简化成表示为:
生产者Send Message “A”被传送到Queue中,消费者发现消息队列Queue中有订阅的消息,就会将这条消息A读取出来进行一些列的业务操作。这里只是一个消费正对应一个队列Queue,也可以多个消费者订阅同一个队列Queue,当然这里就会将Queue里面的消息平分给其他的消费者,但是会存在一个一个问题就是如果每个消息的处理时间不同,就会导致某些消费者一直在忙碌中,而有的消费者处理完了消息后一直处于空闲状态,因为前面已经提及到了Queue会平分这些消息给相应的消费者。这里我们就可以使用prefetchCount来限制每次发送给消费者消息的个数。详情见下图所示:
这里的prefetchCount=1是指每次从Queue中发送一条消息来。等消费者处理完这条消息后Queue会再发送一条消息给消费者。
(五)Virtual Hosts
在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。
一个broker里可以开设多个vhost。 borker简单来说就是消息队列服务器实体:
(六)基础对象
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Connection就是建立一个TCP连接,生产者和消费者的都是通过TCP的连接到RabbitMQ Server中的,这个后续会再程序中体现出来。
Channel虚拟连接,建立在上面TCP连接的基础上,数据流动都是通过Channel来进行的。为什么不是直接建立在TCP的基础上进行数据流动呢?如果建立在TCP的基础上进行数据流动,建立和关闭TCP连接有代价。频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。
三、RabbitMq快速开始
因为我们这里是用java来作为客户端, 我们首先引入maven依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.1.2</version>
</dependency>
(注意的是, 我这里引入的是5.x的rabbitmq客户端版本, 那么我们jdk的版本最好在8以上,反之, 这里就建议使用4.x的版本,这里仅仅讨论jdk8 其他的版本不做讨论)
首先 我们编写一个连接的工具类:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static final String QUEUE_NAME = "testQueue";
public static final String EXCHANGE_NAME = "exchange";
public static Connection getConnection() throws Exception{
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置rabbitmq 服务端所在地址 我这里在本地就是本地
connectionFactory.setHost("111.322.232.94");
//设置端口号,连接用户名,虚拟地址等
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("my_vhost");
return connectionFactory.newConnection();
}
}
然后我们编写一个消费者(consumer),和一个生产者(producer):
生产者:
public class Producer {
public static void sendByExchange(String message) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
// 声明exchange
channel.exchangeDeclare(ConnectionUtil.EXCHANGE_NAME, "fanout");
//交换机和队列绑定
channel.queueBind(ConnectionUtil.QUEUE_NAME, ConnectionUtil.EXCHANGE_NAME, "");
channel.basicPublish(ConnectionUtil.EXCHANGE_NAME, "", null,
message.getBytes());
System.out.println("发送的信息为:" + message);
channel.close();
connection.close();
}
}
消费者:
public class Consumer {
public static void getMessage() throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
DefaultConsumer deliverCallback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out.println("消费消息:"+new String(body, "UTF-8"));
}
};
channel.basicConsume(ConnectionUtil.QUEUE_NAME, deliverCallback);
}
}
client:
public class TestClient {
public static void main(String[] args) throws Exception {
Producer.sendByExchange("hello");
Consumer.getMessage();
}
}
这里, 我们演示绑定fanout的类型的交换机, 所以不需要routingKey 就可以路由只需要绑定即可
(可能有同学要问了, 如果没有绑定交换机怎么办呢? 没有绑定交换机的话, 消息会发给rabbitmq默认的交换机里面 默认的交换机隐式的绑定了所有的队列,默认的交换机类型是direct 路由建就是队列的名字)
基本上这样子的话就已经进行一个快速入门了, 由于我们现在做项目基本上都是用spring boot(就算没用spring boot也用spring 吧) 所以后面博客直接基于spring boot来讲解(rabbitmq的特性,实战等)