欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析

程序员文章站 2022-06-21 17:07:18
...

在Dubbo服务断线重启后, 服务端和客户端都在报错[DUBBO] disconected from

这个是dubbo框架的一个bug. . .

服务端报错如下:

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析

客户端报错如下:

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析


解决方案: 重启服务调用端即可


1. 在dubbo创建客户端连接服务端的时候, 会同时创建一个心跳定时任务, 该任务会每隔两秒发送一次心跳, 但是如果服务端宕机, 那么将会导致心跳超时, 那么客户端将进行重连

(1) 创建定时任务: HeaderExchangeClient#startHeatbeatTimer

private void startHeatbeatTimer() {
    stopHeartbeatTimer();
    if (heartbeat > 0) {
        /**
         * 创建一个心跳定时线程任务, 向服务端发送心跳
         */
        heartbeatTimer = scheduled.scheduleWithFixedDelay(
                new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                    public Collection<Channel> getChannels() {
                        //创建一个单实例集合用于存储当前的客户端连接
                        return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                    }
                }, heartbeat, heartbeatTimeout),
                heartbeat, heartbeat, TimeUnit.MILLISECONDS);
    }
}

(2) 定时任务逻辑: HeartBeatTask#run

public void run() {
    try {
        long now = System.currentTimeMillis();
        for (Channel channel : channelProvider.getChannels()) {
            if (channel.isClosed()) {
                continue;
            }
            try {
                Long lastRead = (Long) channel.getAttribute(
                        HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                Long lastWrite = (Long) channel.getAttribute(
                        HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                //当前时间戳 减去 最后操作的时间戳大于心跳时间, 则发送心跳
                if ((lastRead != null && now - lastRead > heartbeat)
                        || (lastWrite != null && now - lastWrite > heartbeat)) {
                    Request req = new Request();
                    req.setVersion("2.0.0");
                    req.setTwoWay(true);
                    req.setEvent(Request.HEARTBEAT_EVENT);
                    /**
                     * 发送心跳
                     */
                    channel.send(req);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                    }
                }
                //当前时间戳 减去 最后操作的时间戳不仅大于心跳时间, 还大于了心跳超时时间, 那么可以任务通道已经被关闭, 开始尝试重连
                if (lastRead != null && now - lastRead > heartbeatTimeout) {
                    logger.warn("Close channel " + channel
                            + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                    if (channel instanceof Client) {
                        try {
                            /**
                             * 当连接超时, 执行此方法进行重连
                             */
                            ((Client) channel).reconnect();
                        } catch (Exception e) {
                            //do nothing
                        }
                    } else {
                        channel.close();
                    }
                }
            } catch (Throwable t) {
                logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
            }
        }
    } catch (Throwable t) {
        logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
    }
}

(3) 超时重连: AbstractClient#reconnect

public void reconnect() throws RemotingException {
    /**
     * 断开连接
     */
    disconnect();
    /**
     * 尝试重新连接
     */
    connect();
}

 

2. 在重连方法中, 会先断开连接, 然后重新去连接服务端

(1) 断开连接: AbstractClient#disconnect

在断开连接时会执行 destroyConnectStatusCheckCommand() 方法; 该方法的主要逻辑是取消connect()方法执行时创建的重连任务reconnectExecutorFuture

public void disconnect() {
    connectLock.lock();
    try {
        /**
         * 关闭重连任务
         */
        destroyConnectStatusCheckCommand();
        try {
            /**
             * 关闭通道
             */
            Channel channel = getChannel();
            if (channel != null) {
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            doDisConnect();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    } finally {
        connectLock.unlock();
    }
}

private synchronized void destroyConnectStatusCheckCommand() {
    try {
        if (reconnectExecutorFuture != null && !reconnectExecutorFuture.isDone()) {
            /**
             * 关闭重连任务, 定时任务取消,不再进行重连
             * bug: 满足上面的前提是reconnectExecutorFuture.cancel(true)执行时, 重连的定时任务线程并没有执行到connect()处
             * 否则, 由于zookeeper只会通知一次取消定时任务, 但是在connect()方法中又重新创建了一个定时任务, 这将会导致定时任务将不会再被取消, 客户端将一直进行重连
             */
            reconnectExecutorFuture.cancel(true);
            //清除线程的一些资源信息
            reconnectExecutorService.purge();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
}

(2) 连接服务端: AbstractClient#connect

在连接时会执行两个方法: initConnectStatusCheckCommand() 和 doConnect(), initConnectStatusCheckCommand()方法的主要逻辑是创建一个定时任务线程, 每隔两秒调用一次connect()方法尝试重连服务端;  doConnect()方法的主要逻辑才是去连接服务端

protected void connect() throws RemotingException {
    connectLock.lock();
    try {
        if (isConnected()) {
            return;
        }
        /**
         * 1. 初始化重连任务
         */
        initConnectStatusCheckCommand();
        /**
         * 2.创建连接
         */
        doConnect();
        if (!isConnected()) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
        } else {
            if (logger.isInfoEnabled()) {
                logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", channel is " + this.getChannel());
            }
        }
        reconnect_count.set(0);
        reconnect_error_log_flag.set(false);
    } catch (RemotingException e) {
        throw e;
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                + ", cause: " + e.getMessage(), e);
    } finally {
        connectLock.unlock();
    }
}

AbstractClient#initConnectStatusCheckCommand

 private synchronized void initConnectStatusCheckCommand() {
        //reconnect=false to close reconnect
        int reconnect = getReconnectParam(getUrl());
        if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
            /**
             * 创建一个尝试连接任务线程, 不断的尝试连接服务端
             */
            Runnable connectStatusCheckCommand = new Runnable() {
                public void run() {
                    try {
                        if (!isConnected()) {
                            /**
                             * 在定时任务中执行connect()方法,
                             * 去重新初始化reconnectExecutorFuture, 以及连接重连
                             */
                            connect();
                        } else {
                            lastConnectedTime = System.currentTimeMillis();
                        }
                    } catch (Throwable t) {
                        String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
                        // wait registry sync provider list
                        if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
                            if (!reconnect_error_log_flag.get()) {
                                reconnect_error_log_flag.set(true);
                                logger.error(errorMsg, t);
                                return;
                            }
                        }
                        if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
                            logger.warn(errorMsg, t);
                        }
                    }
                }
            };
            /**
             * 每隔2秒 尝试一次重连
             */
            reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
        }
    }

NettyClient#doConnect

protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    try {
        boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);

        if (ret && future.isSuccess()) {
            Channel newChannel = future.channel();
            try {
                //关闭旧的通道
                Channel oldChannel = NettyClient.this.channel; // copy reference
                if (oldChannel != null) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                        }
                        /**
                         * 关闭旧的通道
                         */
                        oldChannel.close();
                    } finally {
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                /**
                 * 如果Netty客户端为关闭状态, 则关闭新创建的channel
                 */
                if (NettyClient.this.isClosed()) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                        }
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                } else {
                    NettyClient.this.channel = newChannel;
                }
            }
        } else if (future.cause() != null) {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
        } else {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + " client-side timeout "
                    + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
        }
    } finally {
        if (!isConnected()) {
            //future.cancel(true);
        }
    }
}

3. 上面说道每隔两秒将会执行一次connect进行重连, 一直持续到zookeeper的会话超时时间到;  那么zookeeper将触发节点变更事件, 通知所有监听者此服务已停用, dubbo客户端接收到此时间后会调用destroyConnectStatusCheckCommand() 方法, 关闭重连的定时任务reconnectExecutorFuture, 不再重连此服务; 

但是

问题就出在这里, 由于重连的定时任务与执行destroyConnectStatusCheckCommand() 方法关闭重连定时任务是两个线程来执行的; 

假设重连的定时任务是线程A; 执行destroyConnectStatusCheckCommand() 方法关闭重连定时任务是线程B

当线程A进入定时重连任务到connect()处, 但是connect()方法还未执行, 此时线程B执行到了reconnectExecutorFuture.cancel(true)处, 如果接下来线程B执行完reconnectExecutorFuture.cancel(true)将reconnectExecutorFuture定时任务取消, 线程A开始执行connect()方法;  由于在connect()方法中会调用initConnectStatusCheckCommand()方法去创建定时重连任务, 在此方法中有一个判断语句:

if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled() ))

接收节点变更事件之前由于不满足reconnectExecutorFuture == null 和 reconnectExecutorFuture.isCancelled() 这两个条件, 所以定时任务实际上只执行了doconnect()方法进行重连; 但是, 由于在线程A执行connect()方法前, 线程B将reconnectExecutorFuture任务给取消了, 所以此时满足条件reconnectExecutorFuture.isCancelled(), 所以线程A将重新创建一个定时任务;  

那么, 由于zookeeper的节点变更事件只会通知一次, 之后destroyConnectStatusCheckCommand() 方法将不会再被调用, 客户端的定时重连任务将一直执行下去. . .

当服务重新启动后, 由于定时重连任务一直存在, 每执行一次重连任务, doConnect()方法都将创建一个新的channel, 那么客户端此时可以重新成功连接到服务端;  但是由于之前zookeeper发送节点变更事件时, 会去关闭已经失去连接的NettyClient (因为如果服务端重启后将创建一个新的NettyClient去连接服务器端), 并将此客户端关闭标识close设置为true, 那么也就导致了每次创建新的channel后, 由于客户端关闭标识为true, 因此会去关闭刚刚创建的channel (在doConnect()方法) , 客户端将channel关闭后也就导致了服务器将不能在连接到该channel, 也就会报disconected from xxx 的错误

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析

但是服务端是可以接受客户端的调用服务的, 因为在服务端重启后, 将会通知dubbo创建一个新的NettyClient; 而旧的NettyClient一直在不断的建新的channel, 一旦创建成功由于此客户端是关闭状态, 就立刻关闭, 也就导致了服务端无法再连接到旧Netty客户端的channel而出现报错

服务端日志信息: 

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析

 客户端日志信息:

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析

 

所以, 只需要重启客户端, 将旧的Netty客户端定时重连任务取消即可


利用Debug进行复现

1. 首先在AbstractClient#destroyConnectStatusCheckCommand()方法中的 reconnectExecutorFuture.cancel(true)处打断点

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析

2. 关闭dubbo服务端, 等待zookeeper会话超时, 执行到reconnectExecutorFuture.cancel(true)处

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析

3. 在AbstractClient#initConnectStatusCheckCommand()方法中的connect()处打断点

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析

4. 释放步骤2中的断点, 程序将执行到connect()处, 并且在reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS); 处打断点

按照正常逻辑, 此时不应该再进入到定时重连任务中, 因为已经取消了此定时任务;  但是由于我们在线程A开始执行定时任务, 但是还未执行到connect()方法时, 让线程B执行了取消定时任务, 那么在connect()方法中将触发再次创建定时任务

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析

5. 再次释放断点, 程序将执行到创建定时任务处

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析

6. 取消并释放所有断点, 此时定时重连任务将进入死循环

7. 重新启动服务端程序, 客户端可以成功连接到服务端, 但是, 定时重连任务任然每隔2秒执行一次

Dubbo服务重启后不停报错[DUBBO] disconected from 问题原因解析

相关标签: 分布式架构