Practical Netty (5) TCP反向代理服务器
Practical Netty (5) TCP反向代理服务器
- 作者:柳大·Poechant(钟超)
- 邮箱:zhongchao.ustc#gmail.com(# -> @)
- 博客:Blog.CSDN.net/Poechant
- 微博:weibo.com/lauginhom
- 日期:June 11th, 2012
以下针对 TCP 反向代理服务器。
1. 前端连接被创建时,创建后端连接
一个平凡的 ServerBootstrap 会有如下的一个语句:
serverBootstrap.setPipelineFactory(
new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline p = pipeline();
p.addLast("handler", new PoechantProxyHandler());
return p;
}
});
这个PoechantProxyHandler
如何实现,就成了关键了。
每个 connection 建立后,会创建一个 channel 专门用于服务这个连接。此时会响应ChannelPipelineHandler
的此方法:
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
这时你可以为这个 connection(不妨称其为前端连接),创建一个与后端连接的 connection(不妨称其为后端连接)。此时对于后端服务器,你要扮演的是 client 的角色,所以需要一个 ClientBootstrap。该 client 连接成功后,就可以从前端连接中读取数据了。
private volatile Channel outboundChannel;
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
final Channel inboundChannel = e.getChannel();
inboundChannel.setReadable(false);
ClientBootstrap cb = new ClientBootstrap(cf);
cb.getPipeline().addLast("handler", new BackendChannelHandler(e.getChannel()));
ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));
outboundChannel = f.getChannel();
f.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
inboundChannel.setReadable(true);
} else {
inboundChannel.close();
}
}
});
}
ClientBootstrap.connect
后会创建一个 Channel,与后端服务器连接。对于 ClientBootstrap 是不存在 parent channel 和 child channel 这样需要考虑的策略的。
转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant,微博:weibo.com/lauginhom
2. 前端接收的消息,转发给后端
亲,请边看下面的代码,边读我写的这段话。msg
是从前端Channel
(因为这是一个为前端ServerBootstrap
服务的 ChannelPipelineHandler)拿到的消息,然后把它写入到后端连接的Channel
(outboundChannel
就是前面我们在前端连接被建立时创建的后端Channel
)。
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception {
ChannelBuffer msg = (ChannelBuffer) e.getMessage();
synchronized (trafficLock) {
outboundChannel.write(msg);
if (!outboundChannel.isWritable()) {
e.getChannel().setReadable(false);
}
}
}
2.1. 对后端通道的操作,要做同步
需要注意的是,凡是对 outboundHandler 的操作都是需要同步的,因为 PoechantProxyHandler 中的 outboundHandler 是 volatile,就是异变的,为什么呢?因为它的OP_READ
(相关的讨论可以参见这里)可能被改变,所以要同步。所以这里就用到了 java 的 synchronized 同步代码块,在每个 ProchantProxyHandler 的实例身上都有一个trafficLock
成员,当做锁来使用,这是一个 Object,如下:
final Object trafficLock = new Object();
2.2. 后端不可写,则前端不要读
当然,说的就是这段:
if (!outboundChannel.isWritable()) {
e.getChannel().setReadable(false);
}
转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant,微博:weibo.com/lauginhom
3. 连接可操作状态改变时触发 channelInterestChanged
public void channelInterestChanged(ChannelHandlerContext ctx,
ChannelStateEvent e)
throws Exception
简单说,就是“This function was invoked when aChannel
'sinterestOps
was changed.” 那什么是interestOps
呢?
The interestOps value which tells that only read operation has been suspended.
org.jboss.netty.channel.Channel.OP_NONE
org.jboss.netty.channel.Channel.OP_READ
org.jboss.netty.channel.Channel.OP_WRITE
org.jboss.netty.channel.Channel.OP_READ_WRITE
他们的取值范围如下:
public static final int | OP_NONE | 0 = 0000 |
public static final int | OP_READ | 1 = 0001 |
public static final int | OP_WRITE | 4 = 0100 |
public static final int | OP_READ_WRITE | 5 = 0101 |
前端连接可写时,后端连接就需要可读。
@Override
public void channelInterestChanged(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
synchronized (trafficLock) {
if (e.getChannel().isWritable()) {
if (outboundChannel != null) {
outboundChannel.setReadable(true);
}
}
}
}
转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant,微博:weibo.com/lauginhom
4. 后端连接的 ChannelPipelineHandler
后端连接有相应的 Handler,在创建后端连接时:
ClientBootstrap cb = new ClientBootstrap(cf);
cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel()));
所以可如下定义:
private class OutboundHandler extends SimpleChannelUpstreamHandler {
private final Channel inboundChannel;
OutboundHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception {…}
public void channelInterestChanged(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {…}
}
其中 messageReceived 和 channelInterestChanged 的实现方式如下:
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception {
ChannelBuffer msg = (ChannelBuffer) e.getMessage();
synchronized (trafficLock) {
inboundChannel.write(msg);
if (!inboundChannel.isWritable()) {
e.getChannel().setReadable(false);
}
}
}
@Override
public void channelInterestChanged(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
synchronized (trafficLock) {
if (e.getChannel().isWritable()) {
inboundChannel.setReadable(true);
}
}
}
这里可以概括一下。
frontendchannelOpen
|
N/A | set frontend writable |
frontendmessageReceived
|
backend nonwritable | set frontend nonreadable |
frontendchannelInterestChanged
|
frontend writable | set backend readable |
backendmessageReceived
|
frontend nonwritable | set backend nonreadable |
backendchannelInterestChanged
|
backend writable | set frontend readable |
可以看到:
- 当一方收到消息时,关心的是另一方可不可写,如果另一方不可写则设置该方不可读(因为收到消息,自己一定是可读的,而对方的读状态不需考虑);
- 当一方
interestOps
改变时,关心的是自己是否变成可写,如果自己可写则设置对方可读(因为自己的读状态,只需要到messageReceived
时考虑即可,而对于自己变成不可写这种情况,在messageReceived
中已经考虑了)。
-
转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant,微博:weibo.com/lauginhom
-
上一篇: 幽默笑段聊生活
下一篇: 爆笑之逗B剧场第85季