欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty应用实战:客户端重连

程序员文章站 2022-04-22 21:56:26
...

一、什么是客户端的重连?

简单来说,当连接中断时,客户端对运行的服务端进行重新连接。

二、为什么要客户端进行重连?

就是为了防止由于网络或其他原因导致的连接中断。

三、如何实现重连?

3.1 核心思路

在bootstrap的连接方法中,使用ChannelListener监听客户端的连接状态,如果连接失败,则进行重连。

以客户端为例,简单的实现对一个IP进行重新连接,可设置重连时间,本文只解决重连方法的实现,其他的功能不再说明;

思路:将客户端的启动、连接与重连拆分为3个方法(因为要实现代码功能的原子性,可做到对客户端的状态达到原子性的控制),在bootstrap的连接方法中,使用ChannelListener实现重连机制;

3.2 实现步骤(下面都是一些伪代码思路,详细代码请看 3.3可执行代码)

1.实现客户端的启动方法;
public class Client {
    /** 客户端 */
    private EventLoopGroup eventLoopGroup;
    /** 存放客户端bootstrap对象 */
    private Bootstrap bootstrap;
    /** 存放客户端channel对象 */
    private Channel channel;
    
    /**
     * 启动客户端
     *
     */
    public void startup() {
        eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup).option(ChannelOption.SO_KEEPALIVE, true).channel(NioSocketChannel.class)
            .handler(new ClientInitializer());
        try {
            // 连接服务端
            doConnect();
        } catch (Exception e) {
            log.warn("Connect fail." + e.getMessage());
        }
    }
    
    /**
     * client初始化
     *
     */
    class ClientInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            pipeline.addLast("requesetHander", new ClientHandler());
        }
    }
}
2.实现连接和重连方法,并在两个方法中实现ChannelListener监听器
public class Client {    
    /**
     * 连接到服务端
     * @throws InterruptedException 中断异常
     */
    public void doConnect() throws InterruptedException {
        log.debug("Start connecting server");
        if (channel != null && channel.isActive()) {
            return;
        }
        bootstrap.connect(serverAddr, port).addListener(new ChannelListener()).sync().channel();
    }
    
    /**
     * 重新连接
     * @param serverIp 连接的IP
     *
     */
    private void reConnect() {
        try {
            log.debug("Start reconnect to server." + serverIp + ":" + port);
            if (channel != null && channel.isOpen()) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("server [%s] channel is active, close it and reconnect", serverIp));
                }
                channel.close();
            }
            bootstrap.connect(new InetSocketAddress(serverIp, port))
            .addListener(new ChannelListener()).sync().channel();
        } catch (Exception e) {
            log.warn("ReConnect to server failure.server=" + serverIp + ":" + port + ":" + e.getMessage());
        }
    }
}  
3.创建业务handler处理器类
/**
     * 业务handler分发日志,任务等
     *
     */
    class ClientHandler extends SimpleChannelInboundHandler<LogMsg> {
       @Override
        protected void channelRead0(ChannelHandlerContext ctx, LogMsg request) throws Exception {
            
        }
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            log.info(String.format("the client connect to server=%s, local=%s",
                    ctx.channel().remoteAddress(), ctx.channel().localAddress()));
        }
        
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.close();
        }
    }
4.实现客户端第一次连接服务端失败时的重连ChannelListener;
    /**
     * ChannelFutureListener
     * @author zhangyang
     *
     */
    private class ChannelListener implements ChannelFutureListener {
        // 该方法会在channelActive之前执行,去判断客户端连接是否成功,并做失败重连的操作
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            // 连接成功后保存Channel
            if (future.isSuccess()) {
                channel = future.channel();
                SocketAddress sa = channel.remoteAddress();
                connectedServerAddr = ((InetSocketAddress) sa).getAddress().getHostAddress();
                log.info("Connect success " + connectedServerAddr + ":" + port);
            } else {
                // 失败后delaySecond秒(默认是5秒)重连,周期性delaySecond秒的重连;
                future.channel().eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        // 进行重连
                        reConnect(serverIp);
                    }
                }, delaySeconds, TimeUnit.SECONDS);
            }
        }
    }
5.实现连接中断channelInactive的重连(即连接成功之后,由于网络中断等其他原因,进行重连)

在业务handler处理器类的channelInactive方法中进行重连;

/**
     * 业务handler分发日志,任务等
     *
     */
    class ClientHandler extends SimpleChannelInboundHandler<LogMsg> {
        
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.close();
            TimeUnit.SECONDS.sleep(delaySeconds);
            // 重连
            reConnect(serverIp);
        }
        
        // 注意:发生异常不需要进行重连
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error(String.format("Remote service [%s] has exception.", ctx.channel().remoteAddress()), cause);
            ctx.close();
        }
    }
相关标签: 项目总结