欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

浅谈spring-boot-rabbitmq动态管理的方法

程序员文章站 2023-12-17 20:59:28
使用spring boot + rabbitmq的时候,在开发过程中,可能会想要临时停用/启用监听,或修改监听消费者数量。如果每次修改都重启比较浪费时间,所以研究了一下不停...

使用spring boot + rabbitmq的时候,在开发过程中,可能会想要临时停用/启用监听,或修改监听消费者数量。如果每次修改都重启比较浪费时间,所以研究了一下不停机就启用停用监听或修改一些配置

一. 关于rabbitmq监听的配置

  1. 配置属性类:rabbitproperties,包含rabbitmq的认证、监听、发送者以及其他的一些配置
  2. 自动配置类:rabbitautoconfiguration,主要配置rabbitmq的连接工厂和发送者等,不包含监听的配置
  3. rabbitmq监听的配置是rabbitannotationdrivenconfiguration,是通过rabbitautoconfiguration引入的
@configuration
@conditionalonclass({ rabbittemplate.class, channel.class })
@enableconfigurationproperties(rabbitproperties.class)
@import(rabbitannotationdrivenconfiguration.class)
public class rabbitautoconfiguration {
  ...
}

rabbitannotationdrivenconfiguration中主要就是监听工厂的配置、监听工厂,但是这里也只是创建bean,并没有真正的初始化

通过配置里的bean类名,分析一下,rabbitmq的监听肯定是由监听工厂创建的,所以找到监听工厂simplerabbitlistenercontainerfactory

@bean
@conditionalonmissingbean(name = "rabbitlistenercontainerfactory")
public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory(
 simplerabbitlistenercontainerfactoryconfigurer configurer,
 connectionfactory connectionfactory) {
  simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory();
  configurer.configure(factory, connectionfactory);
  return factory;
}

既然自动配置里面没有初始化监听,那就应该是在其他地方调用的,进入监听工厂类中,发现有initializecontainer(simplemessagelistenercontainer instance)方法,猜测初始化肯定与这个方法有关,所以查看有哪些地方调用,于是找到rabbitlistenerendpointregistry.createlistenercontainer(rabbitlistenerendpoint endpoint,rabbitlistenercontainerfactory<?> factory)方法中有创建监听容器和初始化的代码

/**
 * create and start a new {@link messagelistenercontainer} using the specified factory.
 * @param endpoint the endpoint to create a {@link messagelistenercontainer}.
 * @param factory the {@link rabbitlistenercontainerfactory} to use.
 * @return the {@link messagelistenercontainer}.
 */
protected messagelistenercontainer createlistenercontainer(rabbitlistenerendpoint endpoint,
 rabbitlistenercontainerfactory<?> factory) {
  messagelistenercontainer listenercontainer = factory.createlistenercontainer(endpoint);
  if (listenercontainer instanceof initializingbean) {
   try {
      ((initializingbean) listenercontainer).afterpropertiesset();
   }
   catch (exception ex) {
      throw new beaninitializationexception("failed to initialize message listener container", ex);
   }
  }
  int containerphase = listenercontainer.getphase();
  if (containerphase < integer.max_value) { // a custom phase value
   if (this.phase < integer.max_value && this.phase != containerphase) {
      throw new illegalstateexception("encountered phase mismatch between container factory definitions: " +
       this.phase + " vs " + containerphase);
   }
   this.phase = listenercontainer.getphase();
  }  
  return listenercontainer;
}

继续找调用这个方法的地方,找到rabbitlistenerendpointregistrar.afterpropertiesset()方法之后,发现调用的地方很多了

浅谈spring-boot-rabbitmq动态管理的方法

看看afterpropertiesset方法,是initializingbean接口中的,猜测应该是spring容器创建bean之后都会调用的bean初始化的方法,所以查找找到rabbitlistenerendpointregistrar是在哪里创建的实例。原来是在rabbitlistenerannotationbeanpostprocessor中的私有属性,而rabbitlistenerannotationbeanpostprocessor是在rabbitbootstrapconfiguration这个自动配置里面初始化的,所以这就找到rabbitmq初始化监听的源头了

二. 动态管理rabbitmq监听

回到最初的问题,想要动态的启用停用mq的监听,所以先看看初始化配置的类,既然有初始化,那可能会有相关的管理,于是在rabbitlistenerendpointregistry中找到了start()和stop()方法,里面有对监听容器进行操作,主要源码如下

/**
 * @return the managed {@link messagelistenercontainer} instance(s).
 */
public collection<messagelistenercontainer> getlistenercontainers() {
  return collections.unmodifiablecollection(this.listenercontainers.values());
}
 
@override
public void start() {
  for (messagelistenercontainer listenercontainer : getlistenercontainers()) {
   startifnecessary(listenercontainer);
  }
}

/**
 * start the specified {@link messagelistenercontainer} if it should be started
 * on startup or when start is called explicitly after startup.
 * @see messagelistenercontainer#isautostartup()
 */
private void startifnecessary(messagelistenercontainer listenercontainer) {
  if (this.contextrefreshed || listenercontainer.isautostartup()) {
   listenercontainer.start();
  }
}

@override
public void stop() {
  for (messagelistenercontainer listenercontainer : getlistenercontainers()) {
   listenercontainer.stop();
  }
}

写个controller,注入rabbitlistenerendpointregistry,使用start()和stop()对监听进行启用停用的操作,并且rabbitlistenerendpointregistry实例还可以获取监听容器,对监听的一些参数也能进行修改,比如消费者数量。代码如下:

import java.util.set;
import javax.annotation.resource;
import org.springframework.amqp.rabbit.listener.rabbitlistenerendpointregistry;
import org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;
import com.itopener.framework.resultmap;
/**
 * created by fuwei.deng on 2017年7月24日.
 */
@restcontroller
@requestmapping("rabbitmq/listener")
public class rabbitmqcontroller {

  @resource
  private rabbitlistenerendpointregistry rabbitlistenerendpointregistry;
  
  @requestmapping("stop")
  public resultmap stop(){
   rabbitlistenerendpointregistry.stop();
   return resultmap.buildsuccess();
  }
  
  @requestmapping("start")
  public resultmap start(){
   rabbitlistenerendpointregistry.start();
   return resultmap.buildsuccess();
  }
  
  @requestmapping("setup")
  public resultmap setup(int consumer, int maxconsumer){
   set<string> containerids = rabbitlistenerendpointregistry.getlistenercontainerids();
   simplemessagelistenercontainer container = null;
   for(string id : containerids){
   container = (simplemessagelistenercontainer) rabbitlistenerendpointregistry.getlistenercontainer(id);
   if(container != null){
    container.setconcurrentconsumers(consumer);
    container.setmaxconcurrentconsumers(maxconsumer);
   }
   }
   return resultmap.buildsuccess();
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

上一篇:

下一篇: