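Netty 4.0 Study Notes, Part 4: Mixing Coders and Handlers
Earlier examples already showed how to use a Handler. Encoders and Decoders also extend ChannelHandler, so how do they behave when they are mixed with ordinary Handlers? This article demonstrates their use with a small practical example.
The example simulates a Server and a Client that communicate over HTTP. Inside the Server, a custom StringDecoder converts the HttpRequest into a String. Once the Server has finished processing, a StringEncoder converts the String into an HttpResponse and sends it to the client. The processing flow is shown in the figure:
The Decoder, Encoder and request in the red boxes are provided by the Netty framework; the three classes in the gray boxes are my own implementations.
The Server side consists of four classes: Server, StringDecoder, BusinessHandler and StringEncoder.
1. Server.java starts the Netty service and registers the handlers and coders. Pay attention to the registration order: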
package com.bijian.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

// Tests mixing coders and handlers
public class Server {

    public void start(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // Both are ChannelOutboundHandlers: executed in reverse registration order
                        ch.pipeline().addLast(new HttpResponseEncoder());
                        ch.pipeline().addLast(new StringEncoder());
                        // All are ChannelInboundHandlers: executed in registration order
                        ch.pipeline().addLast(new HttpRequestDecoder());
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new BusinessHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        Server server = new Server();
        server.start(8000);
    }
}
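The effect of the registration order can be modelled without Netty at all. The following is a minimal sketch, assuming a pipeline is just an ordered list in which inbound events walk from head to tail and outbound operations walk from tail to head. The class `PipelineOrderSketch` and its methods are hypothetical names made up for this illustration; this is not how Netty implements its pipeline.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of handler ordering; this is NOT Netty's implementation.
final class PipelineOrderSketch {

    // Hypothetical pipeline entry: a handler name plus its direction.
    static final class Entry {
        final String name;
        final boolean inbound;

        Entry(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    PipelineOrderSketch addLast(String name, boolean inbound) {
        entries.add(new Entry(name, inbound));
        return this;
    }

    // Inbound events travel from the head of the list to the tail.
    List<String> inboundOrder() {
        List<String> order = new ArrayList<>();
        for (Entry e : entries) {
            if (e.inbound) {
                order.add(e.name);
            }
        }
        return order;
    }

    // Outbound operations travel from the tail of the list to the head.
    List<String> outboundOrder() {
        List<String> order = new ArrayList<>();
        for (int i = entries.size() - 1; i >= 0; i--) {
            Entry e = entries.get(i);
            if (!e.inbound) {
                order.add(e.name);
            }
        }
        return order;
    }

    // Registers the five handlers in the same order as Server.java.
    static PipelineOrderSketch serverPipeline() {
        return new PipelineOrderSketch()
                .addLast("HttpResponseEncoder", false)
                .addLast("StringEncoder", false)
                .addLast("HttpRequestDecoder", true)
                .addLast("StringDecoder", true)
                .addLast("BusinessHandler", true);
    }

    public static void main(String[] args) {
        PipelineOrderSketch p = serverPipeline();
        System.out.println("inbound : " + p.inboundOrder());
        System.out.println("outbound: " + p.outboundOrder());
    }
}
```

Running `main` prints `[HttpRequestDecoder, StringDecoder, BusinessHandler]` for the inbound path and `[StringEncoder, HttpResponseEncoder]` for the outbound path, which matches the two comments in `initChannel`.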
2. StringDecoder.java converts the HttpRequest into a String. ByteBufToBytes is a utility class that reads the data out of a ByteBuf.
package com.bijian.netty.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.bijian.netty.util.ByteBufToBytes;

public class StringDecoder extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(StringDecoder.class);

    private ByteBufToBytes reader;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.info("StringDecoder : msg's type is " + msg.getClass());
        try {
            if (msg instanceof HttpRequest) {
                HttpRequest request = (HttpRequest) msg;
                // Size the buffer from Content-Length (0 if the header is missing)
                reader = new ByteBufToBytes((int) HttpHeaders.getContentLength(request, 0));
            }
            if (msg instanceof HttpContent && reader != null) {
                HttpContent content = (HttpContent) msg;
                reader.reading(content.content());
                if (reader.isEnd()) {
                    byte[] clientMsg = reader.readFull();
                    reader = null; // readFull() released the internal buffer
                    logger.info("StringDecoder : change httpcontent to string ");
                    ctx.fireChannelRead(new String(clientMsg, CharsetUtil.UTF_8));
                }
            }
        } finally {
            // This handler consumes the HTTP objects instead of forwarding them, so release them here
            ReferenceCountUtil.release(msg);
        }
    }
}
3. BusinessHandler.java contains the actual business logic: it logs the client's request and sends a message back to the client.
package com.bijian.netty.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BusinessHandler extends ChannelInboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(BusinessHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String clientMsg = "client said : " + (String) msg;
        logger.info("BusinessHandler read msg from client :" + clientMsg);
        ctx.write("I am very OK!");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}
4. StringEncoder.java converts the String into an HttpResponse.
package com.bijian.netty.server;

import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.util.CharsetUtil;

// Converts a String into an HttpResponse
public class StringEncoder extends ChannelOutboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(StringEncoder.class);

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        logger.info("StringEncoder response to client.");
        String serverMsg = (String) msg;

        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK,
                Unpooled.wrappedBuffer(serverMsg.getBytes(CharsetUtil.UTF_8)));
        response.headers().set(CONTENT_TYPE, "text/plain");
        response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
        response.headers().set(CONNECTION, Values.KEEP_ALIVE);

        // Pass the caller's promise along so it is notified when the write completes
        ctx.write(response, promise);
        ctx.flush();
    }
}
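To make the three headers concrete, here is a minimal stdlib-only sketch of roughly what such a response looks like as bytes. It assumes UTF-8 for the body and a particular header order; the bytes that HttpResponseEncoder actually emits may differ in detail. `ResponseWireSketch` is a hypothetical name and does not use any Netty API. The point it illustrates is that Content-Length counts the bytes of the encoded body, not the characters of the String.

```java
import java.nio.charset.StandardCharsets;

// Stdlib-only illustration of an HTTP/1.1 response; the header order is an assumption.
final class ResponseWireSketch {

    // Builds the status line and headers for the given body bytes.
    static String head(byte[] body) {
        return "HTTP/1.1 200 OK\r\n"
                + "Content-Type: text/plain\r\n"
                + "Content-Length: " + body.length + "\r\n"
                + "Connection: keep-alive\r\n"
                + "\r\n";
    }

    // Encodes the message as head + body, with the body in UTF-8.
    static byte[] encode(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        byte[] head = head(body).getBytes(StandardCharsets.US_ASCII);
        byte[] out = new byte[head.length + body.length];
        System.arraycopy(head, 0, out, 0, head.length);
        System.arraycopy(body, 0, out, head.length, body.length);
        return out;
    }

    public static void main(String[] args) {
        System.out.print(new String(encode("I am very OK!"), StandardCharsets.UTF_8));
    }
}
```

For the message "I am very OK!" the body is 13 bytes, so the header reads `Content-Length: 13`. A two-character message such as "你好" would need `Content-Length: 6` in UTF-8, which is why the encoder takes the length from the buffer rather than from the String.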
The Client side has two classes: Client.java and ClientInitHandler.java.
1. Client.java connects to the Server and sends it an HttpRequest.
package com.bijian.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;

import java.net.URI;

public class Client {

    public void connect(String host, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new HttpResponseDecoder());
                    ch.pipeline().addLast(new HttpRequestEncoder());
                    ch.pipeline().addLast(new ClientInitHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();

            URI uri = new URI("http://127.0.0.1:8000");
            String msg = "Are you ok?";
            DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,
                    uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes(CharsetUtil.UTF_8)));
            request.headers().set(HttpHeaders.Names.HOST, host);
            request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());

            // Send the request, then wait until the connection is closed
            f.channel().writeAndFlush(request);
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        Client client = new Client();
        client.connect("127.0.0.1", 8000);
    }
}
2. ClientInitHandler.java reads the response from the Server.
package com.bijian.netty.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;

import com.bijian.netty.util.ByteBufToBytes;

public class ClientInitHandler extends ChannelInboundHandlerAdapter {

    private ByteBufToBytes reader;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpResponse) {
            HttpResponse response = (HttpResponse) msg;
            // The reader is only created when the response declares a Content-Length
            if (HttpHeaders.isContentLengthSet(response)) {
                reader = new ByteBufToBytes((int) HttpHeaders.getContentLength(response));
            }
        }
        if (msg instanceof HttpContent) {
            HttpContent httpContent = (HttpContent) msg;
            ByteBuf content = httpContent.content();
            reader.reading(content);
            content.release();
            // Print the body once all announced bytes have arrived
            if (reader.isEnd()) {
                String resultStr = new String(reader.readFull());
                System.out.println("Server said:" + resultStr);
            }
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.close();
    }
}
Utility class: ByteBufToBytes reads the data out of a ByteBuf and supports streaming reads (use the reading and readFull methods together).
ByteBufToBytes.java
package com.bijian.netty.util;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ByteBufToBytes {

    private ByteBuf temp;
    private boolean end = true;

    // length is the total number of bytes expected (the Content-Length)
    public ByteBufToBytes(int length) {
        temp = Unpooled.buffer(length);
    }

    // Appends one chunk; the read is complete once the buffer has no writable space left
    public void reading(ByteBuf datas) {
        datas.readBytes(temp, datas.readableBytes());
        end = this.temp.writableBytes() == 0;
    }

    public boolean isEnd() {
        return end;
    }

    // Returns the accumulated bytes and releases the buffer, or null if the read is not complete
    public byte[] readFull() {
        if (end) {
            byte[] contentByte = new byte[this.temp.readableBytes()];
            this.temp.readBytes(contentByte);
            this.temp.release();
            return contentByte;
        } else {
            return null;
        }
    }

    // One-shot read of everything currently readable in datas
    public byte[] read(ByteBuf datas) {
        byte[] bytes = new byte[datas.readableBytes()];
        datas.readBytes(bytes);
        return bytes;
    }
}
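The streaming pattern used by reading and readFull (allocate a buffer of the announced length, append chunks, report completion once it is full) can be tried without Netty. Below is a minimal sketch built on `java.nio.ByteBuffer`; the class `ChunkAccumulatorSketch` and its method names are hypothetical stand-ins for illustration, not part of the example project.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stdlib analogue of the accumulate-until-full idea; all names here are hypothetical.
final class ChunkAccumulatorSketch {

    private final ByteBuffer buffer;

    // contentLength is the total number of bytes announced up front.
    ChunkAccumulatorSketch(int contentLength) {
        buffer = ByteBuffer.allocate(contentLength);
    }

    // Appends one chunk; throws BufferOverflowException if more bytes arrive than announced.
    void append(byte[] chunk) {
        buffer.put(chunk);
    }

    // Complete once no free space is left in the buffer.
    boolean isComplete() {
        return !buffer.hasRemaining();
    }

    // Returns a copy of the full body once complete, otherwise null.
    byte[] fullBody() {
        if (!isComplete()) {
            return null;
        }
        return buffer.array().clone();
    }

    public static void main(String[] args) {
        // "Are you ok?" is 11 bytes and arrives here in two chunks.
        ChunkAccumulatorSketch acc = new ChunkAccumulatorSketch(11);
        acc.append("Are you".getBytes(StandardCharsets.UTF_8));
        System.out.println("complete after first chunk: " + acc.isComplete());
        acc.append(" ok?".getBytes(StandardCharsets.UTF_8));
        System.out.println("complete after second chunk: " + acc.isComplete());
        System.out.println(new String(acc.fullBody(), StandardCharsets.UTF_8));
    }
}
```

As with ByteBufToBytes, the caller is expected to check for completion after each chunk and only then fetch the whole body.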
Output:
23:22:40.846 INFO com.bijian.netty.server.StringDecoder 22 channelRead - StringDecoder : msg's type is class io.netty.handler.codec.http.DefaultHttpRequest
23:22:40.848 INFO com.bijian.netty.server.StringDecoder 22 channelRead - StringDecoder : msg's type is class io.netty.handler.codec.http.DefaultLastHttpContent
23:22:40.848 INFO com.bijian.netty.server.StringDecoder 34 channelRead - StringDecoder : change httpcontent to string
23:22:40.849 INFO com.bijian.netty.server.BusinessHandler 16 channelRead - BusinessHandler read msg from client :client said : Are you ok?
23:22:40.849 INFO com.bijian.netty.server.StringEncoder 28 write - StringEncoder response to client.
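The log shows the execution order StringDecoder, BusinessHandler, StringEncoder. The other handlers belong to Netty itself and do not log anything.
This example shows that Encoders and Decoders are, in essence, Handlers: their execution order and usage are the same as for any other Handler.
The execution order is as follows: for Encoders, the one registered first runs last, the same as for any OutboundHandler; for Decoders, the one registered first runs first, the same as for any InboundHandler.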
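One related detail about ordering: in Netty 4, a write issued through the ChannelHandlerContext (as BusinessHandler does with ctx.write) starts at the calling handler and travels toward the head of the pipeline, while a write issued on the Channel starts at the tail. The following minimal sketch models only that rule with plain lists; `WriteStartSketch` and its methods are hypothetical names, and the second pipeline is an invented ordering used only for contrast.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of where a write starts; this is NOT Netty's implementation.
final class WriteStartSketch {

    // Outbound handlers in this model; every other name is treated as inbound.
    private static final List<String> OUTBOUND =
            Arrays.asList("HttpResponseEncoder", "StringEncoder");

    // Outbound handlers visited when walking from position (from - 1) back to the head.
    static List<String> visited(List<String> pipeline, int from) {
        List<String> seen = new ArrayList<>();
        for (int i = from - 1; i >= 0; i--) {
            if (OUTBOUND.contains(pipeline.get(i))) {
                seen.add(pipeline.get(i));
            }
        }
        return seen;
    }

    // Models ctx.write(...) called from inside the named handler.
    static List<String> contextWrite(List<String> pipeline, String handler) {
        return visited(pipeline, pipeline.indexOf(handler));
    }

    // Models channel.write(...), which starts at the tail of the pipeline.
    static List<String> channelWrite(List<String> pipeline) {
        return visited(pipeline, pipeline.size());
    }

    // The registration order used in Server.java.
    static List<String> articleOrder() {
        return Arrays.asList("HttpResponseEncoder", "StringEncoder",
                "HttpRequestDecoder", "StringDecoder", "BusinessHandler");
    }

    // Invented contrast case: StringEncoder registered after BusinessHandler.
    static List<String> encoderLastOrder() {
        return Arrays.asList("HttpResponseEncoder", "HttpRequestDecoder",
                "StringDecoder", "BusinessHandler", "StringEncoder");
    }

    public static void main(String[] args) {
        System.out.println(contextWrite(articleOrder(), "BusinessHandler"));
        System.out.println(contextWrite(encoderLastOrder(), "BusinessHandler"));
        System.out.println(channelWrite(encoderLastOrder()));
    }
}
```

In this model the article's ordering lets the context write reach both encoders, whereas in the contrast ordering the context write reaches only HttpResponseEncoder and skips StringEncoder. That is one reason to register the encoders before the handler that writes through its context.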
Source: http://blog.csdn.net/u013252773/article/details/21564301