浅谈spring-boot-rabbitmq动态管理的方法
使用spring boot + rabbitmq的时候,在开发过程中,可能会想要临时停用/启用监听,或修改监听消费者数量。如果每次修改都重启比较浪费时间,所以研究了一下不停机就启用停用监听或修改一些配置
一. 关于rabbitmq监听的配置
- 配置属性类:rabbitproperties,包含rabbitmq的认证、监听、发送者以及其他的一些配置
- 自动配置类:rabbitautoconfiguration,主要配置rabbitmq的连接工厂和发送者等,不包含监听的配置
- rabbitmq监听的配置是rabbitannotationdrivenconfiguration,是通过rabbitautoconfiguration引入的
@configuration @conditionalonclass({ rabbittemplate.class, channel.class }) @enableconfigurationproperties(rabbitproperties.class) @import(rabbitannotationdrivenconfiguration.class) public class rabbitautoconfiguration { ... }
rabbitannotationdrivenconfiguration中主要就是监听工厂的配置、监听工厂,但是这里也只是创建bean,并没有真正的初始化
通过配置里的bean类名,分析一下,rabbitmq的监听肯定是由监听工厂创建的,所以找到监听工厂simplerabbitlistenercontainerfactory
@bean @conditionalonmissingbean(name = "rabbitlistenercontainerfactory") public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory( simplerabbitlistenercontainerfactoryconfigurer configurer, connectionfactory connectionfactory) { simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory(); configurer.configure(factory, connectionfactory); return factory; }
既然自动配置里面没有初始化监听,那就应该是在其他地方调用的,进入监听工厂类中,发现有initializecontainer(simplemessagelistenercontainer instance)方法,猜测初始化肯定与这个方法有关,所以查看有哪些地方调用,于是找到rabbitlistenerendpointregistry.createlistenercontainer(rabbitlistenerendpoint endpoint,rabbitlistenercontainerfactory<?> factory)方法中有创建监听容器和初始化的代码
/** * create and start a new {@link messagelistenercontainer} using the specified factory. * @param endpoint the endpoint to create a {@link messagelistenercontainer}. * @param factory the {@link rabbitlistenercontainerfactory} to use. * @return the {@link messagelistenercontainer}. */ protected messagelistenercontainer createlistenercontainer(rabbitlistenerendpoint endpoint, rabbitlistenercontainerfactory<?> factory) { messagelistenercontainer listenercontainer = factory.createlistenercontainer(endpoint); if (listenercontainer instanceof initializingbean) { try { ((initializingbean) listenercontainer).afterpropertiesset(); } catch (exception ex) { throw new beaninitializationexception("failed to initialize message listener container", ex); } } int containerphase = listenercontainer.getphase(); if (containerphase < integer.max_value) { // a custom phase value if (this.phase < integer.max_value && this.phase != containerphase) { throw new illegalstateexception("encountered phase mismatch between container factory definitions: " + this.phase + " vs " + containerphase); } this.phase = listenercontainer.getphase(); } return listenercontainer; }
继续找调用这个方法的地方,找到rabbitlistenerendpointregistrar.afterpropertiesset()方法之后,发现调用的地方很多了
看看afterpropertiesset方法,是initializingbean接口中的,猜测应该是spring容器创建bean之后都会调用的bean初始化的方法,所以查找找到rabbitlistenerendpointregistrar是在哪里创建的实例。原来是在rabbitlistenerannotationbeanpostprocessor中的私有属性,而rabbitlistenerannotationbeanpostprocessor是在rabbitbootstrapconfiguration这个自动配置里面初始化的,所以这就找到rabbitmq初始化监听的源头了
二. 动态管理rabbitmq监听
回到最初的问题,想要动态的启用停用mq的监听,所以先看看初始化配置的类,既然有初始化,那可能会有相关的管理,于是在rabbitlistenerendpointregistry中找到了start()和stop()方法,里面有对监听容器进行操作,主要源码如下
/** * @return the managed {@link messagelistenercontainer} instance(s). */ public collection<messagelistenercontainer> getlistenercontainers() { return collections.unmodifiablecollection(this.listenercontainers.values()); } @override public void start() { for (messagelistenercontainer listenercontainer : getlistenercontainers()) { startifnecessary(listenercontainer); } } /** * start the specified {@link messagelistenercontainer} if it should be started * on startup or when start is called explicitly after startup. * @see messagelistenercontainer#isautostartup() */ private void startifnecessary(messagelistenercontainer listenercontainer) { if (this.contextrefreshed || listenercontainer.isautostartup()) { listenercontainer.start(); } } @override public void stop() { for (messagelistenercontainer listenercontainer : getlistenercontainers()) { listenercontainer.stop(); } }
写个controller,注入rabbitlistenerendpointregistry,使用start()和stop()对监听进行启用停用的操作,并且rabbitlistenerendpointregistry实例还可以获取监听容器,对监听的一些参数也能进行修改,比如消费者数量。代码如下:
import java.util.set; import javax.annotation.resource; import org.springframework.amqp.rabbit.listener.rabbitlistenerendpointregistry; import org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer; import org.springframework.web.bind.annotation.requestmapping; import org.springframework.web.bind.annotation.restcontroller; import com.itopener.framework.resultmap; /** * created by fuwei.deng on 2017年7月24日. */ @restcontroller @requestmapping("rabbitmq/listener") public class rabbitmqcontroller { @resource private rabbitlistenerendpointregistry rabbitlistenerendpointregistry; @requestmapping("stop") public resultmap stop(){ rabbitlistenerendpointregistry.stop(); return resultmap.buildsuccess(); } @requestmapping("start") public resultmap start(){ rabbitlistenerendpointregistry.start(); return resultmap.buildsuccess(); } @requestmapping("setup") public resultmap setup(int consumer, int maxconsumer){ set<string> containerids = rabbitlistenerendpointregistry.getlistenercontainerids(); simplemessagelistenercontainer container = null; for(string id : containerids){ container = (simplemessagelistenercontainer) rabbitlistenerendpointregistry.getlistenercontainer(id); if(container != null){ container.setconcurrentconsumers(consumer); container.setmaxconcurrentconsumers(maxconsumer); } } return resultmap.buildsuccess(); } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。