rabbitmq源码
本文以rabbitmq接收端最核心的类SimpleMessageListenerContainer作为切入点,做源码解析。
容器启动时,会启动SimpleMessageListenerContainer的start方法(在其父类中);
start方法中 调用this.doStart()–>调用this.initializeConsumers()->调用this.createBlockingQueueConsumer();
protected void doStart() {
int newConsumers = this.initializeConsumers();
BlockingQueueConsumer consumer = (BlockingQueueConsumer)var4.next();
SimpleMessageListenerContainer.AsyncMessageProcessingConsumer processor = new SimpleMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
this.getTaskExecutor().execute(processor);
}
protected void doStart() {
。。。。
int newConsumers = this.initializeConsumers();//这里
。。。。
}
}
protected int initializeConsumers() {
。。。
BlockingQueueConsumer consumer = this.createBlockingQueueConsumer();//这里
。。。
}
}
protected BlockingQueueConsumer createBlockingQueueConsumer() {
String[] queues = this.getQueueNames();
int actualPrefetchCount = this.getPrefetchCount() > this.txSize ? this.getPrefetchCount() : this.txSize;
BlockingQueueConsumer consumer = new BlockingQueueConsumer(this.getConnectionFactory(), this.getMessagePropertiesConverter(), this.cancellationLock, this.getAcknowledgeMode(), this.isChannelTransacted(), actualPrefetchCount, this.isDefaultRequeueRejected(), this.getConsumerArguments(), this.isNoLocal(), this.isExclusive(), queues);//这里
。。。
}
在createBlockingQueueConsumer方法中新建了一个BlockingQueueConsumer对象,把消息队列的参数传进去,其中包括一个阻塞队列queues,用来存放消息,类型是LinkedBlockingQueue
public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) {
this.consumers = new ConcurrentHashMap();
this.cancelled = new AtomicBoolean(false);
this.consumerArgs = new HashMap();
this.deliveryTags = new LinkedHashSet();
this.missingQueues = Collections.synchronizedSet(new HashSet());
this.retryDeclarationInterval = 60000L;
this.failedDeclarationRetryInterval = 5000L;
this.declarationRetries = 3;
this.connectionFactory = connectionFactory;
this.messagePropertiesConverter = messagePropertiesConverter;
this.activeObjectCounter = activeObjectCounter;
this.acknowledgeMode = acknowledgeMode;
this.transactional = transactional;
this.prefetchCount = prefetchCount;
this.defaultRequeueRejected = defaultRequeueRejected;
if (consumerArgs != null && consumerArgs.size() > 0) {
this.consumerArgs.putAll(consumerArgs);
}
this.noLocal = noLocal;
this.exclusive = exclusive;
this.queues = (String[])Arrays.copyOf(queues, queues.length);
this.queue = new LinkedBlockingQueue(prefetchCount);
}
回到开始的doStart方法,BlockingQueueConsumer对象被放入AsyncMessageProcessingConsumer,用线程池执行AsyncMessageProcessingConsumer对象的run方法
protected void doStart() {
int newConsumers = this.initializeConsumers();
BlockingQueueConsumer consumer = (BlockingQueueConsumer)var4.next();
SimpleMessageListenerContainer.AsyncMessageProcessingConsumer processor = new SimpleMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
this.getTaskExecutor().execute(processor);
}
然后调用 this.initialize()。再调用this.initialize()的this.consumer.start();
注意下面代码中的while循环,下面还会讲到
public void run() {
if (SimpleMessageListenerContainer.this.isActive()) {
。。。
} else {
try {
this.initialize();//这里1
while(SimpleMessageListenerContainer.this.isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
this.mainLoop();//这里2
}
} catch (InterruptedException var15) {
。。。
}
。。。
}
}
}
private void initialize() throws Throwable {
try {
SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
this.consumer.start();//这里
this.start.countDown();
} catch (QueuesNotAvailableException var3) {
。。。
}
}
这里的consumer就是之前定义的BlockingQueueConsumer,再看里面的start方法。start方法调用了
this.setQosAndreateConsumers();然后调用consumeFromQueue();
public void start() throws AmqpException {
。。。
this.deliveryTags.clear();
this.activeObjectCounter.add(this);
this.passiveDeclarations();
this.setQosAndreateConsumers();//这里
}
private void setQosAndreateConsumers() {
。。。。
this.channel.basicQos(this.prefetchCount);//设置了限流
。。。。。
this.consumeFromQueue(queueName);//从队列里消费
}
private void consumeFromQueue(String queue) throws IOException {
BlockingQueueConsumer.InternalConsumer consumer = new BlockingQueueConsumer.InternalConsumer(this.channel, queue);
String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(), this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : "", this.noLocal, this.exclusive, this.consumerArgs, consumer);
}
consumeFromQueue里面的this.channel.basicConsume就是原生最底层的消费消息的代码,然后再回调InternalConsumer的handleDelivery方法
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) {
。。。
if (!BlockingQueueConsumer.this.queue.offer(new Delivery(consumerTag, envelope, properties, body, this.queueName), BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
。。。。
} else {
。。。。。
}
}
可以看到回调方法把接收到的信息Delivery压入BlockingQueueConsumer的阻塞队列queue中,然后再回到之前讲的run方法中的while循环
public void run() {
if (SimpleMessageListenerContainer.this.isActive()) {
。。。
} else {
try {
this.initialize();
while(SimpleMessageListenerContainer.this.isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
this.mainLoop();//这里
}
} catch (InterruptedException var15) {
。。。
}
。。。
}
}
}
在这个while监听阻塞队列queue是否有数据,有数据就执行this.mainLoop();
再看this.mainLoop();调用SimpleMessageListenerContainer.this.receiveAndExecute(this.consumer);
再调用this.doReceiveAndExecute(consumer);
private void mainLoop() throws Exception {
try {
boolean receivedOk = SimpleMessageListenerContainer.this.receiveAndExecute(this.consumer);
。。。。。。
}
private boolean receiveAndExecute(BlockingQueueConsumer consumer) throws Exception {
。。。
try {
return this.doReceiveAndExecute(consumer);
} catch (RuntimeException var5) {
。。。。
}
在doReceiveAndExecute方法中再从阻塞队列中取出Delivery转成message,并返回;
返回message后调用 this.executeListener(channel, message);
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception {
Channel channel = consumer.getChannel();
for(int i = 0; i < this.txSize; ++i) {
Message message = consumer.nextMessage(this.receiveTimeout);
if (message == null) {
break;
}
try {
this.executeListener(channel, message);//这里
} catch (ImmediateAcknowledgeAmqpException var7) {
。。。
}
public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
。。。
Message message = this.handle((Delivery)this.queue.poll(timeout, TimeUnit.MILLISECONDS));
。。。
}
this.executeListener(channel, message);再调用 this.doExecuteListener(channel, messageIn);
再调用 this.doExecuteListener(channel, messageIn);再调用 this.proxy.invokeListener(channel, message);
protected void executeListener(Channel channel, Message messageIn) {
if (!this.isRunning()) {
。。。
try {
this.doExecuteListener(channel, messageIn);
} catch (RuntimeException var4) {
。。。
}
}
private void doExecuteListener(Channel channel, Message messageIn) {
。。。
this.invokeListener(channel, message);
}
protected void invokeListener(Channel channel, Message message) {
this.proxy.invokeListener(channel, message);
}
下图可以看到 this.proxy.invokeListener 实际上是actualInvokeListener
public abstract class AbstractMessageListenerContainer extends RabbitAccessor implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean, ApplicationEventPublisherAware {
。。。
private final AbstractMessageListenerContainer.ContainerDelegate delegate = this::actualInvokeListener;
。。。
private AbstractMessageListenerContainer.ContainerDelegate proxy;
再看actualInvokeListener的this.doInvokeListener((MessageListener)listener, message);
再调用 listener.onMessage(message, channelToUse);也就是我们自定义回调的业务方法。
里面的 Object listener = this.getMessageListener();是可以自己代码里设置的,它决定了哪个类作为监听器来实现并执行onMessage的业务方法
protected void actualInvokeListener(Channel channel, Message message) {
Object listener = this.getMessageListener();
。。。
try {
this.doInvokeListener((MessageListener)listener, message);
} finally {
.。。。
}
}
}
protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message) {
。。。
try {
listener.onMessage(message, channelToUse);
} catch (Exception var11) {
。。。
}
本文地址:https://blog.csdn.net/qq_33788242/article/details/111639247
上一篇: 什么耳机音质最好性价比高? 2021音质顶尖的蓝牙耳机推荐榜
下一篇: Hadoop相关概念