欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ客户端开发向导

程序员文章站 2022-12-25 17:35:21
Ⅰ、高层接口 ConnectionFactory Connection Channel Consumor Ⅱ、操作流程及API 【一】创建连接工厂ConnectionFactory ​ 我们可以为 设置各种参数来进行连接初始化 【二】创建连接Connection 【三】创建信道Channel 知识点 ......

ⅰ、高层接口

  • connectionfactory
  • connection
  • channel
  • consumor

ⅱ、操作流程及api

【一】创建连接工厂connectionfactory

connectionfactory factory = new connectionfactory();

​ 我们可以为actory设置各种参数来进行连接初始化

factory.setusername("guest");//设置服务器登录账号
factory.setpassword("guest");//设置服务器登录密码
factory.sethost("127.0.0.1");//设置服务器ip
factory.setport("15427");//设置服务器端口
factory.setvirtualhost("/");//设置虚拟主机

【二】创建连接connection

connection connection = factory.newconnection();

【三】创建信道channel

channel channel = connection.createchannel();

知识点:

​ 连接和信道的关系:连接是客户端与服务器开启的tcp连接,由于tcp连接的开启和销毁都需要消耗大量的性能,所以我们在连接的基础上使用了信道的概念,对连接进行逻辑再分,每个连接都可以创建多个信道,这些信道共用一个连接。

​ 信道就是rabbitmq中最实用的组件了,几乎所有针对交换器、队列、生产者、消费者的操作全部都在这里面执行。

【四】创建交换器exchange

exchange.declareok exchangedeclare(
    string exchange, // 声明交换器名称
    string type, // 声明交换器类型
    boolean durable, // 声明是否持久化
    boolean autodelete, // 声明是否自动删除
    boolean internal, // 声明是否内置
    map<string, object> arguments // 创建交换器的其他参数
    ) throws ioexception
exchange.declareok exchangedeclare(
    string exchange, // 声明交换器名称
    builtinexchangetype type, // 声明交换器类型
    boolean durable, // 声明是否持久化
    boolean autodelete, // 声明是否自动删除
    boolean internal, // 声明是否内置
    map<string, object> arguments // 创建交换器的其他参数
    ) throws ioexception

​ 创建交换器的方法有上面两种,这两种的区别仅在于声明交换器的类型type参数的类型不同,第一种常用。其实上面的方法每个都有很多个重载的方法,采用一些默认的参数。这里只列举参数最全的方法,来介绍其各个参数的意义。

知识点:交换器类型

rabbitmq的交换器拥有四种类型,分别为fanoutdirecttopicheadersfanout类型的交换器会将消息路由到所有与其绑定的队列中,类似于广播;direct类型是默认的交换器类型,它只会将消息路由到指定的队列中,这个队列的绑定key必须与消息的路由key完全一致;topic类型的交换器是最常使用的交换器类型,是一种模糊匹配的交换器,它会将消息路由到所有绑定key与消息路由key可匹配的队列上;至于headers类型的交换器并不常用,因为其性能较差,并不实用。

​ 这里builtinexchangetype类型是一个枚举,它里面定义了这四种 类型的枚举值:fanoutdirecttopicheaders

知识点:交换器持久化

​ 开启交换器的持久化,那么交换器在创建好之后会持久化到磁盘,一般对于生产环境中长期使用的交换器,最好开启持久化功能,用以提升rabbitmq的可用性(高可用要点之一),服务器异常宕机的情况下可以硬件恢复。

知识点:交换器自动删除

​ 交换器在不再使用的情况下是可以自动删除的,只要在创建交换器的时候设置autodelete属性为true即可开启自动删除功能,默认为false

​ 自动删除功能必须要在交换器曾经绑定过队列或者交换器的情况下,处于不再使用的时候才会自动删除,如果是刚刚创建的尚未绑定队列或者交换器的交换器或者早已创建只是未进行队列或者交换器绑定的交换器是不会自动删除的。

​ 不再使用的交换器指的即使那些曾经绑定过队列或者交换器,现在已经没有任何绑定队列或者交换器的情况下的交换器。

知识点:内置交换器

​ 内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,只能作为类似于队列的方式绑定到另一个交换器,来接收这个交换器中路由的消息,内置交换器同样可以绑定队列和路由消息,只是其接收消息的来源与普通交换器不同。

​ 交换器的创建可以在代码中实现,也可以不再代码中实现。推荐在代码中实现,因为当要创建的交换器已存在于rabbitmq服务器中时,是不会再次创建的,而是直接返回创建成功。

【五】绑定交换器exchangebind

exchange.bindok exchangebind(
    string destination, // 指定目标交换器
    string source, // 指定源交换器
    string routingkey, // 指定绑定key
    map<string, object> arguments // 其他一些结构化参数
    ) throws ioexception

知识点:交换器绑定

