
RabbitMQ Source Code

程序员文章站 2022-03-06 18:15:22

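This article walks through the source code of SimpleMessageListenerContainer, the core class on the RabbitMQ consumer side.
On startup, the start method of SimpleMessageListenerContainer (defined in its parent class) is called.
start calls this.doStart(), which calls this.initializeConsumers(), which in turn calls this.createBlockingQueueConsumer():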

protected void doStart() {
    // ...
    int newConsumers = this.initializeConsumers();
    // ...
    // decompiled loop body: var4 is the iterator over the consumers
    BlockingQueueConsumer consumer = (BlockingQueueConsumer)var4.next();
    SimpleMessageListenerContainer.AsyncMessageProcessingConsumer processor = new SimpleMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
    processors.add(processor);
    this.getTaskExecutor().execute(processor);
    // ...
}

protected void doStart() {
    // ...
    int newConsumers = this.initializeConsumers(); // here
    // ...
}


protected int initializeConsumers() {
    // ...
    BlockingQueueConsumer consumer = this.createBlockingQueueConsumer(); // here
    // ...
}

protected BlockingQueueConsumer createBlockingQueueConsumer() {
    String[] queues = this.getQueueNames();
    // use the larger of prefetchCount and txSize
    int actualPrefetchCount = this.getPrefetchCount() > this.txSize ? this.getPrefetchCount() : this.txSize;
    BlockingQueueConsumer consumer = new BlockingQueueConsumer(this.getConnectionFactory(),
            this.getMessagePropertiesConverter(), this.cancellationLock, this.getAcknowledgeMode(),
            this.isChannelTransacted(), actualPrefetchCount, this.isDefaultRequeueRejected(),
            this.getConsumerArguments(), this.isNoLocal(), this.isExclusive(), queues); // here
    // ...
}
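
The ternary in createBlockingQueueConsumer simply picks the larger of the two values. Here is a minimal sketch of that rule; the class PrefetchSketch and the helper effectivePrefetch are hypothetical names used only for illustration and are not part of Spring AMQP.

```java
class PrefetchSketch {
    // Hypothetical helper: mirrors the ternary above, i.e. max(prefetchCount, txSize).
    static int effectivePrefetch(int prefetchCount, int txSize) {
        return Math.max(prefetchCount, txSize);
    }

    public static void main(String[] args) {
        System.out.println(effectivePrefetch(1, 10));  // prefetch is raised to txSize
        System.out.println(effectivePrefetch(250, 1)); // prefetch is already large enough
    }
}
```

A plausible reading is that a batch of txSize messages has to fit into the prefetch window before it can be processed as one unit, so the prefetch count is never allowed to be smaller than txSize.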

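createBlockingQueueConsumer creates a new BlockingQueueConsumer and passes in the consumer settings, including the queue names (queues). The constructor, shown below, also creates the blocking queue that buffers incoming messages: the queue field, a LinkedBlockingQueue whose capacity is prefetchCount.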

 public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) {
        this.consumers = new ConcurrentHashMap();
        this.cancelled = new AtomicBoolean(false);
        this.consumerArgs = new HashMap();
        this.deliveryTags = new LinkedHashSet();
        this.missingQueues = Collections.synchronizedSet(new HashSet());
        this.retryDeclarationInterval = 60000L;
        this.failedDeclarationRetryInterval = 5000L;
        this.declarationRetries = 3;
        this.connectionFactory = connectionFactory;
        this.messagePropertiesConverter = messagePropertiesConverter;
        this.activeObjectCounter = activeObjectCounter;
        this.acknowledgeMode = acknowledgeMode;
        this.transactional = transactional;
        this.prefetchCount = prefetchCount;
        this.defaultRequeueRejected = defaultRequeueRejected;
        if (consumerArgs != null && consumerArgs.size() > 0) {
            this.consumerArgs.putAll(consumerArgs);
        }

        this.noLocal = noLocal;
        this.exclusive = exclusive;
        this.queues = (String[])Arrays.copyOf(queues, queues.length);
        this.queue = new LinkedBlockingQueue(prefetchCount);
    }
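
Because the LinkedBlockingQueue is created with a fixed capacity, it acts as a bounded buffer. The following JDK-only sketch shows what that implies for a timed offer when nobody is consuming; the class and method names are made up for illustration, and the 10 ms timeout is an arbitrary assumption.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedBufferSketch {
    // Counts how many of `attempts` offers a queue of the given capacity accepts
    // when nothing is taking elements out of it.
    static int acceptedOffers(int capacity, int attempts) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                // offer(e, timeout, unit) waits up to the timeout for free space, then returns false
                if (queue.offer("msg-" + i, 10, TimeUnit.MILLISECONDS)) {
                    accepted++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedOffers(2, 5)); // prints 2: only `capacity` elements fit
    }
}
```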

Back in the doStart method from the beginning: each BlockingQueueConsumer is wrapped in an AsyncMessageProcessingConsumer, and the task executor's thread pool then runs that object's run method.

protected void doStart() {
    // ...
    int newConsumers = this.initializeConsumers();
    // ...
    // decompiled loop body: var4 is the iterator over the consumers
    BlockingQueueConsumer consumer = (BlockingQueueConsumer)var4.next();
    SimpleMessageListenerContainer.AsyncMessageProcessingConsumer processor = new SimpleMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
    processors.add(processor);
    this.getTaskExecutor().execute(processor);
    // ...
}

run calls this.initialize(), which in turn calls this.consumer.start().
Note the while loop in the code below; it comes up again later.

public void run() {
    if (SimpleMessageListenerContainer.this.isActive()) {
        // ...
        } else {
            try {
                this.initialize(); // (1) start the consumer
                while (SimpleMessageListenerContainer.this.isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
                    this.mainLoop(); // (2) receive and dispatch messages
                }
            } catch (InterruptedException var15) {
                // ...
            }
            // ...
        }
    }
}

private void initialize() throws Throwable {
    try {
        SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
        this.consumer.start(); // here
        this.start.countDown();
    } catch (QueuesNotAvailableException var3) {
        // ...
    }
}
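
The this.start.countDown() call in initialize suggests a start latch: the processor runs on a pool thread and signals when startup has finished, presumably so that the starting thread can wait for it. A minimal JDK-only sketch of that pattern follows; Processor and startAndWait are hypothetical stand-ins, not Spring AMQP code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class StartLatchSketch {
    // Hypothetical stand-in for AsyncMessageProcessingConsumer
    static class Processor implements Runnable {
        final CountDownLatch start = new CountDownLatch(1);
        volatile boolean initialized;

        @Override
        public void run() {
            initialized = true; // stands in for this.consumer.start()
            start.countDown();  // signal that startup has finished
            // a real processor would now enter its receive loop
        }
    }

    // Submits the processor to a pool and waits until it reports that it has started.
    static boolean startAndWait(long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Processor processor = new Processor();
        try {
            executor.execute(processor);
            return processor.start.await(timeoutMillis, TimeUnit.MILLISECONDS) && processor.initialized;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(startAndWait(1000)); // prints true
    }
}
```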

The consumer here is the BlockingQueueConsumer created earlier. Its start method calls
this.setQosAndreateConsumers() (spelled as in the source), which then calls consumeFromQueue():

public void start() throws AmqpException {
    // ...
    this.deliveryTags.clear();
    this.activeObjectCounter.add(this);
    this.passiveDeclarations();
    this.setQosAndreateConsumers(); // here
}

private void setQosAndreateConsumers() {
    // ...
    this.channel.basicQos(this.prefetchCount); // limit the number of unacknowledged messages (prefetch)
    // ...
    this.consumeFromQueue(queueName); // start consuming from the queue
}

private void consumeFromQueue(String queue) throws IOException {
    BlockingQueueConsumer.InternalConsumer consumer = new BlockingQueueConsumer.InternalConsumer(this.channel, queue);
    String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
            this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : "",
            this.noLocal, this.exclusive, this.consumerArgs, consumer);
}

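The this.channel.basicConsume call in consumeFromQueue is the native, lowest-level consume call of the RabbitMQ client. Once the consumer is registered, delivered messages are passed to the handleDelivery callback of InternalConsumer: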

public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) {
    // ...
    if (!BlockingQueueConsumer.this.queue.offer(new Delivery(consumerTag, envelope, properties, body, this.queueName),
            BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
        // ...
    } else {
        // ...
    }
}

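As shown, the callback puts each received message, wrapped in a Delivery, into the BlockingQueueConsumer's blocking queue (queue). Now return to the while loop in the run method discussed earlier: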

public void run() {
    if (SimpleMessageListenerContainer.this.isActive()) {
        // ...
        } else {
            try {
                this.initialize();
                while (SimpleMessageListenerContainer.this.isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
                    this.mainLoop(); // here
                }
            } catch (InterruptedException var15) {
                // ...
            }
            // ...
        }
    }
}
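
The handoff between the delivery callback and this loop can be sketched with plain JDK classes. In the sketch below one thread plays the role of the client thread that invokes handleDelivery, and the calling thread plays the role of run(). All names are hypothetical, the capacity of 2 and the 50 ms poll timeout are arbitrary assumptions, and put is used instead of the timed offer to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class HandoffSketch {
    static List<String> runOnce(int messageCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2); // small bound, like prefetchCount
        AtomicBoolean active = new AtomicBoolean(true);
        List<String> handled = new ArrayList<>();

        // Plays the role of the client thread that invokes the delivery callback
        Thread callbackThread = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    queue.put("msg-" + i); // blocks while the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                active.set(false); // no more deliveries will arrive
            }
        });
        callbackThread.start();

        try {
            // Plays the role of run(): keep looping while active or while deliveries remain
            while (active.get() || !queue.isEmpty()) {
                String message = queue.poll(50, TimeUnit.MILLISECONDS);
                if (message != null) {
                    handled.add(message); // the real loop would invoke the listener here
                }
            }
            callbackThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(5)); // prints [msg-0, msg-1, msg-2, msg-3, msg-4]
    }
}
```

Checking both the active flag and the queue contents in the loop condition means that messages already buffered are still drained after deliveries stop.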

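This while loop keeps calling this.mainLoop() as long as the container is active for this consumer, the consumer still has deliveries in its blocking queue, or the consumer has not been cancelled.
this.mainLoop() calls SimpleMessageListenerContainer.this.receiveAndExecute(this.consumer),
which calls this.doReceiveAndExecute(consumer):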

private void mainLoop() throws Exception {
    try {
        boolean receivedOk = SimpleMessageListenerContainer.this.receiveAndExecute(this.consumer);
        // ...
}

private boolean receiveAndExecute(BlockingQueueConsumer consumer) throws Exception {
    // ...
    try {
        return this.doReceiveAndExecute(consumer);
    } catch (RuntimeException var5) {
        // ...
}

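doReceiveAndExecute takes a Delivery from the blocking queue via consumer.nextMessage, which converts it into a Message and returns it.
Once a Message is returned, this.executeListener(channel, message) is called: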

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception {
    Channel channel = consumer.getChannel();
    for (int i = 0; i < this.txSize; ++i) {
        Message message = consumer.nextMessage(this.receiveTimeout);
        if (message == null) {
            break;
        }
        try {
            this.executeListener(channel, message); // here
        } catch (ImmediateAcknowledgeAmqpException var7) {
            // ...
}

public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
    // ...
    Message message = this.handle((Delivery)this.queue.poll(timeout, TimeUnit.MILLISECONDS));
    // ...
}
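
The for loop in doReceiveAndExecute handles at most txSize messages per call and stops early when the timed poll returns null. A JDK-only sketch of that batching pattern, using hypothetical names and String messages in place of Message:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BatchReceiveSketch {
    // Takes at most txSize messages; stops early if the timed poll returns null.
    static List<String> receiveBatch(BlockingQueue<String> queue, int txSize, long receiveTimeoutMillis) {
        List<String> batch = new ArrayList<>();
        try {
            for (int i = 0; i < txSize; i++) {
                String message = queue.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
                if (message == null) {
                    break; // nothing arrived within the timeout
                }
                batch.add(message); // the real code calls executeListener here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        System.out.println(receiveBatch(queue, 2, 10)); // prints [a, b]
        System.out.println(receiveBatch(queue, 2, 10)); // prints [c]
    }
}
```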

this.executeListener(channel, message) calls this.doExecuteListener(channel, messageIn),
which calls this.invokeListener(channel, message), which in turn calls this.proxy.invokeListener(channel, message):

protected void executeListener(Channel channel, Message messageIn) {
    if (!this.isRunning()) {
        // ...
        try {
            this.doExecuteListener(channel, messageIn);
        } catch (RuntimeException var4) {
            // ...
        }
}

private void doExecuteListener(Channel channel, Message messageIn) {
    // ...
    this.invokeListener(channel, message);
}

protected void invokeListener(Channel channel, Message message) {
    this.proxy.invokeListener(channel, message);
}

The code below shows that this.proxy.invokeListener ultimately resolves to actualInvokeListener:

public abstract class AbstractMessageListenerContainer extends RabbitAccessor implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean, ApplicationEventPublisherAware {
    // ...
    private final AbstractMessageListenerContainer.ContainerDelegate delegate = this::actualInvokeListener;
    // ...
    private AbstractMessageListenerContainer.ContainerDelegate proxy;
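
The delegate and proxy fields follow a common pattern: a functional interface bound to a method reference, plus a second reference that may or may not wrap the first. Below is a minimal sketch of that pattern. The interface, the String parameters and wrapWithLogging are hypothetical, and the wrapping step only illustrates why a separate proxy field can be useful; it is not a copy of what Spring AMQP does.

```java
class DelegateProxySketch {
    // Hypothetical stand-in for ContainerDelegate
    @FunctionalInterface
    interface ContainerDelegate {
        void invokeListener(String channel, String message);
    }

    private final StringBuilder log = new StringBuilder();
    private final ContainerDelegate delegate = this::actualInvokeListener;
    private ContainerDelegate proxy = delegate; // assumption: proxy starts out as the plain delegate

    void actualInvokeListener(String channel, String message) {
        log.append("actual:").append(message).append(';');
    }

    // Replaces the proxy with a wrapper that runs extra logic before delegating.
    void wrapWithLogging() {
        ContainerDelegate target = proxy;
        proxy = (channel, message) -> {
            log.append("before;");
            target.invokeListener(channel, message);
        };
    }

    // Callers always go through the proxy; returns the accumulated log.
    String invoke(String message) {
        proxy.invokeListener("ch", message);
        return log.toString();
    }

    public static void main(String[] args) {
        DelegateProxySketch plain = new DelegateProxySketch();
        System.out.println(plain.invoke("m1")); // prints actual:m1;

        DelegateProxySketch wrapped = new DelegateProxySketch();
        wrapped.wrapWithLogging();
        System.out.println(wrapped.invoke("m2")); // prints before;actual:m2;
    }
}
```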

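actualInvokeListener calls this.doInvokeListener((MessageListener)listener, message).
The ChannelAwareMessageListener overload of doInvokeListener shown below ends in listener.onMessage(message, channelToUse), which is the business callback we define ourselves.

The listener obtained via Object listener = this.getMessageListener() can be set in your own code; it determines which class acts as the listener and implements the onMessage business method.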

protected void actualInvokeListener(Channel channel, Message message) {
    Object listener = this.getMessageListener();
    // ...
    try {
        this.doInvokeListener((MessageListener)listener, message);
    } finally {
        // ...
    }
}

protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message) {
    // ...
    try {
        listener.onMessage(message, channelToUse);
    } catch (Exception var11) {
        // ...
}
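
The two excerpts above use different doInvokeListener signatures, one taking a MessageListener and one taking a ChannelAwareMessageListener, which suggests that the container dispatches on the listener's type. Here is a self-contained sketch of such a dispatch. PlainListener and ChannelAwareListener are hypothetical stand-ins for the Spring AMQP interfaces, with String used in place of Message and Channel.

```java
class ListenerDispatchSketch {
    // Hypothetical stand-in for MessageListener
    interface PlainListener {
        String onMessage(String message);
    }

    // Hypothetical stand-in for ChannelAwareMessageListener
    interface ChannelAwareListener {
        String onMessage(String message, String channel);
    }

    // Picks the callback variant that matches the configured listener's type.
    static String dispatch(Object listener, String channel, String message) {
        if (listener instanceof ChannelAwareListener) {
            return ((ChannelAwareListener) listener).onMessage(message, channel);
        } else if (listener instanceof PlainListener) {
            return ((PlainListener) listener).onMessage(message);
        }
        throw new IllegalStateException("Unsupported listener type: " + listener);
    }

    public static void main(String[] args) {
        PlainListener plain = m -> "plain:" + m;
        ChannelAwareListener aware = (m, ch) -> "aware:" + m + "@" + ch;
        System.out.println(dispatch(plain, "ch1", "hello")); // prints plain:hello
        System.out.println(dispatch(aware, "ch1", "hello")); // prints aware:hello@ch1
    }
}
```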

Original article: https://blog.csdn.net/qq_33788242/article/details/111639247