项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)
在使用Netty进行通信开发,如何选择编码器?在TCP粘包/拆包的问题如何解决?服务端在启动
流程是什么样的?连接服务流程是什么?
一 编解码器
1.1 什么叫编解码器
在网络传输的过程中,数据都是以字节流的方式进行传递。客户端在进行数据传递的时候
将原来的数据格式转化为字节,叫编码。服务端将字节转化为原来的格式,叫解码。统称
codec。
编解码器分为两部分-编码器和解码器,编码器负责出站,解码器负责入站。
1.2 解码器
1.2.1 概述
解码器负责入站操作,那么也一定要实现ChannelInboundHandler接口,所以解码器本质上也是ChannelHandler。
我们自定义编解码器只需要继承ByteToMessageDecoder(Netty提供抽象类,继承 ChannelInboundHandlerAdapter),实现decode()。Netty提供一些常用的解码器实现,
开箱即用。如下:
1 RedisDecoder 基于Redis协议的解码器
2 XmlDecoder 基于XML格式的解码器
3 JsonObjectDecoder 基于json数据格式的解码器
4 HttpObjectDecoder 基于http协议的解码器
Netty也提供了MessageToMessageDecoder,将⼀种格式转化为另⼀种格式的解码器,也提供了⼀些
实现,如下:
1 StringDecoder 将接收到ByteBuf转化为字符串
2 ByteArrayDecoder 将接收到ByteBuf转化字节数组
3 Base64Decoder 将由ByteBuf或US-ASCII字符串编码的Base64解码为ByteBuf。
1.2.2 将字节流转化为Intger类型(案例)
1 字节解码器
package com.haopt.netty.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class ByteToIntegerDecoder extends ByteToMessageDecoder {
/**
*
* @param ctx 上下⽂
* @param in 输⼊的ByteBuf消息数据
* @param out 转化后输出的容器
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if(in.readableBytes() >= 4){ //int类型占⽤4个字节,所以需要判断是否存在有4个字节,再进⾏读取
out.add(in.readInt()); //读取到int类型数据,放⼊到输出,完成数据类型的转化
}
}
}
2 Handler
package com.haopt.netty.codec;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Integer i = (Integer) msg; //这⾥可以直接拿到Integer类型的数据
System.out.println("服务端接收到的消息为:" + i);
}
}
3 在pipeline中添加解码器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new ByteToIntegerDecoder())
.addLast(new ServerHandler());
}
1.3 编码器
1.3.1 概述
将原来的格式转化为字节。我们要实现自定义解码器只要继承MessageToByteEncoder
(实现了ChannelOutboundHandler接⼝),本质上也是ChannelHandler。
Netty中一些实现的编码器,如下:
1 ObjectEncoder 将对象(需要实现Serializable接⼝)编码为字节流
2 SocksMessageEncoder 将SocksMessage编码为字节流
3 HAProxyMessageEncoder 将HAProxyMessage编码成字节流
Netty也提供了MessageToMessageEncoder,将⼀种格式转化为另⼀种格式的编码器,也提供了⼀些
实现:
1 RedisEncoder 将Redis协议的对象进⾏编码
2 StringEncoder 将字符串进⾏编码操作
3 Base64Encoder 将Base64字符串进⾏编码操作
1.3.2 将Integer类型编码为字节进⾏传递(案例)
- 自定义编码器
package com.haopt.netty.codec.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> {
@Override
protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
out.writeInt(msg);
}
}
- Handler
package com.haopt.netty.codec.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("接收到服务端的消息:" +
msg.toString(CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(123);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- pipeline
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IntegerToByteEncoder());
ch.pipeline().addLast(new ClientHandler());
}
二 开发Http服务器
通过Netty中提供的http的解码器,进行http服务器开发。
2.1 Netty配置
- server
package com.haopt.netty.codec.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
public class NettyHttpServer {
public static void main(String[] args) throws Exception {
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// ⼯作线程,线程数默认是:cpu*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
//配置server通道
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//http请求的解码器
//将http请求中的uri以及请求体聚合成⼀个完整的FullHttpRequest对象
.addLast(new HttpRequestDecoder())
.addLast(new HttpObjectAggregator(1024 * 128))
.addLast(new HttpResponseEncoder()) //http响应的编码器
.addLast(new ChunkedWriteHandler()) //⽀持异步的⼤⽂件传输,防⽌内存溢出
.addLast(new ServerHandler());
}
}); //worker线程的处理器
ChannelFuture future = serverBootstrap.bind(8080).sync();
System.out.println("服务器启动完成。。。。。");
//等待服务端监听端⼝关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
- ServerHandler
package com.haopt.netty.codec.http;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import java.util.Map;
public class ServerHandler extends SimpleChannelInboundHandler<FullHttpRequest>{
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
//解析FullHttpRequest,得到请求参数
Map<String, String> paramMap = new RequestParser(request).parse();
String name = paramMap.get("name");
//构造响应对象
FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=utf-8");
StringBuilder sb = new StringBuilder();
sb.append("<h1>");
sb.append("你好," + name);
sb.append("</h1>");
httpResponse.content().writeBytes(Unpooled.copiedBuffer(sb,CharsetUtil.UTF_8));
//操作完成后,将channel关闭
ctx.writeAndFlush(httpResponse).addListener(ChannelFutureListener.CLOSE);
}
}
- RequestParser
package com.haopt.netty.codec.http;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* HTTP请求参数解析器, ⽀持GET, POST
*/
public class RequestParser {
private FullHttpRequest fullReq;
/**
* 构造⼀个解析器
* @param req
*/
public RequestParser(FullHttpRequest req) {
this.fullReq = req;
}
/**
* 解析请求参数
* @return 包含所有请求参数的键值对, 如果没有参数, 则返回空Map
*
* @throws IOException
*/
public Map<String, String> parse() throws Exception {
HttpMethod method = fullReq.method();
Map<String, String> parmMap = new HashMap<>();
if (HttpMethod.GET == method) {
// 是GET请求
QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri());
decoder.parameters().entrySet().forEach( entry -> {
// entry.getValue()是⼀个List, 只取第⼀个元素
parmMap.put(entry.getKey(), entry.getValue().get(0));
});
} else if (HttpMethod.POST == method) {
// 是POST请求
HttpPostRequestDecoder decoder = new
HttpPostRequestDecoder(fullReq);
decoder.offer(fullReq);
List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
for (InterfaceHttpData parm : parmList) {
Attribute data = (Attribute) parm;
parmMap.put(data.getName(), data.getValue());
}
} else {
// 不⽀持其它⽅法
throw new RuntimeException("不⽀持其它⽅法"); // 可以用自定义异常来替代
}
return parmMap;
}
}
- 对象
package com.haopt.netty.codec.obj;
public class User implements java.io.Serializable {
private static final long serialVersionUID = -89217070354741790L;
private Long id;
private String name;
private Integer age;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}
2.2 服务端
- NettyObjectServer
package com.haopt.netty.codec.obj;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
public class NettyObjectServer {
public static void main(String[] args) throws Exception {
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// ⼯作线程,线程数默认是:cpu*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
//配置server通道
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel> () {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new ObjectDecoder(ClassResolvers.weakCachingResolver(
this.getClass().getClassLoader()
)))
.addLast(new ServerHandler());
}
}); //worker线程的处理器
ChannelFuture future = serverBootstrap.bind(6677).sync();
System.out.println("服务器启动完成。。。。。");
//等待服务端监听端⼝关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
- ServerHandler
package com.haopt.netty.codec.obj;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ServerHandler extends SimpleChannelInboundHandler<User> {
@Override
public void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
//获取到user对象
System.out.println(user);
ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
}
}
2.3 客户端
- NettyObjectClient
package com.haopt.netty.codec.obj;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class NettyObjectClient {
public static void main(String[] args) throws Exception{
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync();
future.channel().closeFuture().sync();
} finally {
worker.shutdownGracefully();
}
}
}
- ClientHandler
package com.haopt.netty.codec.obj;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("接收到服务端的消息:" +
msg.toString(CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
User user = new User();
user.setId(1L);
user.setName("张三");
user.setAge(20);
ctx.writeAndFlush(user);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
2.4 JDK序列化的优化
JDK序列化使⽤是⽐较⽅便,但是性能较差,序列化后的字节⽐较⼤,所以⼀般在项⽬中不
会使⽤⾃带的序列化,⽽是会采⽤第三⽅的序列化框架Hessian编解码。
- 导入依赖
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.63</version>
</dependency>
- User对象
package com.haopt.netty.codec.hessian;
public class User implements java.io.Serializable{
private static final long serialVersionUID = -8200798627910162221L;
private Long id;
private String name;
private Integer age;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}
- Hessian序列化⼯具类
package com.haopt.netty.codec.hessian.codec;
import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* Hessian序列化⼯具类
*
*/
public class HessianSerializer {
public <T> byte[] serialize(T obj) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
HessianOutput ho = new HessianOutput(os);
try {
ho.writeObject(obj);
ho.flush();
return os.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
ho.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
try {
os.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
public <T> Object deserialize(byte[] bytes, Class<T> clazz) {
ByteArrayInputStream is = new ByteArrayInputStream(bytes);
HessianInput hi = new HessianInput(is);
try {
return (T) hi.readObject(clazz);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
hi.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
is.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
- 编码器
package com.haopt.netty.codec.hessian.codec;
import cn.itcast.netty.coder.hessian.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class HessianEncoder extends MessageToByteEncoder<User> {
private HessianSerializer hessianSerializer = new HessianSerializer();
protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception {
byte[] bytes = hessianSerializer.serialize(msg);
out.writeBytes(bytes);
}
}
- 解码器
public class HessianDecoder extends ByteToMessageDecoder {
private HessianSerializer hessianSerializer = new HessianSerializer();
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out) throws Exception {
//复制⼀份ByteBuf数据,轻复制,⾮完全拷⻉
//避免出现异常:did not read anything but decoded a message
//Netty检测没有读取任何字节就会抛出该异常
ByteBuf in2 = in.retainedDuplicate();
byte[] dst;
if (in2.hasArray()) {//堆缓冲区模式
dst = in2.array();
} else {
dst = new byte[in2.readableBytes()];
in2.getBytes(in2.readerIndex(), dst);
}
//跳过所有的字节,表示已经读取过了
in.skipBytes(in.readableBytes());
//反序列化
Object obj = hessianSerializer.deserialize(dst, User.class);
out.add(obj);
}
}
- 服务端
public class NettyHessianServer {
public static void main(String[] args) throws Exception {
// System.setProperty("io.netty.noUnsafe", "true");
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// ⼯作线程,线程数默认是:cpu*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
//配置server通道
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>
() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new HessianDecoder())
.addLast(new ServerHandler());
}
}); //worker线程的处理器
// serverBootstrap.childOption(ChannelOption.ALLOCATOR,
UnpooledByteBufAllocator.DEFAULT);
ChannelFuture future = serverBootstrap.bind(6677).sync();
System.out.println("服务器启动完成。。。。。");
//等待服务端监听端⼝关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
public class ServerHandler extends SimpleChannelInboundHandler<User> {
@Override
public void channelRead0(ChannelHandlerContext ctx, User user) throws
Exception {
//获取到user对象
System.out.println(user);
ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
}
}
- 客户端
public class NettyHessianClient {
public static void main(String[] args) throws Exception {
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HessianEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync();
future.channel().closeFuture().sync();
} finally {
worker.shutdownGracefully();
}
}
}
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws
Exception {
System.out.println("接收到服务端的消息:" +
msg.toString(CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
User user = new User();
user.setId(1L);
user.setName("张三");
user.setAge(20);
ctx.writeAndFlush(user);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
二 TCP的粘包/拆包的问题以及解决
2.1 ReplayingDecoder
- 自定义解码器,将buf变为int
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
if (buf.readableBytes() < 4) {
return;
}
buf.markReaderIndex();
int length = buf.readInt();
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return;
}
out.add(buf.readBytes(length));
}
}
2. 使用ReplayingDecoder进行优化
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
out.add(buf.readBytes(buf.readInt()));
}
}
- ReplayingDecoder使用说明
1 使⽤了特殊的ByteBuf,叫做ReplayingDecoderByteBuf,扩展了ByteBuf
2 重写了ByteBuf的readXxx()等⽅法,会先检查可读字节⻓度,⼀旦检测到不满⾜要求就直接抛出
REPLAY(REPLAY继承ERROR)
3 ReplayingDecoder重写了ByteToMessageDecoder的callDecode()⽅法,捕获Signal并在catch块
中重置ByteBuf的readerIndex。
4 继续等待数据,直到有了数据后继续读取,这样就可以保证读取到需要读取的数据。
5 类定义中的泛型S是⼀个⽤于记录解码状态的状态机枚举类,在state(S s)、checkpoint(S s)等⽅
法中会⽤到。在简单解码时也可以⽤java.lang.Void来占位。
- 注意
1 buffer的部分操作(readBytes(ByteBuffer dst)、retain()、release()等⽅法会直接抛出异常)
2 在某些情况下会影响性能(如多次对同⼀段消息解码)
继承ReplayingDecoder,错误示例和修改
//这是⼀个错误的例⼦:
//消息中包含了2个integer,代码中decode⽅法会被调⽤两次,此时队列size不等于2,这段代码达不到期望结果。
public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(ByteBuf buf, List<Object> out) throws Exception {
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
//正确的做法:
public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(ByteBuf buf, List<Object> out) throws Exception {
// Revert the state of the variable that might have been changed
// since the last partial decode.
values.clear();
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// Now we know this assertion will never fail.
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
ByteToIntegerDecoder2的实现
public class ByteToIntegerDecoder2 extends ReplayingDecoder<Void> {
/**
* @param ctx 上下⽂
* @param in 输⼊的ByteBuf消息数据
* @param out 转化后输出的容器
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
out.add(in.readInt()); //读取到int类型数据,放⼊到输出,完成数据类型的转化
}
}
2.2 拆包和粘包问题重现(客户端向服务端发送十条数据)
- 客户端启动类
public class NettyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync();
future.channel().closeFuture().sync();
} finally {
worker.shutdownGracefully();
}
}
}
- 客户端ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("接收到服务端的消息:" +
msg.toString(CharsetUtil.UTF_8));
System.out.println("接收到服务端的消息数量:" + (++count));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer("from client a message!",
CharsetUtil.UTF_8));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- 服务端NettyServer
public class NettyServer {
public static void main(String[] args) throws Exception {
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// ⼯作线程,线程数默认是:cpu*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
//配置server通道
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new ServerHandler());
}
}); //worker线程的处理器
ChannelFuture future = serverBootstrap.bind(5566).sync();
System.out.println("服务器启动完成。。。。。");
//等待服务端监听端⼝关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
- ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("服务端接收到消息:" +
msg.toString(CharsetUtil.UTF_8));
System.out.println("服务端接收到消息数量:" + (++count));
ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
}
}
2.2 什么是TCP的粘包和拆包问题
TCP是流传递的,所谓流,就是没有界限的数据。服务端接受客户端数据,并不知道是一条还是多条。
服务端如何拆包并不知道。
因此服务端和客户端进行数据传递的时候,要制定好拆包规则。客户端按照该规则进行粘包,服务端
按照该规则拆包。如果有任意违背该规则,服务端就不能拿到预期的数据。
- 解决思路(三种)
1. 在发送的数据包中添加头,在头⾥存储数据的⼤⼩,服务端就可以按照此⼤⼩来读取数据,这样就
知道界限在哪⾥了。
2. 以固定的⻓度发送数据,超出的分多次发送,不⾜的以0填充,接收端就以固定⻓度接收即可。
3. 在数据包之间设置边界,如添加特殊符号,这样,接收端通过这个边界就可以将不同的数据包拆分开。
2.3 实战:解决TCP的粘包/拆包问题
- 自定义协议
public class MyProtocol {
private Integer length; //数据头:⻓度
private byte[] body; //数据体
public Integer getLength() {
return length;
}
public void setLength(Integer length) {
this.length = length;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
}
- 编码器
public class MyEncoder extends MessageToByteEncoder<MyProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MyProtocol msg, ByteBuf out) throws Exception {
out.writeInt(msg.getLength());
out.writeBytes(msg.getBody());
}
}
- 解码器
public class MyDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int length = in.readInt(); //获取⻓度
byte[] data = new byte[length]; //根据⻓度定义byte数组
in.readBytes(data); //读取数据
MyProtocol myProtocol = new MyProtocol();
myProtocol.setLength(length);
myProtocol.setBody(data);
out.add(myProtocol);
}
}
- 客户端ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<MyProtocol> {
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
System.out.println("接收到服务端的消息:" + new String(msg.getBody(),
CharsetUtil.UTF_8));
System.out.println("接收到服务端的消息数量:" + (++count));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
byte[] data = "from client a message!".getBytes(CharsetUtil.UTF_8);
MyProtocol myProtocol = new MyProtocol();
myProtocol.setLength(data.length);
myProtocol.setBody(data);
ctx.writeAndFlush(myProtocol);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- NettyClient
public class NettyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyEncoder());
ch.pipeline().addLast(new MyDecoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync();
future.channel().closeFuture().sync();
} finally {
worker.shutdownGracefully();
}
}
}
- ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<MyProtocol> {
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
System.out.println("服务端接收到消息:" + new String(msg.getBody(),
CharsetUtil.UTF_8));
System.out.println("服务端接收到消息数量:" + (++count));
byte[] data = "ok".getBytes(CharsetUtil.UTF_8);
MyProtocol myProtocol = new MyProtocol();
myProtocol.setLength(data.length);
myProtocol.setBody(data);
ctx.writeAndFlush(myProtocol);
}
}
- NettyServer
public class NettyServer {
public static void main(String[] args) throws Exception {
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// ⼯作线程,线程数默认是:cpu*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
//配置server通道
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new MyDecoder())
.addLast(new MyEncoder())
.addLast(new ServerHandler());
}
}); //worker线程的处理器
ChannelFuture future = serverBootstrap.bind(5566).sync();
System.out.println("服务器启动完成。。。。。");
//等待服务端监听端⼝关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
- 测试
三 Netty核心源码解析
3.1 服务端启动过程刨析
- 创建服务端Channel
1 ServerBootstrap对象的bind()⽅法,也是⼊⼝⽅法
2 AbstractBootstrap中的initAndRegister()进⾏创建Channel
创建Channel的⼯作由ReflectiveChannelFactory反射类中的newChannel()⽅法完成。
3 NioServerSocketChannel中的构造⽅法中,通过jdk nio底层的SelectorProvider打开ServerSocketChannel。
4 在AbstractNioChannel的构造⽅法中,设置channel为⾮阻塞:ch.configureBlocking(false);
5 通过的AbstractChannel的构造⽅法,创建了id、unsafe、pipeline内容。
6 通过NioServerSocketChannelConfig获取tcp底层的⼀些参数
- 初始化服务端Channel
1 AbstractBootstrap中的initAndRegister()进⾏初始化channel,代码:init(channel);
2 在ServerBootstrap中的init()⽅法设置channelOptions以及Attributes。
3 紧接着,将⽤户⾃定义参数、属性保存到局部变量currentChildOptions、currentChildAttrs,以
供后⾯使⽤
4 如果设置了serverBootstrap.handler()的话,会加⼊到pipeline中。
5 添加连接器ServerBootstrapAcceptor,有新连接加⼊后,将⾃定义的childHandler加⼊到连接的
pipeline中:
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(
new ServerBootstrapAcceptor(ch, currentChildGroup,currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//当客户端有连接时才会执⾏
final Channel child = (Channel) msg;
//将⾃定义的childHandler加⼊到连接的pipeline中
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
childGroup.register(child).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
- 注册selector
//进⾏注册
1 initAndRegister()⽅法中的ChannelFuture regFuture = config().group().register(channel);
2 在io.netty.channel.AbstractChannel.AbstractUnsafe#register()中完成实际的注册
2.1 AbstractChannel.this.eventLoop = eventLoop; 进⾏eventLoop的赋值操作,后续的IO事件
⼯作将在由该eventLoop执⾏。
2.2 调⽤register0(promise)中的doRegister()进⾏实际的注册
3 io.netty.channel.nio.AbstractNioChannel#doRegister进⾏了⽅法实现
//通过jdk底层进⾏注册多路复⽤器
//javaChannel() --前⾯创建的channel
//eventLoop().unwrappedSelector() -- 获取selector
//注册感兴趣的事件为0,表明没有感兴趣的事件,后⾯会进⾏重新注册事件
//将this对象以attachment的形式注册到selector,⽅便后⾯拿到当前对象的内容
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
- 绑定端口
1 ⼊⼝在io.netty.bootstrap.AbstractBootstrap#doBind0(),启动⼀个线程进⾏执⾏绑定端⼝操作
2 调⽤io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress,
io.netty.channel.ChannelPromise)⽅法,再次启动线程执⾏
3 最终调⽤io.netty.channel.socket.nio.NioServerSocketChannel#doBind()⽅法进⾏绑定操作
//通过jdk底层的channel进⾏绑定
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress,
config.getBacklog());
}
}
什么时候进⾏更新selector的主从事件?
最终在io.netty.channel.nio.AbstractNioChannel#doBeginRead()⽅法中完成的
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp); //设置
感兴趣的事件为OP_ACCEPT
}
}
//在NioServerSocketChannel的构造⽅法中进⾏了赋值
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this,
javaChannel().socket());
}
3.2 连接请求过程源码刨析
- 新连接接入
入口在
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey,
io.netty.channel.nio.AbstractNioChannel)中
进⼊NioMessageUnsafe的read()⽅法
调⽤io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages() ⽅法,创建
jdk底层的channel,封装成NioSocketChannel添加到List容器中
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an
accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
创建NioSocketChannel对象
new NioSocketChannel(this, ch),通过new的⽅式进⾏创建
调⽤super的构造⽅法
传⼊SelectionKey.OP_READ事件标识
创建id、unsafe、pipeline对象
设置⾮阻塞 ch.configureBlocking(false);
创建NioSocketChannelConfig对象
- 注册读事件
在io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe中的:
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i)); //传播读事件
}
在io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)⽅
法中
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
//执⾏channelRead,需要注意的是,第⼀次执⾏是HeadHandler,第⼆次是
ServerBootstrapAcceptor
//通过ServerBootstrapAcceptor进⼊和 新连接接⼊的 注册selector相同的
逻辑进⾏注册以及事件绑定
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
四 使用Netty优化点
4.1 零拷贝
1 Bytebuf 使⽤的是⽤池化的Direct Buffer类型使⽤的堆外内存,不需要进⾏字节缓冲区的⼆次拷
⻉,如果使⽤堆内存,JVM会先拷⻉到堆内,再写⼊Socket,就多了⼀次拷⻉。
2 CompositeByteBuf将多个ByteBuf封装成⼀个ByteBuf,在添加ByteBuf时不需要进程拷⻉。
3 Netty的⽂件传输类DefaultFileRegion的transferTo⽅法将⽂件发送到⽬标channel中,不需要进
⾏循环拷⻉,提升了性能。
4.2 EventLoop的任务调度
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
channel.writeAndFlush(data)
}
});
而不是使用hannel.writeAndFlush(data);
EventLoop的任务调度直接放入到channel所对应的EventLoop的执行队列,后者会导致线程切换。
备注:在writeAndFlush的底层,如果没有通过eventLoop执行的话,就会启动新的线程。
4.3 减少ChannelPipline的调⽤⻓度
public class YourHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//msg从整个ChannelPipline中⾛⼀遍,所有的handler都要经过。
ctx.channel().writeAndFlush(msg);
//从当前handler⼀直到pipline的尾部,调⽤更短。
ctx.writeAndFlush(msg);
}
}
4.4 减少ChannelHandler的创建(基本上不会配置)
如果channelhandler是⽆状态的(即不需要保存任何状态参数),那么使⽤Sharable注解,并在
bootstrap时只创建⼀个实例,减少GC。否则每次连接都会new出handler对象。
@ChannelHandler.Shareable
public class StatelessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {}
}
public class MyInitializer extends ChannelInitializer<Channel> {
private static final ChannelHandler INSTANCE = new StatelessHandler();
@Override
public void initChannel(Channel ch) {
ch.pipeline().addLast(INSTANCE);
}
}
注意:ByteToMessageDecoder之类的编解码器是有状态的,不能使⽤Sharable注解。
4.5 配置参数的设置
服务端的bossGroup只需要设置为1即可,因为ServerSocketChannel在初始化阶段,只会
注册到某⼀个eventLoop上,⽽这个eventLoop只会有⼀个线程在运⾏,所以没有必要设置为
多线程。⽽ IO 线程,为了充分利⽤ CPU,同时考虑减少线上下⽂切换的开销,通常workGroup
设置为CPU核数的两倍,这也是Netty提供的默认值。
在对于响应时间有⾼要求的场景,使⽤.childOption(ChannelOption.TCP_NODELAY, true)
和.option(ChannelOption.TCP_NODELAY, true)来禁⽤nagle算法,不等待,⽴即发送。
五 ByteBuf的api
- 顺序读api
- 顺序写操作
本文地址:https://blog.csdn.net/flowerAndJava/article/details/109908839