Java SpringBoot集成RabbitMq实战和总结
目录
在公司里一直在用rabbitmq,由于api已经封装的很简单,关于rabbitmq本身还有封装的实现没有了解,最近在看rabbitmq实战这本书,结合网上的一些例子和spring文档,实现了rabbitmq和spring的集成,对着自己平时的疑惑做了一些总结。
关于rabbitmq基础不在详细讲解(本文不适合rabbitmq零基础),rabbitmq实战的1,2,4三章讲的非常不错。因为书中讲的都是python和php的例子,所以自己结合springboot文档和做了一些总结,写了一些springboot的。
交换器、队列、绑定的声明
springamqp项目对rabbitmq做了很好的封装,可以很方便的手动声明队列,交换器,绑定。如下:
/** * 队列 * @return */ @bean @qualifier(rabbitmqconstant.programmatically_queue) queue queue() { return new queue(rabbitmqconstant.programmatically_queue, false, false, true); } /** * 交换器 * @return */ @bean @qualifier(rabbitmqconstant.programmatically_exchange) topicexchange exchange() { return new topicexchange(rabbitmqconstant.programmatically_exchange, false, true); } /** * 声明绑定关系 * @return */ @bean binding binding(@qualifier(rabbitmqconstant.programmatically_exchange) topicexchange exchange, @qualifier(rabbitmqconstant.programmatically_queue) queue queue) { return bindingbuilder.bind(queue).to(exchange).with(rabbitmqconstant.programmatically_key); } /** * 声明简单的消费者,接收到的都是原始的{@link message} * * @param connectionfactory * * @return */ @bean simplemessagelistenercontainer simplecontainer(connectionfactory connectionfactory) { simplemessagelistenercontainer container = new simplemessagelistenercontainer(); container.setconnectionfactory(connectionfactory); container.setmessagelistener(message -> log.info("simple receiver,message:{}", message)); container.setqueuenames(rabbitmqconstant.programmatically_queue); return container; }
消费者和生产者都可以声明,交换器这种一般经常创建,可以手动创建。需要注意对于没有路由到队列的消息会被丢弃。
如果是spring的话还需要声明连接:
@bean connectionfactory connectionfactory(@value("${spring.rabbitmq.port}") int port, @value("${spring.rabbitmq.host}") string host, @value("${spring.rabbitmq.username}") string username, @value("${spring.rabbitmq.password}") string password, @value("${spring.rabbitmq.publisher-confirms}") boolean isconfirm, @value("${spring.rabbitmq.virtual-host}") string vhost) { cachingconnectionfactory connectionfactory = new cachingconnectionfactory(); connectionfactory.sethost(host); connectionfactory.setvirtualhost(vhost); connectionfactory.setport(port); connectionfactory.setusername(username); connectionfactory.setpassword(password); connectionfactory.setpublisherconfirms(isconfirm); }
在配置类使用@enablerabbit
的情况下,也可以基于注解进行声明,在bean的方法上加上@rabbitlistener
,如下:
/** * 可以直接通过注解声明交换器、绑定、队列。但是如果声明的和rabbitmq中已经存在的不一致的话 * 会报错便于测试,我这里都是不使用持久化,没有消费者之后自动删除 * {@link rabbitlistener}是可以重复的。并且声明队列绑定的key也可以有多个. * * @param headers * @param msg */ @rabbitlistener( bindings = @queuebinding( exchange = @exchange(value = rabbitmqconstant.default_exchange, type = exchangetypes.topic, durable = rabbitmqconstant.false_constant, autodelete = rabbitmqconstant.true_constant), value = @queue(value = rabbitmqconstant.default_queue, durable = rabbitmqconstant.false_constant, autodelete = rabbitmqconstant.true_constant), key = dkey ), //手动指明消费者的监听容器,默认spring为自动生成一个simplemessagelistenercontainer containerfactory = "container", //指定消费者的线程数量,一个线程会打开一个channel,一个队列上的消息只会被消费一次(不考虑消息重新入队列的情况),下面的表示至少开启5个线程,最多10个。线程的数目需要根据你的任务来决定,如果是计算密集型,线程的数目就应该少一些 concurrency = "5-10" ) public void process(@headers map<string, object> headers, @payload exampleevent msg) { log.info("basic consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}"); } /** * {@link queue#ignoredeclarationexceptions}声明队列会忽略错误不声明队列,这个消费者仍然是可用的 * * @param headers * @param msg */ @rabbitlistener(queuestodeclare = @queue(value = rabbitmqconstant.default_queue, ignoredeclarationexceptions = rabbitmqconstant.true_constant)) public void process2(@headers map<string, object> headers, @payload exampleevent msg) { log.info("basic2 consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}"); }
关于消息序列化
这个比较简单,默认采用了java序列化,我们一般使用的json格式,所以配置了jackson,根据自己的情况来,直接贴代码:
@bean messageconverter messageconverter() { return new jackson2jsonmessageconverter(); }
同一个队列多消费类型
如果是同一个队列多个消费类型那么就需要针对每种类型提供一个消费方法,否则找不到匹配的方法会报错,如下:
@component @slf4j @rabbitlistener( bindings = @queuebinding( exchange = @exchange(value = rabbitmqconstant.multipart_handle_exchange, type = exchangetypes.topic, durable = rabbitmqconstant.false_constant, autodelete = rabbitmqconstant.true_constant), value = @queue(value = rabbitmqconstant.multipart_handle_queue, durable = rabbitmqconstant.false_constant, autodelete = rabbitmqconstant.true_constant), key = rabbitmqconstant.multipart_handle_key ) ) @profile(springconstant.multipart_profile) public class multipartconsumer { /** * rabbithandler用于有多个方法时但是参数类型不能一样,否则会报错 * * @param msg */ @rabbithandler public void process(exampleevent msg) { log.info("param:{msg = [" + msg + "]} info:"); } @rabbithandler public void processmessage2(exampleevent2 msg) { log.info("param:{msg2 = [" + msg + "]} info:"); } /** * 下面的多个消费者,消费的类型不一样没事,不会被调用,但是如果缺了相应消息的处理handler则会报错 * * @param msg */ @rabbithandler public void processmessage3(exampleevent3 msg) { log.info("param:{msg3 = [" + msg + "]} info:"); } }
注解将消息和消息头注入消费者方法
在上面也看到了@payload
等注解用于注入消息。这些注解有:
- @header 注入消息头的单个属性
- @payload 注入消息体到一个javabean中
- @headers 注入所有消息头到一个map中
这里有一点主要注意,如果是com.rabbitmq.client.channel
,org.springframework.amqp.core.message
和org.springframework.messaging.message
这些类型,可以不加注解,直接可以注入。
如果不是这些类型,那么不加注解的参数将会被当做消息体。不能多于一个消息体。如下方法exampleevent就是默认的消息体:
public void process2(@headers map<string, object> headers,exampleevent msg);
关于消费者确认
rabbitmq消费者可以选择手动和自动确认两种模式,如果是自动,消息已到达队列,rabbitmq对无脑的将消息抛给消费者,一旦发送成功,他会认为消费者已经成功接收,在rabbitmq内部就把消息给删除了。另外一种就是手动模式,手动模式需要消费者对每条消息进行确认(也可以批量确认),rabbitmq发送完消息之后,会进入到一个待确认(unacked)的队列,如下图红框部分:
如果消费者发送了ack,rabbitmq将会把这条消息从待确认中删除。如果是nack并且指明不要重新入队列,那么该消息也会删除。但是如果是nack且指明了重新入队列那么这条消息将会入队列,然后重新发送给消费者,被重新投递的消息消息头amqp_redelivered属性会被设置成true,客户端可以依靠这点来判断消息是否被确认,可以好好利用这一点,如果每次都重新回队列会导致同一消息不停的被发送和拒绝。消费者在确认消息之前和rabbitmq失去了连接那么消息也会被重新投递。所以手动确认模式很大程度上提高可靠性。自动模式的消息可以提高吞吐量。
spring手动确认消息需要将simplerabbitlistenercontainerfactory
设置为手动模式:
simplerabbitlistenercontainerfactory.setacknowledgemode(acknowledgemode.manual);
手动确认的消费者代码如下:
@sneakythrows @rabbitlistener(bindings = @queuebinding( exchange = @exchange(value = rabbitmqconstant.confirm_exchange, type = exchangetypes.topic, durable = rabbitmqconstant.false_constant, autodelete = rabbitmqconstant.true_constant), value = @queue(value = rabbitmqconstant.confirm_queue, durable = rabbitmqconstant.false_constant, autodelete = rabbitmqconstant.true_constant), key = rabbitmqconstant.confirm_key), containerfactory = "containerwithconfirm") public void process(exampleevent msg, channel channel, @header(name = "amqp_deliverytag") long deliverytag, @header("amqp_redelivered") boolean redelivered, @headers map<string, string> head) { try { log.info("consumerwithconfirm receive message:{},header:{}", msg, head); channel.basicack(deliverytag, false); } catch (exception e) { log.error("consume confirm error!", e); //这一步千万不要忘记,不会会导致消息未确认,消息到达连接的qos之后便不能再接收新消息 //一般重试肯定的有次数,这里简单的根据是否已经重发过来来决定重发。第二个参数表示是否重新分发 channel.basicreject(deliverytag, !redelivered); //这个方法我知道的是比上面多一个批量确认的参数 // channel.basicnack(deliverytag, false,!redelivered); } }
关于spring的acknowledgemode需要说明,他一共有三种模式:none,manual,auto,默认是auto模式。这比rabbitmq原生多了一种。这一点很容易混淆,这里的none对应其实就是rabbitmq的自动确认,manual是手动。而auto其实也是手动模式,只不过是spring的一层封装,他根据你方法执行的结果自动帮你发送ack和nack。如果方法未抛出异常,则发送ack。如果方法抛出异常,并且不是amqprejectanddontrequeueexception
则发送nack,并且重新入队列。如果抛出异常时amqprejectanddontrequeueexception
则发送nack不会重新入队列。我有一个例子专门测试none,见cunsumerwithnonetest
。
还有一点需要注意的是消费者有一个参数prefetch,它表示的是一个
考虑这样一个场景:你发送了一个消息给rabbitmq,rabbitmq接收了但是存入磁盘之前服务器就挂了,消息也就丢了。为了保证消息的投递有两种解决方案,最保险的就是事务(和db的事务没有太大的可比性), 但是因为事务会极大的降低性能,会导致生产者和rabbitmq之间产生同步(等待确认),这也违背了我们使用rabbitmq的初衷。所以一般很少采用,这就引入第二种方案:发送者确认模式。 发送者确认模式是指发送方发送的消息都带有一个id,rabbitmq会将消息持久化到磁盘之后通知生产者消息已经成功投递,如果因为rabbitmq内部的错误会发送ack。注意这里的发送者和rabbitmq之间是异步的,所以相较于事务机制性能大大提高。其实很多操作都是不能保证绝对的百分之一百的成功,哪怕采用了事务也是如此,可靠性和性能很多时候需要做一些取舍,想很多互联网公司吹嘘的5个9,6个9也是一样的道理。如果不是重要的消息性能计数器,完全可以不采用发送者确认模式。 这里有一点我当时纠结了很久,我一直以为发送者确认模式的回调是客户端的ack触发的,这里是大大的误解!发送者确认模式和消费者没有一点关系,消费者确认也和发送者没有一点关系,两者都是在和rabbitmq打交道,发送者不会管消费者有没有收到,只要消息到了rabbitmq并且已经持久化便会通知生产者,这个ack是rabbitmq本身发出的,和消费者无关 发送者确认模式需要将channel设置成confirm模式,这样才会收到通知。spring中需要将连接设置成confirm模式: 然后在rabbittemplate中设置确认的回调,correlationdata是消息的id,如下(只是简单打印下): 发送时需要给出唯一的标识( 还有一个参数需要说下:mandatory。这个参数为true表示如果发送消息到了rabbitmq,没有对应该消息的队列。那么会将消息返回给生产者,此时仍然会发送ack确认消息。 设置rabbittemplate的回调如下: 另外如果是rabbitmq内部的错误,不会调用该方法。所以如果消息特别重要,对于未确认的消息,生产者应该在内存用保存着,在确认时候根据返回的id删除该消息。如果是nack可以将该消息记录专门的日志或者转发到相应处理的逻辑进行后续补偿。rabbittemplate也可以配置retrytemplate,发送失败时直接进行重试,具体还是要结合业务。 最后关于发送者确认需要提的是spring,因为spring默认的bean是单例的,所以针对不同的确认方案(其实有不同的确认方案是比较合理的,很多消息不需要确认,有些需要确认)需要配置不同的bean. 上面也提到了如果消费者抛出异常时默认的处理逻辑。另外我们还可以给消费者配置retrytemplate,如果是采用springboot的话,可以在application.yml配置中配置如下: 如上,如果消费者失败的话会进行重试,默认是3次。注意这里的重试机制rabbitmq是为感知的!到达3次之后会抛出异常调用 我比较推荐前两种。这里说下死信队列,死信队列其实就是普通的队列,只不过一个队列声明的时候指定的属性,会将死信转发到该交换器中。声明死信队列方法如下: 其实也就只是在声明的时候多加了两个参数x-dead-letter-exchange和x-dead-letter-routing-key。这里一开始踩了一个坑,因为 : 我们用到的就是第一种。 本来生产者和消费者是没有耦合的,但是可以通过一些属性产生耦合。在早期版本中,如果一个生产者想要收到消费者的回复,实现方案是生产者在消息头中加入reply-to属性也就是队列(一般是私有,排他,用完即销毁)的名字,然后在这个队列上进行监听,消费者将回复发送到这个队列中。rabbitmq3.3之后有了改进,就是不用没有都去创建一个临时队列,这样很耗费性能,可以采用drect-to模式,省去了每次创建队列的性能损耗,但是还是要创建一次队列。现在spring默认的就是这个模式。rabbittemplate中有一系列的 消费者的方法通过返回值直接返回消息(下面的方法是有返回值的): 这里的提一下最后一个注解 上面的代码就是会将消息直接发送到默认交换器,并且以queue.reply.s作为路由键。@sendto的格式为exchange/routingkey用法如下: 这里还需要提一下,因为默认所有的队列都会绑定到空交换器,并且以队列名字作为routekey, 所以sendto里面可以直接填写队列名字机会发送到相应的队列.如日志队列。因为rpc模式不常用,专业的东西做专业的事,就像我们一般不用redis来做消息队列一样(虽然他也可以实现),一般公司都有特定的技术栈,肯定有更合适的rpc通信框架。当然如果要跨语言的集成这个方案也是一种不错的方案,可以继续考虑采用异步发送 rabbitmq底层的消费模型有两种push和pull。我在网上查阅资料的时候发现有很多教程采用了pull这种模式。rabbitmq实战和 这里讲的是消费者的,生产者没什么好讲的。先看消息流转图: 图中椭圆表示线程,矩形是队列。消息到达amqp的连接线程,然后分发到client线程池,随后分发到监听器。注意除了监听器的线程,其他都是在 client线程池见: listener的线程设置如下: 注意:simpleasynctaskexecutor每次执行一个任务都会新建一个线程,对于生命周期很短的任务不要使用这个线程池(如client线程池的任务), 这里的消费者线程生命周期直到simplemessagelistenercontainer停止所以没有适合这个场景 修改过之后的线程如下: 消息投递过程如下: 上面的是默认的消费者监听器。springamqp 2.0引入了一个新的监听器实现 这时的消息流转图如下: 还有一些关于监听器的例子和springboot配置我放在了源码里,这里不再讲述。关于发送者确认模式
connectionfactory.setpublisherconfirms(isconfirm);
// 设置rabbittemplate每次发送消息都会回调这个方法
rabbittemplate.setconfirmcallback((correlationdata, ack, cause)
-> log.info("confirm callback id:{},ack:{},cause:{}", correlationdata, ack, cause));
correlationdata
): rabbittemplatewithconfirm.convertandsend(rabbitmqconstant.default_exchange, rabbitmqconstant.default_key,
new exampleevent(i, "confirm message id:" + i),
new correlationdata(integer.tostring(i)));
rabbittemplate.setreturncallback((message, replycode, replytext, exchange, routingkey)
-> log.info("return callback message:{},code:{},text:{}", message, replycode, replytext));
消费消息、死信队列和retrytemplate
spring:
rabbitmq:
listener:
retry:
# 重试次数
max-attempts: 3
# 开启重试机制
enabled: true
messagerecoverer
。默认的实现为rejectanddontrequeuerecoverer,也就是打印异常,发送nack,不会重新入队列。
我想既然配置了重试机制消息肯定是很重要的,消息肯定不能丢,仅仅是日志可能会因为日志滚动丢失而且信息不明显,所以我们要讲消息保存下来。可以有如下这些方案:
@rabbitlistener(
bindings = @queuebinding(
exchange = @exchange(value = rabbitmqconstant.default_exchange, type = exchangetypes.topic,
durable = rabbitmqconstant.false_constant, autodelete = rabbitmqconstant.true_constant),
value = @queue(value = rabbitmqconstant.default_queue, durable = rabbitmqconstant.false_constant,
autodelete = rabbitmqconstant.true_constant, arguments = {
@argument(name = rabbitmqconstant.dead_letter_exchange, value = rabbitmqconstant.dead_exchange),
@argument(name = rabbitmqconstant.dead_letter_key, value = rabbitmqconstant.dead_key)
}),
key = rabbitmqconstant.default_key
))
@queuebinding
注解中也有arguments属性,我一开始将参数声明到@queuebinding
中,导致一直没绑定成功。如果绑定成功可以在控制台看到queue的featrues有dlx(死信队列交换器)和dlk(死信队列绑定)。如下:
rpc模式的消息(不常用)
sendandreceivexx
方法。默认等待5秒,超时返回null。用
法和不带返回的差不多。 public string receive(@headers map<string, object> headers, @payload exampleevent msg) {
log.info("reply to consumer param:{headers = [" + headers + "], msg = [" + msg + "]} info:");
return reply;
}
@sendto
,用在消费方法上,指明返回值的目的地,默认不用的话就是返回给发送者,可以通过这个注解改变这种行为。如下代码: @rabbitlistener(
bindings = @queuebinding(
exchange = @exchange(value = rabbitmqconstant.reply_exchange, type = exchangetypes.topic,
durable = rabbitmqconstant.false_constant, autodelete = rabbitmqconstant.true_constant),
value = @queue(value = rabbitmqconstant.reply_queue, durable = rabbitmqconstant.false_constant,
autodelete = rabbitmqconstant.true_constant),
key = rabbitmqconstant.reply_key
)
)
@sendto("queue.reply.s")
public exampleevent log(exampleevent event) {
log.info("log receive message:o{}", event);
return new exampleevent(1, "log result");
}
asyncrabbittemplate
来降低延迟等优化方案!关于消费模型
rabbitmq之consumer消费模式(push & pull)都指出这种模式性能低,会影响消息的吞吐量,增加不必要的io,所以除非有特殊的业务需求,不要采用这种方案。spring的封装就是采用了push的方案。关于rabbitmq客户端的线程模型
com.rabbitmq.client.impl.amqconnection
中创建的线程,我们对线程池做一些修改。连接线程名字不能修改就是amqp connection打头。心跳线程可以设置setconnectionthreadfactory来设置名字。如下: connectionfactory.setconnectionthreadfactory(new threadfactory() {
public final atomicinteger id = new atomicinteger();
@override
public thread newthread(runnable r) {
return new thread(r, messageformat.format("amqp-heart-{0}", id.getandincrement()));
}
});
com.rabbitmq.client.impl.consumerworkservice
构造方法。executors.newfixedthreadpool(default_num_threads, threadfactory)。 final executorservice executorservice = executors.newfixedthreadpool(5, new threadfactory() {
public final atomicinteger id = new atomicinteger();
@override
public thread newthread(runnable r) {
return new thread(r, messageformat.format("amqp-client-{0}", id.getandincrement()));
}
});
simplerabbitlistenercontainerfactory.settaskexecutor(new simpleasynctaskexecutor"amqp-consumer-"));
public void startmainloop() {
mainloop loop = new mainloop();
final string name = "amqp connection " + gethostaddress() + ":" + getport();
mainloopthread = environment.newthread(threadfactory, loop, name);
mainloopthread.start();
}
分发逻辑详见com.rabbitmq.client.impl.channeln#processasync->com.rabbitmq.client.impl.consumerdispatcher#handledelivery->投递到线程池.directmessagelistenercontainer
。这个实现最大的变化在于消费者的处理逻辑不是在自己的线程池中执行而是直接在client线程池中处理,这样最明显的是省去了线程的上下文切换的开销,而且设计上也变得更为直观。所以如果采用这个监听器需要覆盖默认的线程池加大connection的线程池。采用这个监听器只需要设置@rabbitlistener
的containerfactory属性。声明方法如下: @bean
directrabbitlistenercontainerfactory directrabbitlistenercontainerfactory(connectionfactory connectionfactory) {
final directrabbitlistenercontainerfactory directrabbitlistenercontainerfactory = new directrabbitlistenercontainerfactory();
directrabbitlistenercontainerfactory.setconsumersperqueue(runtime.getruntime().availableprocessors());
directrabbitlistenercontainerfactory.setconnectionfactory(connectionfactory);
directrabbitlistenercontainerfactory.setmessageconverter(new jackson2jsonmessageconverter());
directrabbitlistenercontainerfactory.setconsumersperqueue(10);
return directrabbitlistenercontainerfactory;
}
上一篇: Linux基本文件类型