​ 交换器一般用来被队列绑定,但其实它也可以被另一个交换器绑定,绑定其实就是将二者关联起来,绑定两个交换器,就是将两个交换器关联起来,这里面有一个主动方,一个被动方,主动方是要执行绑定的交换器(目标交换器),被动方是被绑定的交换器(源交换器),绑定的过程其实就是将目标交换器的绑定key送给源交换器,这时其实可以将目标交换器看成是一个队列,将自己绑定到源交换器上,依靠的也是绑定key,不过这里的绑定key是没明确的路由key,而非topic类似的模糊key。源交换器会将目标交换器当做一个队列进行看待,将接收到的路由key与目标交换器绑定key完全匹配消息路由到这个目标交换器中。

​ 这个方法同样有重载的方法,来默认化一些参数。

【六】创建队列queue

queue.declareok queuedeclare(
    string queue, // 声明队列名称
    boolean durable, // 声明是否持久化
    boolean exclusive, // 声明是否排他
    boolean autodelete, // 声明是否自动删除
    map<string, object> arguments // 设置队列的其他一些参数
    ) throws ioexception

知识点:队列持久化

​ 创建队列的时候也可以设置是否支持持久化到磁盘,生产环境中我们一般都会将其设置为持久化,这也保证了服务器宕机的情况下重启后,队列可以恢复,以保证数据安全(高可用要点之二)

知识点:排他队列

​ 创建队列的时候有一个排他参数exclusive,排他队列只对首次创建该队列的信道所在的连接可见,并且该连接内的所有信道都可以访问这个排他队列,在这个连接断开之后,该队列自动删除,由此可见这个队列可以说是绑到连接上的,对同一服务器的其他连接不可见。

​ 这种排他优先于持久化,即使设置了队列持久化,在连接断开后,该队列也会自动删除。

​ 非排他队列不依附于连接而存在,同一服务器上的多个连接都可以访问这个队列。

知识点:队列自动删除

​ 队列的自动删除类似于交换器的自动删除,都必须是曾经使用过的队列才能执行自动删除,如果是创建之后根本就没有用过的队列是不会触发自动删除的。

​ 这个方法同样有重载的方法,来默认化一些参数。

【七】绑定队列queuebind

queue.bindok queuebind(
    string queue, // 指定队列名称
    string exchange, // 指定交换器名称
    string routingkey, // 声明绑定key
    map<string,object> arguments // 定义绑定的一些参数
    ) throws ioexception

​ 这里routingkey的值需要由指定的交换器的类型累决定使用什么形式的key,如果是fanout类型的交换器,会忽略routingkey的值,如果是direct类型的交换器,使用明确的绑定key,如果是topic类型的交换器,则使用带有匹配符*#的模糊key。

【八】发布消息basicpublish

void basicpublish(
    string exchange, // 指定目的交换器
    string routingkey, // 声明消息的路由key
    boolean mandatory, // 是否为无法路由的消息进行返回处理
    boolean immediate, // 是否对路由到无消费者队列的消息进行返回处理
    basicproperties props, // 消息的一些基本属性设置
    byte[] body // 消息体
    ) throws ioexception

知识点:mandatory

​ 消息发布的时候设置消息的mandatory属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式,设置为true表示将消息返回到生产者,否则直接丢弃消息。

​ 上述无法路由的情况可以是在无法找到匹配消息路由key的队列,导致消息无法路由到队列中

​ 当mandatory=true时,出现无法路由消息被返回,那么返回的消息又回到生产者,怎么接收呢,这就要靠返回监听器returnlistener

知识点:returnlistener

​ 我们在设置消息的mandatory=true的时候,就需要对返回的消息进行处理了,我们可以在信道中添加返回监听器来监听返回的消息,一旦监听到这些消息,我们就着手对其进行再处理,一般我们会进行重发。

channel.addreturnlistener(new returnlistener(){
    @override
    publish void handlereturn(
        int repaycode, // 回应码
        string repaytext, // 回应内容
        string exchange, // 来源交换器
        string routingkey, // 消息的路由key
        amqp.basicproperties basicproperties, // 消息的其他属性
        byte[] body // 消息体
    ) throws ioexception {
        // do some thing to handle the return message
    }
});

【九】消费消息basicconsumer、basicget

1、推送消息basicconsume

string basicconsume(
    string queue, // 指定队列名称
    boolean autoack, // 是否自动回应
    string consumertag, // 声明消费者标签,区分不同的消费者
    boolean nolocal, // 是否不能将消息推送给同一个连接内的消费者
    boolean exclusive, // 是否排他
    map<string, object> arguments, // 设置消费者的其他参数 
    consumer callback // 设置消费者回调函数,处理消息
    ) throws ioexception

知识点:自动回应

