rabbitMq 学习笔记(一)
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题
实现高性能,高可用,可伸缩和最终一致性架构。
rabbitmq 是采用 erlang 语言实现 amqp (advanced message queuing protocol,高级消息 队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。
rabbitmq 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, rabbitmq 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,rabbitmq 模型更像是一种交换机模型
producer:生产者,就是投递消息的一方。
生产者创建消息,然后发布到 rabbitmq 中。消息一般可以包含 2 个部分:消息体和标签 clabel)。消息体也可以称之为 payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个 json 字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息 , 比如一个交换器的名称和一个路由键。 生产者把消息交由 rabbitmq, rabbitmq 之后会根据标签把消息发送给感兴趣的消费者 cconsumer)。
consumer: 消费者, 就是接收消息的一方。
消费者连接到 rabbitmq 服务器,并订阅到队列上。 当消费者消费一条消息时, 只是消费 消息的消息体 cpayload)。 在消息路由的过程中 , 消息的标签会丢弃, 存入到队列中的消息只 有消息体,消费者也只会消费到消息体, 也就不知道消息的生产者是谁,当然消费者也不需要 知道。
broker: 消息中间件的服务节点。
对于 rabbitmq 来说, 一个 rabbitmq broker 可 以简单地看作一个 rabbitmq 服务节点 , 或者 rabbitmq 服务实例 。 大多数情况下也可以将一个 rabbitmq broker 看作一台 rabbitmq 服务器。
交换器(exchange)
rabbitmq中,生产者会将消息先发送到交换器,然后由交换器根据路由规则将消息转发到队列中,如果路由不到,或许会返回给生产者,或许直接丢弃。
交换器有四种类型:fanout,direct,topic,header
binding: 绑定。 rabbitmq 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键 (bindingkey),这样 rabbitmq 就知道如何正确地将消息路由到队列了。
routingkey:路由键。生产者将消息发送给交换器时,一般会指定一个routingkey,用来指定消息的路由规则, routingkey需要与交换器类型和绑定键 (bindingkey) 联合使用才能最终生效。
队列(queue)
队列是rabbitmq的内部对象,用于存储消息。rabbitmq的消息只能存储在队列中。消费者可以从队列中获取消息并消费。如多个消费者订阅同一个队列,这时队列内的消息会被平均分摊(轮询)给多个消费者处理。
交换器类型:
fanout :将所有消息转发至交换器绑定的所有队列中。
direct :它会把消息路由到那些 bindingkey 和 routingkey完全匹配的队列中。
topic :它与 direct 类型的交换器相似,也是将消息路由到 bindingkey 和 routingkey 相匹配的队 列中,但这里的匹配规则有些不同,它约定:
- routingkey 为一个点号". "分隔的字符串(被点号" "分隔开的每一段独立的字符 串称为一个单词 ),如“com.rabbitmq.client”;
- bindingkey 和 routingkey 一样也是点号". "分隔的字符串;
- bindingkey 中可以存在两种特殊字符串"*"和"#",用于做模糊匹配,其中"*"用于匹配一个单词,"#"用于匹配多规格单词(可以是零个)。
header :该类型的交换器性能很差,而且也不实用,基本上不会看到它的存在。
生产者代码:
nuget:添加rabbitmq.client;
1 iconnectionfactory confactory = new connectionfactory//创建连接工厂对象 2 { 3 hostname = "*.*.*.*",//ip地址 4 port = 5672,//端口号 5 username = "yan",//用户账号 6 password = "yan"//用户密码 7 };
1 using (iconnection con = confactory.createconnection()) 2 { 3 using (imodel channel = con.createmodel()) 4 { 5 channel.exchangedeclare("exchangename", "direct", true, false, null); //声明交换器,dureable:是否持久化,autodelete:是否自动删除 6 channel.queuedeclare("queuename", true, false, false, null); //声明队列 dureable:是否持久化,exclusive:是否排他 autodelete:是否自动删除 7 channel.queuebind("queuename", "exchangename", "routingkey", null); 8 var properties = channel.createbasicproperties(); 9 properties.deliverymode = 2; //消息持久化 10 channel.basicpublish("exchangename", "routingkey", properties, encoding.utf8.getbytes("helloword")); //发布消息 11 } 12 }
声明交换器参数:
durable: 设置是否持久化。 durable 设置为 true 表示持久化, 反之是非持久化。持 久化可以将交换器存盘,在服务器重启 的时候不会丢失相关信息。
autodelete: 设置是否自动删除。 autodelete 设置为 true 则表示自动删除。自动 删除的前提是至少有一个队列或者交换器与这个交换器绑定, 之后所有与这个交换器绑
定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为: "当与此交换器 连接的客户端都断开时, rabbitmq 会自动删除本交换器"。
声明队列参数:
durable: 设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在 服务器重启的时候可以保证不丢失相关信息。
exclusive: 设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排 他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意 三点:排他队列是基于连接(connection) 可见的,同一个连接的不同信道 (channel) 是可以同时访问同一连接创建的排他队列; "首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列 适用于一个客户端同时发送和读取消息的应用场景。
autodelete: 设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是: 至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这 个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
消费者代码:
1 iconnectionfactory confactory = new connectionfactory//创建连接工厂对象 2 { 3 hostname = "*.*.*.*",//ip地址 4 port = 5672,//端口号 5 username = "yan",//用户账号 6 password = "yan"//用户密码 7 }; 8 using (iconnection con = confactory.createconnection()) 9 { 10 using (imodel channel = con.createmodel()) 11 { 12 channel.exchangedeclare("exchangename", "direct", true, false, null); //声明交换器,dureable:是否持久化,autodelete:是否自动删除 13 channel.queuedeclare("queuename", true, false, false, null); //声明队列 dureable:是否持久化,exclusive:是否排他 autodelete:是否自动删除 14 channel.queuebind("queuename", "exchangename", "routingkey", null); 15 eventingbasicconsumer consumer = new eventingbasicconsumer(channel); //声明事件基本消费者 16 consumer.received += (ch,ea) => { 17 var message = encoding.utf8.getstring(ea.body); 18 console.writeline($"收到消息: {message}"); //消费消息 19 channel.basicack(ea.deliverytag, false); //确认该消息已被消费 20 }; 21 channel.basicconsume("queuename", false, consumer); //启动消费者,并设置为手动应答消息 22 } 23 }
channel.basicreject(ea.deliverytag,false); //拒绝消息 requeue true:消息重新存入队列。false:立即会把消息从队列中移除。
为了保证消息从队列可靠地达到消费者, rabbitmq 提供了消息确认机制( message acknowledgement)。 消费者在订阅队列时,可以指定 autoack 参数,当 autoack 等于 false 时, rabbitmq 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当 autoack 等于 true 时, rabbitmq 会自动把发送出去的 消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
采用消息确认机制后,只要设置 autoack 参数为 false,消费者就有足够的时间处理消息 (任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题, 因为 rabbitmq 会一直 等待持有消息直到消费者显式调用 basic.ack 命令为止。
当 autoack 参数置为 false,对于 rabbitmq 服务端而言,队列中的消息分成了两个部分: 一部分是等待投递给消费者的消息:一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息。如果 rabbitmq 一直没有收到消费者的确认信号,并且消费此消息的消费者己经 断开连接,则 rabbitmq 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。
rabbitmq 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否己经断开,这么设计的原因是 rabbitmq 允许消费者 消费一条消息的时间可以很久很久。