欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spring-data-redis消息订阅RedisMessageListenerContainer源码解读 博客分类: 架构相关 redis源码

程序员文章站 2024-02-17 08:14:04
...

项目中一直在使用redis的subscribe功能,偶然会发生订阅断开的问题,一直无法定位,在此之下只能阅读源码定位问题

首先从spring.xml入手观察配置

 

    <bean id="twaListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
		<property name="delegate" ref="twocAlertListener"/>
	</bean>
	<bean id="twocAlertListener" class="com.fnic.wifi.server.redis.TwocAlertListener" />
	
	<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
        <property name="connectionFactory" ref="jedisConnFactory"/>
        <property name="messageListeners">
            <map>
                <entry key-ref="twaListener">
                    <bean class="org.springframework.data.redis.listener.ChannelTopic">
                        <constructor-arg value="c_sta_login" />
                    </bean>
                </entry>
            </map>
        </property>
    </bean>  

RedisMessageListenerContainer类中有个Map专门放MessageListenerAdapter类型的监听,同时这个类也是整个监听的核心类,总共有1000行

 

RedisMessageListenerContainer

其实现了InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle几个接口

InitializingBean:主要实现afterPropertiesSet方法,来定义spring设置完properties后进行的处理,在spring init这个bean时候会被调用

DisposableBean:实现destroy方法,在spring销毁bean时会调用

BeanNameAware:实现setBeanName方法来为bean进行取名,在RedisMessageListenerContainer中该name被用于内部线程的线程名

SmartLifecycle:spring的bean生命周期类,spring会调用start,stop等操作来完成RedisMessageListenerContainer类的启动

 

启动顺序

1.spring先完成对于bean属性的set,其中包含listener map的set操作

2.调用afterPropertiesSet方法

 

	public void afterPropertiesSet() {
		if (taskExecutor == null) {
			manageExecutor = true;
			taskExecutor = createDefaultTaskExecutor();
		}

		if (subscriptionExecutor == null) {
			subscriptionExecutor = taskExecutor;
		}

		initialized = true;
	}

 此方法构造了一个线程池来跑监听线程。

 

3.spring调用start方法来开启这个bean

 

	public void start() {
		if (!running) {
			running = true;
			// wait for the subscription to start before returning
			// technically speaking we can only be notified right before the subscription starts
			synchronized (monitor) {
				lazyListen();
				if (listening) {
					try {
						// wait up to 5 seconds for Subscription thread
						monitor.wait(initWait);
					} catch (InterruptedException e) {
						// stop waiting
					}
				}
			}

			if (logger.isDebugEnabled()) {
				logger.debug("Started RedisMessageListenerContainer");
			}
		}
	}

 4.最重要的一步 lazyListen();方法的调用,下面是其源码

 

 

	private void lazyListen() {
		boolean debug = logger.isDebugEnabled();
		boolean started = false;

		if (isRunning()) {
			if (!listening) {
				synchronized (monitor) {
					if (!listening) {
						if (channelMapping.size() > 0 || patternMapping.size() > 0) {
							subscriptionExecutor.execute(subscriptionTask);
							listening = true;
							started = true;
						}
					}
				}
				if (debug) {
					if (started) {
						logger.debug("Started listening for Redis messages");
					} else {
						logger.debug("Postpone listening for Redis messages until actual listeners are added");
					}
				}
			}
		}
	}

 构造一个SubscriptionTask,并且提交给第二步afterPropertiesSet方法中创建的线程池来执行,SubscriptionTask是一个RedisMessageListenerContainer中的重要的内部类

 

 

 

 

SubscriptionTask

其run方法如下,在spring容器启动时刻,最终会在eventuallyPerformSubscription方法处阻塞,执行底层jedis的监听,调用的也是jedis的subscribe方法

 

		public void run() {
			synchronized (localMonitor) {
				subscriptionTaskRunning = true;
			}
			try {
				connection = connectionFactory.getConnection();
				if (connection.isSubscribed()) {
					throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
				}

				boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory);

				// NB: async drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
				if (!asyncConnection) {
					synchronized (monitor) {
						monitor.notify();
					}
				}

				SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription();

				if (asyncConnection) {
					SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime());

					synchronized (monitor) {
						monitor.notify();
					}
				}
			} catch (Throwable t) {
				handleSubscriptionException(t);
			} finally {
				// this block is executed once the subscription thread has ended, this may or may not mean
				// the connection has been unsubscribed, depending on driver
				synchronized (localMonitor) {
					subscriptionTaskRunning = false;
					localMonitor.notify();
				}
			}
		}

 当有链接异常发生时,会进入catch代码块的handleSubscriptionException(t);方法,其实现如下

 

 

	protected void handleSubscriptionException(Throwable ex) {
		listening = false;
		subscriptionTask.closeConnection();
		if (ex instanceof RedisConnectionFailureException) {
			if (isRunning()) {
				logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
				sleepBeforeRecoveryAttempt();
				lazyListen();
			}
		} else {
			logger.error("SubscriptionTask aborted with exception:", ex);
		}
	}

 RedisMessageListenerContainer在jedis的subscribe抛异常时重新调用了lazyListen();试图重新加载监听线程

 

 

 

 

理论上来说当链接断开时只要底层jedis的subscribe处报异常时,spring-data-redis的RedisMessageListenerContainer的内部线程都应该处在异常->重连->异常->重连的循环之中。

 

 

 

 

相关标签: redis 源码