​ 推送消息时设置消息自动回应,那么消息在推送给消费者后就会自动回应服务器,服务器收到回回应就会删除这条消息,这样无法保证消息能被正确处理,因为消费者虽然收到了消息,但它可能无法处理、或者拒绝等,这时服务器中消息却已被删除,导致消息丢失。(高可用要点之三)

​ 一般情况下我们将其设置为false,然后在回调函数中消息处理完毕之后手动进行回应。

void basicack(
    long deliverytag, // 指定推送标识编号
    boolean multiple // 是否批量回应
    ) throws ioexception

注意:推送标识

deliverytag表示的是推送编号,是一个单调递增正整数编号,它用来标识channel中一次消息推送,与消息绑在一起。

​ 当一个消费者向服务器注册basicconsume之后,服务器就会使用basic.delivery给消费者推送消息,deliverytag就用来标识这样一个推送。但要注意,它只在当前信道channel内有效。

​ 消费者受到消息的时候同时会收到这个推送标识,当要回应、拒绝消息的时候就需要带着这个标识。

注意:批量回应

multiple参数为boolean值,用来表示是否进行批量回应,当值为true时表示进行批量回应,它会对推送编号小于给定编号的所有消息进行回应。false表示只回应指定编号的消息。

知识点:推送范围

nolocal这个参数表示是否可以将消息推送给与消息发布者同一连接内的消费者,如果nolocal=false表示可以推送,nolocal=true则表示不能推送,那么就只能推送到其他连接中的消费者。

知识点:排他

知识点:回调函数

​ 回调函数主要用于定义针对消息的处理逻辑,一般采用如下方式定义:

consumer consumer = new defaultconsumer(channel){
    @override
    public void handledelivery(
        string consumertag, // 消费者标签,可用于消费者验证发送目标的正确性
        envelope envelope, // amqp操作参数,可以获取其中参数进行消息处理
        amqp.basicproperties properties, // 消息的基本属性
        byte[] body // 消息体,用于存放推送的消息
    ){
        // do something to handle deleveried message
    }
}

知识点:amqp操作参数

enveloperabbitmq定义的用于封装amqp操作参数的类,里面主要封装了四个参数:

private final long _deliverytag;// 推送编号
private final boolean _redeliver; // 是否重发标签
private final string _exchange; // 当前操作对应的交换器
private final string _routingkey; // 关联的路由key

​ 其中_redeliber=true表示这是一个失败的basicack之后的消息的重新推送。这里面的这四个参数我们都可以在消费者客户端处理消息的时候使用。

​ 其实basicconsume是消费者订阅方法,目的在于订阅某个队列,是消息推送的前提,真正的消息推送并没有在channel类中实现,因为推送操作是由rabbitmq服务器自动发起的,不需要生产者或者消费者手动触发,所以也就没用提供接口。

​ 真正的消息推送是由rabbitmq服务器的basic.delivery方法实现的。

2、拉取消息basicget

getresponse basicget(
    string queue, // 指定队列名称
    boolean autoack // 是否自动回应
    ) throws ioexception;

​ 拉取消息就是指消费者主动从服务器获取消息,每次只能获取一条消息。推送消息是被动的获取。这里的autoack和之前推送的设置一样,一般设置为false,表示不主动回应,采用手动回应(高可用要点之三)

【十】拒绝消息basicreject、basicnack

1、拒绝一个消息basicreject

void basicreject(
    long deliverytag, // 消息推送编号
    boolean requeue // 是否重新入队
    ) throws ioexception;

2、拒绝多个消息basicnack

void basicnack(
    long deliverytag, // 消息推送编号
    boolean multiple, // 是否批量拒绝
    boolean requeue // 是否重新入队
    )throws ioexception;

知识点:重新入队

​ 当一个消息推送到某一个消费者,这个消费者无法处理时它进行了拒绝操作,如果指定requeue值为true,表示被拒绝的消息还可以重新发送到队列,可被继续推送到其他消费者,如果设置为false,那么这条消息会被立刻从队列删除。

​ 如果将这两个方法的requeue参数设置为false,那么可以启用死信队列功能,因为这样的话,返回的消息会变成死信,如果服务器中设置有死信交换器dlx,并且已关联到该队列,那么这个消息就会被发送到死信交换器,从而被路由到绑定的死信队列中得以保留,我们可以通过排查这些消息来进行服务器优化。

知识点:批量拒绝

​ 消息的单个拒绝与批量拒绝使用的不是同一个方法,批量拒绝的方法basicnack中有个决定是否批量拒绝的参数mutiple,如果设置为false,表示不执行批量拒绝,那么它的效果等同于basicreject方法,如果设置为true,表示拒绝小于指定推送编号的所有未被当前消费者消费的消息。

【十一】取消消费者basiccancel

void basiccancel(string consumertag // 指定要取消的消费者标签
    ) throws ioexception;

【十二】关闭连接

channel.close();// 关闭信道
connection.close();// 关闭tcp连接