欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty源码学习-ReadTimeoutHandler

程序员文章站 2022-04-22 22:20:58
...
ReadTimeoutHandler的实现思路:
开启一个定时任务,如果在指定时间内没有接收到消息,则抛出ReadTimeoutException
这个异常的捕获,在开发中,交给跟在ReadTimeoutHandler后面的ChannelHandler,例如


private final ChannelHandler timeoutHandler =
new ReadTimeoutHandler(timer, READ_TIMEOUT);
private final ChannelHandler uptimeHandler =
new UptimeClientHandler(bootstrap, timer);

public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
timeoutHandler, uptimeHandler);
}

public class UptimeClientHandler ...{
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
Throwable cause = e.getCause();
if (cause instanceof ReadTimeoutException) {
// The connection was OK but there was no traffic for last period.
println("Disconnecting due to no inbound traffic");
} else {
cause.printStackTrace();
}
ctx.getChannel().close();
}
}


ReadTimeoutHandler的关键源码:


//ChannelOpen时启动定时任务:
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
initialize(ctx);
ctx.sendUpstream(e);
}
private void initialize(ChannelHandlerContext ctx) {
State state = state(ctx);
state.timeout = timer.newTimeout(new ReadTimeoutTask(ctx), timeoutMillis, TimeUnit.MILLISECONDS);
}


//每次接收到消息时更新lastReadTime
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
State state = (State) ctx.getAttachment();
state.lastReadTime = System.currentTimeMillis();
ctx.sendUpstream(e);
}

/*定时任务:判断指定时间内是否有消息到达
举例:
假设超时时间设为30秒,初始的lastReadTime=10:00:00
那么,定时任务在10:00:30时执行run方法,如果:
1.在10:00:18有消息到达(lastReadTime更新为10:00:18),则表示没有超时,
继续监听下一个30秒,也就是定时任务需要在10:00:48再跑一次
因此下一次定时任务的执行距离现在是:nextDelay=30-(30-18)=18(秒)
2.没有消息到达,超时,抛异常
*/
private final class ReadTimeoutTask implements TimerTask {
public void run(Timeout timeout) throws Exception {
State state = (State) ctx.getAttachment();
long currentTime = System.currentTimeMillis();
long nextDelay = timeoutMillis - (currentTime - state.lastReadTime);
if (nextDelay <= 0) {
// Read timed out - set a new timeout and notify the callback.
state.timeout =
timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS);
try {
// FIXME This should be called from an I/O thread.
// To be fixed in Netty 4.
readTimedOut(ctx);
} catch (Throwable t) {
fireExceptionCaught(ctx, t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
state.timeout =
timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
}
}
}

//为什么这里会调用initialize方法?分析在下面
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
if (ctx.getPipeline().isAttached()) {
// channelOpen event has been fired already, which means
// this.channelOpen() will not be invoked.
// We have to initialize here instead.
initialize(ctx);
} else {
// channelOpen event has not been fired yet.
// this.channelOpen() will be invoked and initialization will occur there.
}
}



上面的beforeAdd方法不太好理解
先看看ClientBootstrap的connect方法:

public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {

ChannelPipeline pipeline;
try {

//这里调用ChannelPipeline.addLast,在真正往链表里面插入之前,调用beforeAdd
pipeline = getPipelineFactory().getPipeline();
} catch (Exception e) {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}

//创建一个代表Client的SocketChannel,SocketChannel的构造函数里会调用:
// pipeline.attach(this, sink);
//然后会fireChannelOpen
Channel ch = getFactory().newChannel(pipeline);

//...
}


从正常的流程来说,是先创建创建pipeline再创建channel,
也就是beforeAdd会在channel创建之前调用,那么beforeAdd里面的判断:
if (ctx.getPipeline().isAttached()) 就不会返回true(因为此时channel还未创建,更不可能与pipeline关联了)
这样看来,只需要在channelOpen中调用initialize就可以了?
不是的,
因为还有一种情况:动态添加ChannelHandler
有可能channel已经创建(与pipeline关联了),且channelOpen已经执行过了,
那就需要在添加ReadTimeoutHandler时,执行initialize
相关标签: java netty