Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析
在Dubbo服务断线重启后, 服务端和客户端都在报错[DUBBO] disconected from
这个是dubbo框架的一个bug. . .
服务端报错如下:
客户端报错如下:
解决方案: 重启服务调用端即可
1. 在dubbo创建客户端连接服务端的时候, 会同时创建一个心跳定时任务, 该任务会每隔两秒发送一次心跳, 但是如果服务端宕机, 那么将会导致心跳超时, 那么客户端将进行重连
(1) 创建定时任务: HeaderExchangeClient#startHeatbeatTimer
private void startHeatbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
/**
* 创建一个心跳定时线程任务, 向服务端发送心跳
*/
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
public Collection<Channel> getChannels() {
//创建一个单实例集合用于存储当前的客户端连接
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
(2) 定时任务逻辑: HeartBeatTask#run
public void run() {
try {
long now = System.currentTimeMillis();
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
try {
Long lastRead = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP);
Long lastWrite = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
//当前时间戳 减去 最后操作的时间戳大于心跳时间, 则发送心跳
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
/**
* 发送心跳
*/
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
}
}
//当前时间戳 减去 最后操作的时间戳不仅大于心跳时间, 还大于了心跳超时时间, 那么可以任务通道已经被关闭, 开始尝试重连
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
if (channel instanceof Client) {
try {
/**
* 当连接超时, 执行此方法进行重连
*/
((Client) channel).reconnect();
} catch (Exception e) {
//do nothing
}
} else {
channel.close();
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
}
}
(3) 超时重连: AbstractClient#reconnect
public void reconnect() throws RemotingException {
/**
* 断开连接
*/
disconnect();
/**
* 尝试重新连接
*/
connect();
}
2. 在重连方法中, 会先断开连接, 然后重新去连接服务端
(1) 断开连接: AbstractClient#disconnect
在断开连接时会执行 destroyConnectStatusCheckCommand() 方法; 该方法的主要逻辑是取消connect()方法执行时创建的重连任务reconnectExecutorFuture
public void disconnect() {
connectLock.lock();
try {
/**
* 关闭重连任务
*/
destroyConnectStatusCheckCommand();
try {
/**
* 关闭通道
*/
Channel channel = getChannel();
if (channel != null) {
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
doDisConnect();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
} finally {
connectLock.unlock();
}
}
private synchronized void destroyConnectStatusCheckCommand() {
try {
if (reconnectExecutorFuture != null && !reconnectExecutorFuture.isDone()) {
/**
* 关闭重连任务, 定时任务取消,不再进行重连
* bug: 满足上面的前提是reconnectExecutorFuture.cancel(true)执行时, 重连的定时任务线程并没有执行到connect()处
* 否则, 由于zookeeper只会通知一次取消定时任务, 但是在connect()方法中又重新创建了一个定时任务, 这将会导致定时任务将不会再被取消, 客户端将一直进行重连
*/
reconnectExecutorFuture.cancel(true);
//清除线程的一些资源信息
reconnectExecutorService.purge();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
(2) 连接服务端: AbstractClient#connect
在连接时会执行两个方法: initConnectStatusCheckCommand() 和 doConnect(), initConnectStatusCheckCommand()方法的主要逻辑是创建一个定时任务线程, 每隔两秒调用一次connect()方法尝试重连服务端; doConnect()方法的主要逻辑才是去连接服务端
protected void connect() throws RemotingException {
connectLock.lock();
try {
if (isConnected()) {
return;
}
/**
* 1. 初始化重连任务
*/
initConnectStatusCheckCommand();
/**
* 2.创建连接
*/
doConnect();
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getTimeout() + "ms.");
} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
reconnect_count.set(0);
reconnect_error_log_flag.set(false);
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
connectLock.unlock();
}
}
AbstractClient#initConnectStatusCheckCommand
private synchronized void initConnectStatusCheckCommand() {
//reconnect=false to close reconnect
int reconnect = getReconnectParam(getUrl());
if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
/**
* 创建一个尝试连接任务线程, 不断的尝试连接服务端
*/
Runnable connectStatusCheckCommand = new Runnable() {
public void run() {
try {
if (!isConnected()) {
/**
* 在定时任务中执行connect()方法,
* 去重新初始化reconnectExecutorFuture, 以及连接重连
*/
connect();
} else {
lastConnectedTime = System.currentTimeMillis();
}
} catch (Throwable t) {
String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
// wait registry sync provider list
if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
if (!reconnect_error_log_flag.get()) {
reconnect_error_log_flag.set(true);
logger.error(errorMsg, t);
return;
}
}
if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
logger.warn(errorMsg, t);
}
}
}
};
/**
* 每隔2秒 尝试一次重连
*/
reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
}
}
NettyClient#doConnect
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
//关闭旧的通道
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
/**
* 关闭旧的通道
*/
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
/**
* 如果Netty客户端为关闭状态, 则关闭新创建的channel
*/
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
3. 上面说道每隔两秒将会执行一次connect进行重连, 一直持续到zookeeper的会话超时时间到; 那么zookeeper将触发节点变更事件, 通知所有监听者此服务已停用, dubbo客户端接收到此时间后会调用destroyConnectStatusCheckCommand() 方法, 关闭重连的定时任务reconnectExecutorFuture, 不再重连此服务;
但是
问题就出在这里, 由于重连的定时任务与执行destroyConnectStatusCheckCommand() 方法关闭重连定时任务是两个线程来执行的;
假设重连的定时任务是线程A; 执行destroyConnectStatusCheckCommand() 方法关闭重连定时任务是线程B
当线程A进入定时重连任务到connect()处, 但是connect()方法还未执行, 此时线程B执行到了reconnectExecutorFuture.cancel(true)处, 如果接下来线程B执行完reconnectExecutorFuture.cancel(true)将reconnectExecutorFuture定时任务取消, 线程A开始执行connect()方法; 由于在connect()方法中会调用initConnectStatusCheckCommand()方法去创建定时重连任务, 在此方法中有一个判断语句:
if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled() ))
在接收节点变更事件之前由于不满足reconnectExecutorFuture == null 和 reconnectExecutorFuture.isCancelled() 这两个条件, 所以定时任务实际上只执行了doconnect()方法进行重连; 但是, 由于在线程A执行connect()方法前, 线程B将reconnectExecutorFuture任务给取消了, 所以此时满足条件reconnectExecutorFuture.isCancelled(), 所以线程A将重新创建一个定时任务;
那么, 由于zookeeper的节点变更事件只会通知一次, 之后destroyConnectStatusCheckCommand() 方法将不会再被调用, 客户端的定时重连任务将一直执行下去. . .
当服务重新启动后, 由于定时重连任务一直存在, 每执行一次重连任务, doConnect()方法都将创建一个新的channel, 那么客户端此时可以重新成功连接到服务端; 但是由于之前zookeeper发送节点变更事件时, 会去关闭已经失去连接的NettyClient (因为如果服务端重启后将创建一个新的NettyClient去连接服务器端), 并将此客户端关闭标识close设置为true, 那么也就导致了每次创建新的channel后, 由于客户端关闭标识为true, 因此会去关闭刚刚创建的channel (在doConnect()方法) , 客户端将channel关闭后也就导致了服务器将不能在连接到该channel, 也就会报disconected from xxx 的错误
但是服务端是可以接受客户端的调用服务的, 因为在服务端重启后, 将会通知dubbo创建一个新的NettyClient; 而旧的NettyClient一直在不断的建新的channel, 一旦创建成功由于此客户端是关闭状态, 就立刻关闭, 也就导致了服务端无法再连接到旧Netty客户端的channel而出现报错
服务端日志信息:
客户端日志信息:
所以, 只需要重启客户端, 将旧的Netty客户端定时重连任务取消即可
利用Debug进行复现
1. 首先在AbstractClient#destroyConnectStatusCheckCommand()方法中的 reconnectExecutorFuture.cancel(true)处打断点
2. 关闭dubbo服务端, 等待zookeeper会话超时, 执行到reconnectExecutorFuture.cancel(true)处
3. 在AbstractClient#initConnectStatusCheckCommand()方法中的connect()处打断点
4. 释放步骤2中的断点, 程序将执行到connect()处, 并且在reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS); 处打断点
按照正常逻辑, 此时不应该再进入到定时重连任务中, 因为已经取消了此定时任务; 但是由于我们在线程A开始执行定时任务, 但是还未执行到connect()方法时, 让线程B执行了取消定时任务, 那么在connect()方法中将触发再次创建定时任务
5. 再次释放断点, 程序将执行到创建定时任务处
6. 取消并释放所有断点, 此时定时重连任务将进入死循环
7. 重新启动服务端程序, 客户端可以成功连接到服务端, 但是, 定时重连任务任然每隔2秒执行一次
上一篇: 分布式Session解决方案
下一篇: 分布式Session解决方案