Netty应用实战:客户端重连
程序员文章站
2022-04-22 21:56:26
...
一、什么是客户端的重连?
简单来说,当连接中断时,客户端对运行的服务端进行重新连接。
二、为什么要客户端进行重连?
就是为了防止由于网络或其他原因导致的连接中断。
三、如何实现重连?
3.1 核心思路
在bootstrap的连接方法中,使用ChannelListener监听客户端的连接状态,如果连接失败,则进行重连。
以客户端为例,简单的实现对一个IP进行重新连接,可设置重连时间,本文只解决重连方法的实现,其他的功能不再说明;
思路:将客户端的启动、连接与重连拆分为3个方法(因为要实现代码功能的原子性,可做到对客户端的状态达到原子性的控制),在bootstrap的连接方法中,使用ChannelListener实现重连机制;
3.2 实现步骤(下面都是一些伪代码思路,详细代码请看 3.3可执行代码)
1.实现客户端的启动方法;
public class Client {
/** 客户端 */
private EventLoopGroup eventLoopGroup;
/** 存放客户端bootstrap对象 */
private Bootstrap bootstrap;
/** 存放客户端channel对象 */
private Channel channel;
/**
* 启动客户端
*
*/
public void startup() {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).option(ChannelOption.SO_KEEPALIVE, true).channel(NioSocketChannel.class)
.handler(new ClientInitializer());
try {
// 连接服务端
doConnect();
} catch (Exception e) {
log.warn("Connect fail." + e.getMessage());
}
}
/**
* client初始化
*
*/
class ClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
pipeline.addLast("requesetHander", new ClientHandler());
}
}
}
2.实现连接和重连方法,并在两个方法中实现ChannelListener监听器
public class Client {
/**
* 连接到服务端
* @throws InterruptedException 中断异常
*/
public void doConnect() throws InterruptedException {
log.debug("Start connecting server");
if (channel != null && channel.isActive()) {
return;
}
bootstrap.connect(serverAddr, port).addListener(new ChannelListener()).sync().channel();
}
/**
* 重新连接
* @param serverIp 连接的IP
*
*/
private void reConnect() {
try {
log.debug("Start reconnect to server." + serverIp + ":" + port);
if (channel != null && channel.isOpen()) {
if (log.isDebugEnabled()) {
log.debug(String.format("server [%s] channel is active, close it and reconnect", serverIp));
}
channel.close();
}
bootstrap.connect(new InetSocketAddress(serverIp, port))
.addListener(new ChannelListener()).sync().channel();
} catch (Exception e) {
log.warn("ReConnect to server failure.server=" + serverIp + ":" + port + ":" + e.getMessage());
}
}
}
3.创建业务handler处理器类
/**
* 业务handler分发日志,任务等
*
*/
class ClientHandler extends SimpleChannelInboundHandler<LogMsg> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LogMsg request) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(String.format("the client connect to server=%s, local=%s",
ctx.channel().remoteAddress(), ctx.channel().localAddress()));
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.close();
}
}
4.实现客户端第一次连接服务端失败时的重连ChannelListener;
/**
* ChannelFutureListener
* @author zhangyang
*
*/
private class ChannelListener implements ChannelFutureListener {
// 该方法会在channelActive之前执行,去判断客户端连接是否成功,并做失败重连的操作
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 连接成功后保存Channel
if (future.isSuccess()) {
channel = future.channel();
SocketAddress sa = channel.remoteAddress();
connectedServerAddr = ((InetSocketAddress) sa).getAddress().getHostAddress();
log.info("Connect success " + connectedServerAddr + ":" + port);
} else {
// 失败后delaySecond秒(默认是5秒)重连,周期性delaySecond秒的重连;
future.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
// 进行重连
reConnect(serverIp);
}
}, delaySeconds, TimeUnit.SECONDS);
}
}
}
5.实现连接中断channelInactive的重连(即连接成功之后,由于网络中断等其他原因,进行重连)
在业务handler处理器类的channelInactive方法中进行重连;
/**
* 业务handler分发日志,任务等
*
*/
class ClientHandler extends SimpleChannelInboundHandler<LogMsg> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.close();
TimeUnit.SECONDS.sleep(delaySeconds);
// 重连
reConnect(serverIp);
}
// 注意:发生异常不需要进行重连
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error(String.format("Remote service [%s] has exception.", ctx.channel().remoteAddress()), cause);
ctx.close();
}
}
上一篇: 环信即时通讯——集成客户端