欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)

程序员文章站 2022-06-19 08:47:21
在使用Netty进行通信开发,如何选择编码器?在TCP粘包/拆包的问题如何解决?服务端在启动流程是什么样的?连接服务流程是什么?一 编解码器1.1 什么叫编解码器在网络传输的过程中,数据都是以字节流的方式进行传递。客户端在进行数据传递的时候将原来的数据格式转化为字节,叫编码。服务端将字节转化为原来的格式,叫解码。统称codec。编解码器分为两部分-编码器和解码器,编码器负责出站,解码器负责入站。1.2 解码器1.2.1 概述解码器负责入站操作,那么也一定要实现ChannelInbound...

在使用Netty进行通信开发,如何选择编码器?在TCP粘包/拆包的问题如何解决?服务端在启动
流程是什么样的?连接服务流程是什么?

一 编解码器

1.1 什么叫编解码器

在网络传输的过程中,数据都是以字节流的方式进行传递。客户端在进行数据传递的时候
将原来的数据格式转化为字节,叫编码。服务端将字节转化为原来的格式,叫解码。统称
codec。
编解码器分为两部分-编码器和解码器,编码器负责出站,解码器负责入站。

1.2 解码器

1.2.1 概述

解码器负责入站操作,那么也一定要实现ChannelInboundHandler接口,所以解码器本质上也是ChannelHandler。
我们自定义编解码器只需要继承ByteToMessageDecoder(Netty提供抽象类,继承 ChannelInboundHandlerAdapter),实现decode()。Netty提供一些常用的解码器实现,
开箱即用。如下:

1 RedisDecoder 基于Redis协议的解码器
2 XmlDecoder 基于XML格式的解码器
3 JsonObjectDecoder 基于json数据格式的解码器
4 HttpObjectDecoder 基于http协议的解码器

Netty也提供了MessageToMessageDecoder,将⼀种格式转化为另⼀种格式的解码器,也提供了⼀些
实现,如下:

1 StringDecoder 将接收到ByteBuf转化为字符串
2 ByteArrayDecoder 将接收到ByteBuf转化字节数组
3 Base64Decoder 将由ByteBuf或US-ASCII字符串编码的Base64解码为ByteBuf。

1.2.2 将字节流转化为Intger类型(案例)

1 字节解码器

package com.haopt.netty.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;
public class ByteToIntegerDecoder extends ByteToMessageDecoder {
    /**
    *
    * @param ctx 上下⽂
    * @param in 输⼊的ByteBuf消息数据
    * @param out 转化后输出的容器
    * @throws Exception
    */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if(in.readableBytes() >= 4){ //int类型占⽤4个字节,所以需要判断是否存在有4个字节,再进⾏读取
            out.add(in.readInt()); //读取到int类型数据,放⼊到输出,完成数据类型的转化
        }
    }
}

2 Handler

package com.haopt.netty.codec;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Integer i = (Integer) msg; //这⾥可以直接拿到Integer类型的数据
        System.out.println("服务端接收到的消息为:" + i);
    }
}

3 在pipeline中添加解码器

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
    .addLast(new ByteToIntegerDecoder())
    .addLast(new ServerHandler());
}

1.3 编码器

1.3.1 概述

将原来的格式转化为字节。我们要实现自定义解码器只要继承MessageToByteEncoder
(实现了ChannelOutboundHandler接⼝),本质上也是ChannelHandler。
Netty中一些实现的编码器,如下:

1 ObjectEncoder 将对象(需要实现Serializable接⼝)编码为字节流
2 SocksMessageEncoder 将SocksMessage编码为字节流
3 HAProxyMessageEncoder 将HAProxyMessage编码成字节流

Netty也提供了MessageToMessageEncoder,将⼀种格式转化为另⼀种格式的编码器,也提供了⼀些
实现:

1 RedisEncoder 将Redis协议的对象进⾏编码
2 StringEncoder 将字符串进⾏编码操作
3 Base64Encoder 将Base64字符串进⾏编码操作

1.3.2 将Integer类型编码为字节进⾏传递(案例)

  1. 自定义编码器
package com.haopt.netty.codec.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
       out.writeInt(msg);
    }
}
  1. Handler
package com.haopt.netty.codec.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("接收到服务端的消息:" +
        msg.toString(CharsetUtil.UTF_8));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    	ctx.writeAndFlush(123);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    	cause.printStackTrace();
    	ctx.close();
    }
}
  1. pipeline
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new IntegerToByteEncoder());
    ch.pipeline().addLast(new ClientHandler());
}

二 开发Http服务器

通过Netty中提供的http的解码器,进行http服务器开发。

2.1 Netty配置

  1. server
package com.haopt.netty.codec.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
public class NettyHttpServer {
    public static void main(String[] args) throws Exception {
        // 主线程,不处理任何业务逻辑,只是接收客户的连接请求
        EventLoopGroup boss = new NioEventLoopGroup(1);
        // ⼯作线程,线程数默认是:cpu*2
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
        // 服务器启动类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker);
        //配置server通道
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                //http请求的解码器
                //将http请求中的uri以及请求体聚合成⼀个完整的FullHttpRequest对象
                .addLast(new HttpRequestDecoder()) 
                .addLast(new HttpObjectAggregator(1024 * 128))
                .addLast(new HttpResponseEncoder()) //http响应的编码器
                .addLast(new ChunkedWriteHandler()) //⽀持异步的⼤⽂件传输,防⽌内存溢出
                .addLast(new ServerHandler());
            }
          }); //worker线程的处理器
          ChannelFuture future = serverBootstrap.bind(8080).sync();
          System.out.println("服务器启动完成。。。。。");
          //等待服务端监听端⼝关闭
          future.channel().closeFuture().sync();
        } finally {
          //优雅关闭
          boss.shutdownGracefully();
          worker.shutdownGracefully();
     	}
     }
}
  1. ServerHandler
package com.haopt.netty.codec.http;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import java.util.Map;
public class ServerHandler extends SimpleChannelInboundHandler<FullHttpRequest>{
    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        //解析FullHttpRequest,得到请求参数
        Map<String, String> paramMap = new RequestParser(request).parse();
        String name = paramMap.get("name");
        //构造响应对象
        FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=utf-8");
        StringBuilder sb = new StringBuilder();
        sb.append("<h1>");
        sb.append("你好," + name);
        sb.append("</h1>");
        httpResponse.content().writeBytes(Unpooled.copiedBuffer(sb,CharsetUtil.UTF_8));
        //操作完成后,将channel关闭
        ctx.writeAndFlush(httpResponse).addListener(ChannelFutureListener.CLOSE); 
    }
}
  1. RequestParser
package com.haopt.netty.codec.http;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* HTTP请求参数解析器, ⽀持GET, POST
*/
public class RequestParser {
    private FullHttpRequest fullReq;
    /**
    * 构造⼀个解析器
    * @param req
    */
    public RequestParser(FullHttpRequest req) {
    	this.fullReq = req;
    }
    /**
    * 解析请求参数
    * @return 包含所有请求参数的键值对, 如果没有参数, 则返回空Map
    *
    * @throws IOException
    */
    public Map<String, String> parse() throws Exception {
        HttpMethod method = fullReq.method();
        Map<String, String> parmMap = new HashMap<>();
        if (HttpMethod.GET == method) {
          // 是GET请求
          QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri());
          decoder.parameters().entrySet().forEach( entry -> {
          // entry.getValue()是⼀个List, 只取第⼀个元素
          parmMap.put(entry.getKey(), entry.getValue().get(0));
          });
        } else if (HttpMethod.POST == method) {
          // 是POST请求
          HttpPostRequestDecoder decoder = new
          HttpPostRequestDecoder(fullReq);
          decoder.offer(fullReq);
          List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
          for (InterfaceHttpData parm : parmList) {
          Attribute data = (Attribute) parm;
          parmMap.put(data.getName(), data.getValue());
          }
        } else {
          // 不⽀持其它⽅法
          throw new RuntimeException("不⽀持其它⽅法"); // 可以用自定义异常来替代
        }
        return parmMap;
    }
}
  1. 对象
package com.haopt.netty.codec.obj;
public class User implements java.io.Serializable {
    private static final long serialVersionUID = -89217070354741790L;
    private Long id;
    private String name;
    private Integer age;
    public Long getId() {
    	return id;
    }
    public void setId(Long id) {
    	this.id = id;
    }
    public String getName() {
    	return name;
    }
    public void setName(String name) {
    	this.name = name;
    }
    public Integer getAge() {
    	return age;
    }
    public void setAge(Integer age) {
    	this.age = age;
    }
    @Override
    public String toString() {
      return "User{" +
        "id=" + id +
        ", name='" + name + '\'' +
        ", age=" + age +
        '}';
      }
}

2.2 服务端

  1. NettyObjectServer
package com.haopt.netty.codec.obj;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
public class NettyObjectServer {
    public static void main(String[] args) throws Exception {
        // 主线程,不处理任何业务逻辑,只是接收客户的连接请求
        EventLoopGroup boss = new NioEventLoopGroup(1);
        // ⼯作线程,线程数默认是:cpu*2
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
        // 服务器启动类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker);
        //配置server通道
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel> () {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                .addLast(new ObjectDecoder(ClassResolvers.weakCachingResolver(
                this.getClass().getClassLoader()
                )))
                .addLast(new ServerHandler());
            }
        }); //worker线程的处理器
        ChannelFuture future = serverBootstrap.bind(6677).sync();
        System.out.println("服务器启动完成。。。。。");
        //等待服务端监听端⼝关闭
        future.channel().closeFuture().sync();
        } finally {
        //优雅关闭
        boss.shutdownGracefully();
        worker.shutdownGracefully();
        }
    }
}
  1. ServerHandler
package com.haopt.netty.codec.obj;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ServerHandler extends SimpleChannelInboundHandler<User> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
        //获取到user对象
        System.out.println(user);
        ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
    }
}

2.3 客户端

  1. NettyObjectClient
package com.haopt.netty.codec.obj;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class NettyObjectClient {
public static void main(String[] args) throws Exception{
    EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ClientHandler());
                }
            });
            ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync();
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    }
}
  1. ClientHandler
package com.haopt.netty.codec.obj;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("接收到服务端的消息:" +
        msg.toString(CharsetUtil.UTF_8));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        User user = new User();
        user.setId(1L);
        user.setName("张三");
        user.setAge(20);
        ctx.writeAndFlush(user);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

2.4 JDK序列化的优化

JDK序列化使⽤是⽐较⽅便,但是性能较差,序列化后的字节⽐较⼤,所以⼀般在项⽬中不
会使⽤⾃带的序列化,⽽是会采⽤第三⽅的序列化框架Hessian编解码。

  1. 导入依赖
<dependency>
  <groupId>com.caucho</groupId>
  <artifactId>hessian</artifactId>
  <version>4.0.63</version>
</dependency>
  1. User对象
package com.haopt.netty.codec.hessian;
public class User implements java.io.Serializable{
    private static final long serialVersionUID = -8200798627910162221L;
    private Long id;
    private String name;
    private Integer age;
    public Long getId() {
    	return id;
    }
    public void setId(Long id) {
    	this.id = id;
    }
    public String getName() {
    	return name;
    }
    public void setName(String name) {
    	this.name = name;
    }
    public Integer getAge() {
    	return age;
    }
    public void setAge(Integer age) {
    	this.age = age;
    }
    @Override
    public String toString() {
      return "User{" +
      "id=" + id +
      ", name='" + name + '\'' +
      ", age=" + age +
      '}';
    }
}
  1. Hessian序列化⼯具类
package com.haopt.netty.codec.hessian.codec;
import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* Hessian序列化⼯具类
*
*/
public class HessianSerializer {
    public <T> byte[] serialize(T obj) {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        HessianOutput ho = new HessianOutput(os);
        try {
          ho.writeObject(obj);
          ho.flush();
          return os.toByteArray();
        } catch (IOException e) {
        	throw new RuntimeException(e);
        } finally {
          try {
          	ho.close();
          } catch (IOException e) {
          	throw new RuntimeException(e);
          }
          try {
          	os.close();
          } catch (IOException e) {
         	throw new RuntimeException(e);
          }
        }
     }
     
     public <T> Object deserialize(byte[] bytes, Class<T> clazz) {
        ByteArrayInputStream is = new ByteArrayInputStream(bytes);
        HessianInput hi = new HessianInput(is);
        try {
          return (T) hi.readObject(clazz);
        } catch (IOException e) {
          throw new RuntimeException(e);
        } finally {
            try {
               hi.close();
            } catch (Exception e) {
            throw new RuntimeException(e);
        }
        try {
          is.close();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
}
  1. 编码器
package com.haopt.netty.codec.hessian.codec;
import cn.itcast.netty.coder.hessian.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class HessianEncoder extends MessageToByteEncoder<User> {
    private HessianSerializer hessianSerializer = new HessianSerializer();
    protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception {
          byte[] bytes = hessianSerializer.serialize(msg);
          out.writeBytes(bytes);
    }
}
  1. 解码器
public class HessianDecoder extends ByteToMessageDecoder {
    private HessianSerializer hessianSerializer = new HessianSerializer();

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
            out) throws Exception {
        //复制⼀份ByteBuf数据,轻复制,⾮完全拷⻉
        //避免出现异常:did not read anything but decoded a message
        //Netty检测没有读取任何字节就会抛出该异常
        ByteBuf in2 = in.retainedDuplicate();
        byte[] dst;
        if (in2.hasArray()) {//堆缓冲区模式
            dst = in2.array();
        } else {
            dst = new byte[in2.readableBytes()];
            in2.getBytes(in2.readerIndex(), dst);
        }
        //跳过所有的字节,表示已经读取过了
        in.skipBytes(in.readableBytes());
        //反序列化
        Object obj = hessianSerializer.deserialize(dst, User.class);
        out.add(obj);
    }
}
  1. 服务端
public class NettyHessianServer {
    public static void main(String[] args) throws Exception {
        // System.setProperty("io.netty.noUnsafe", "true");
        // 主线程,不处理任何业务逻辑,只是接收客户的连接请求
        EventLoopGroup boss = new NioEventLoopGroup(1);
        // ⼯作线程,线程数默认是:cpu*2
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker);
            //配置server通道
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>
                    () {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new HessianDecoder())
                            .addLast(new ServerHandler());
                }
            }); //worker线程的处理器
            // serverBootstrap.childOption(ChannelOption.ALLOCATOR,
            UnpooledByteBufAllocator.DEFAULT);
            ChannelFuture future = serverBootstrap.bind(6677).sync();
            System.out.println("服务器启动完成。。。。。");
            //等待服务端监听端⼝关闭
            future.channel().closeFuture().sync();
        } finally {
            //优雅关闭
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
public class ServerHandler extends SimpleChannelInboundHandler<User> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, User user) throws
            Exception {
        //获取到user对象
        System.out.println(user);
        ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
    }
}
  1. 客户端
public class NettyHessianClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new HessianEncoder());
                    ch.pipeline().addLast(new ClientHandler());
                }
            });
            ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync();
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    }
}
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws
            Exception {
        System.out.println("接收到服务端的消息:" +
                msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        User user = new User();
        user.setId(1L);
        user.setName("张三");
        user.setAge(20);
        ctx.writeAndFlush(user);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

二 TCP的粘包/拆包的问题以及解决

2.1 ReplayingDecoder

  1. 自定义解码器,将buf变为int
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
      if (buf.readableBytes() < 4) {
      	return;
      }
      buf.markReaderIndex();
      int length = buf.readInt();
      if (buf.readableBytes() < length) {
        buf.resetReaderIndex();
        return;
      }
      out.add(buf.readBytes(length));
    }
}

项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)
2. 使用ReplayingDecoder进行优化

public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
      	out.add(buf.readBytes(buf.readInt()));
    }
}
  1. ReplayingDecoder使用说明
1 使⽤了特殊的ByteBuf,叫做ReplayingDecoderByteBuf,扩展了ByteBuf

2 重写了ByteBuf的readXxx()等⽅法,会先检查可读字节⻓度,⼀旦检测到不满⾜要求就直接抛出
  REPLAY(REPLAY继承ERROR)
  
3 ReplayingDecoder重写了ByteToMessageDecoder的callDecode()⽅法,捕获Signal并在catch块
  中重置ByteBuf的readerIndex。
  
4 继续等待数据,直到有了数据后继续读取,这样就可以保证读取到需要读取的数据。

5 类定义中的泛型S是⼀个⽤于记录解码状态的状态机枚举类,在state(S s)、checkpoint(S s)等⽅
  法中会⽤到。在简单解码时也可以⽤java.lang.Void来占位。
  1. 注意
1 buffer的部分操作(readBytes(ByteBuffer dst)、retain()、release()等⽅法会直接抛出异常)
2 在某些情况下会影响性能(如多次对同⼀段消息解码)

继承ReplayingDecoder,错误示例和修改

//这是⼀个错误的例⼦:
//消息中包含了2个integer,代码中decode⽅法会被调⽤两次,此时队列size不等于2,这段代码达不到期望结果。
public class MyDecoder extends ReplayingDecoder<Void> {
    private final Queue<Integer> values = new LinkedList<Integer>();
    @Override
    public void decode(ByteBuf buf, List<Object> out) throws Exception {
        // A message contains 2 integers.
        values.offer(buf.readInt());
        values.offer(buf.readInt());
        assert values.size() == 2;
        out.add(values.poll() + values.poll());
    }
}
//正确的做法:
public class MyDecoder extends ReplayingDecoder<Void> {
    private final Queue<Integer> values = new LinkedList<Integer>();
    @Override
    public void decode(ByteBuf buf, List<Object> out) throws Exception {
        // Revert the state of the variable that might have been changed
        // since the last partial decode.
        values.clear();
        // A message contains 2 integers.
        values.offer(buf.readInt());
        values.offer(buf.readInt());
        // Now we know this assertion will never fail.
        assert values.size() == 2;
        out.add(values.poll() + values.poll());
    }
}	

ByteToIntegerDecoder2的实现

public class ByteToIntegerDecoder2 extends ReplayingDecoder<Void> {
    /**
    * @param ctx 上下⽂
    * @param in 输⼊的ByteBuf消息数据
    * @param out 转化后输出的容器
    * @throws Exception
    */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    	out.add(in.readInt()); //读取到int类型数据,放⼊到输出,完成数据类型的转化
    }
}

2.2 拆包和粘包问题重现(客户端向服务端发送十条数据)

  1. 客户端启动类
public class NettyClient {
    public static void main(String[] args) throws Exception{
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new ClientHandler());
                }
        	});
            ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync();
            future.channel().closeFuture().sync();
        } finally {
        	worker.shutdownGracefully();
        }
    }
}
  1. 客户端ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
      System.out.println("接收到服务端的消息:" +
      msg.toString(CharsetUtil.UTF_8));
      System.out.println("接收到服务端的消息数量:" + (++count));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
      for (int i = 0; i < 10; i++) {
      ctx.writeAndFlush(Unpooled.copiedBuffer("from client a message!",
      CharsetUtil.UTF_8));
      }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      cause.printStackTrace();
      ctx.close();
    }
}
  1. 服务端NettyServer
public class NettyServer {
    public static void main(String[] args) throws Exception {
      // 主线程,不处理任何业务逻辑,只是接收客户的连接请求
      EventLoopGroup boss = new NioEventLoopGroup(1);
      // ⼯作线程,线程数默认是:cpu*2
      EventLoopGroup worker = new NioEventLoopGroup();
      try {
        // 服务器启动类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker);
        //配置server通道
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
            .addLast(new ServerHandler());
          }
        }); //worker线程的处理器
        ChannelFuture future = serverBootstrap.bind(5566).sync();
        System.out.println("服务器启动完成。。。。。");
        //等待服务端监听端⼝关闭
        future.channel().closeFuture().sync();
      } finally {
        //优雅关闭
        boss.shutdownGracefully();
        worker.shutdownGracefully();
      }
    }
}
  1. ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
      private int count;
      @Override
      protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
          System.out.println("服务端接收到消息:" +
          msg.toString(CharsetUtil.UTF_8));
          System.out.println("服务端接收到消息数量:" + (++count));
          ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
      }
}

项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)

2.2 什么是TCP的粘包和拆包问题

TCP是流传递的,所谓流,就是没有界限的数据。服务端接受客户端数据,并不知道是一条还是多条。
服务端如何拆包并不知道。

因此服务端和客户端进行数据传递的时候,要制定好拆包规则。客户端按照该规则进行粘包,服务端
按照该规则拆包。如果有任意违背该规则,服务端就不能拿到预期的数据。

  1. 解决思路(三种)
1. 在发送的数据包中添加头,在头⾥存储数据的⼤⼩,服务端就可以按照此⼤⼩来读取数据,这样就
   知道界限在哪⾥了。
   
2. 以固定的⻓度发送数据,超出的分多次发送,不⾜的以0填充,接收端就以固定⻓度接收即可。

3. 在数据包之间设置边界,如添加特殊符号,这样,接收端通过这个边界就可以将不同的数据包拆分开。

2.3 实战:解决TCP的粘包/拆包问题

  1. 自定义协议
public class MyProtocol {
    private Integer length; //数据头:⻓度
    private byte[] body; //数据体
    public Integer getLength() {
    	return length;
    }
    public void setLength(Integer length) {
    	this.length = length;
    }
    public byte[] getBody() {
    	return body;
    }
    public void setBody(byte[] body) {
    	this.body = body;
    }
}
  1. 编码器
public class MyEncoder extends MessageToByteEncoder<MyProtocol> {
  @Override
  protected void encode(ChannelHandlerContext ctx, MyProtocol msg, ByteBuf out) throws Exception {
    out.writeInt(msg.getLength());
    out.writeBytes(msg.getBody());
  }
}
  1. 解码器
public class MyDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int length = in.readInt(); //获取⻓度
        byte[] data = new byte[length]; //根据⻓度定义byte数组
        in.readBytes(data); //读取数据
        MyProtocol myProtocol = new MyProtocol();
        myProtocol.setLength(length);
        myProtocol.setBody(data);
        out.add(myProtocol);
    }
}
  1. 客户端ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<MyProtocol> {
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
        System.out.println("接收到服务端的消息:" + new String(msg.getBody(),
        CharsetUtil.UTF_8));
        System.out.println("接收到服务端的消息数量:" + (++count));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            byte[] data = "from client a message!".getBytes(CharsetUtil.UTF_8);
            MyProtocol myProtocol = new MyProtocol();
            myProtocol.setLength(data.length);
            myProtocol.setBody(data);
            ctx.writeAndFlush(myProtocol);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
      cause.printStackTrace();
      ctx.close();
    }
}
  1. NettyClient
public class NettyClient {
    public static void main(String[] args) throws Exception{
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new MyEncoder());
                ch.pipeline().addLast(new MyDecoder());
                ch.pipeline().addLast(new ClientHandler());
              }
            });
            ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync();
            future.channel().closeFuture().sync();
        } finally {
        	worker.shutdownGracefully();
        }
    }
}
  1. ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<MyProtocol> {
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
        System.out.println("服务端接收到消息:" + new String(msg.getBody(),
        CharsetUtil.UTF_8));
        System.out.println("服务端接收到消息数量:" + (++count));
        byte[] data = "ok".getBytes(CharsetUtil.UTF_8);
        MyProtocol myProtocol = new MyProtocol();
        myProtocol.setLength(data.length);
        myProtocol.setBody(data);
        ctx.writeAndFlush(myProtocol);
    }
}
  1. NettyServer
public class NettyServer {
    public static void main(String[] args) throws Exception {
        // 主线程,不处理任何业务逻辑,只是接收客户的连接请求
        EventLoopGroup boss = new NioEventLoopGroup(1);
        // ⼯作线程,线程数默认是:cpu*2
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker);
            //配置server通道
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                  ch.pipeline()
                  .addLast(new MyDecoder())
                  .addLast(new MyEncoder())
                  .addLast(new ServerHandler());
              }
            }); //worker线程的处理器
            ChannelFuture future = serverBootstrap.bind(5566).sync();
            System.out.println("服务器启动完成。。。。。");
            //等待服务端监听端⼝关闭
            future.channel().closeFuture().sync();
        } finally {
            //优雅关闭
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
  1. 测试
    项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)

三 Netty核心源码解析

3.1 服务端启动过程刨析

  1. 创建服务端Channel
1 ServerBootstrap对象的bind()⽅法,也是⼊⼝⽅法
2 AbstractBootstrap中的initAndRegister()进⾏创建Channel
     创建Channel的⼯作由ReflectiveChannelFactory反射类中的newChannel()⽅法完成。
3 NioServerSocketChannel中的构造⽅法中,通过jdk nio底层的SelectorProvider打开ServerSocketChannel。
4 在AbstractNioChannel的构造⽅法中,设置channel为⾮阻塞:ch.configureBlocking(false);
5 通过的AbstractChannel的构造⽅法,创建了id、unsafe、pipeline内容。
6 通过NioServerSocketChannelConfig获取tcp底层的⼀些参数
  1. 初始化服务端Channel
1 AbstractBootstrap中的initAndRegister()进⾏初始化channel,代码:init(channel);

2 在ServerBootstrap中的init()⽅法设置channelOptions以及Attributes。

3 紧接着,将⽤户⾃定义参数、属性保存到局部变量currentChildOptions、currentChildAttrs,以
  供后⾯使⽤

4 如果设置了serverBootstrap.handler()的话,会加⼊到pipeline中。

5 添加连接器ServerBootstrapAcceptor,有新连接加⼊后,将⾃定义的childHandler加⼊到连接的
  pipeline中:
ch.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
      pipeline.addLast(
      		new ServerBootstrapAcceptor(ch, currentChildGroup,currentChildHandler, currentChildOptions, currentChildAttrs));
    }
});
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) { 
    //当客户端有连接时才会执⾏
    final Channel child = (Channel) msg;
    //将⾃定义的childHandler加⼊到连接的pipeline中
    child.pipeline().addLast(childHandler); 
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);
    try {
        childGroup.register(child).addListener(new ChannelFutureListener(){
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                	forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
    	forceClose(child, t);
    }
}
  1. 注册selector
//进⾏注册
1 initAndRegister()⽅法中的ChannelFuture regFuture = config().group().register(channel); 

2 在io.netty.channel.AbstractChannel.AbstractUnsafe#register()中完成实际的注册
    2.1 AbstractChannel.this.eventLoop = eventLoop; 进⾏eventLoop的赋值操作,后续的IO事件
        ⼯作将在由该eventLoop执⾏。
     2.2 调⽤register0(promise)中的doRegister()进⾏实际的注册
    
3 io.netty.channel.nio.AbstractNioChannel#doRegister进⾏了⽅法实现
//通过jdk底层进⾏注册多路复⽤器
//javaChannel() --前⾯创建的channel
//eventLoop().unwrappedSelector() -- 获取selector
//注册感兴趣的事件为0,表明没有感兴趣的事件,后⾯会进⾏重新注册事件
//将this对象以attachment的形式注册到selector,⽅便后⾯拿到当前对象的内容
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
  1. 绑定端口
1 ⼊⼝在io.netty.bootstrap.AbstractBootstrap#doBind0(),启动⼀个线程进⾏执⾏绑定端⼝操作

2 调⽤io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress,
  io.netty.channel.ChannelPromise)⽅法,再次启动线程执⾏
  
3 最终调⽤io.netty.channel.socket.nio.NioServerSocketChannel#doBind()⽅法进⾏绑定操作
//通过jdk底层的channel进⾏绑定
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
      if (PlatformDependent.javaVersion() >= 7) {
      	  javaChannel().bind(localAddress, config.getBacklog());
      } else {
          javaChannel().socket().bind(localAddress,
          config.getBacklog());
    }
}

什么时候进⾏更新selector的主从事件?
最终在io.netty.channel.nio.AbstractNioChannel#doBeginRead()⽅法中完成的

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
    	return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp); //设置
        感兴趣的事件为OP_ACCEPT
    }
}
//在NioServerSocketChannel的构造⽅法中进⾏了赋值
public NioServerSocketChannel(ServerSocketChannel channel) {
      super(null, channel, SelectionKey.OP_ACCEPT);
      config = new NioServerSocketChannelConfig(this,
      javaChannel().socket());
}

3.2 连接请求过程源码刨析

  1. 新连接接入
入口在
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey,
io.netty.channel.nio.AbstractNioChannel)中
    进⼊NioMessageUnsafe的read()⽅法
    
    调⽤io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages() ⽅法,创建
    jdk底层的channel,封装成NioSocketChannel添加到List容器中
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
          buf.add(new NioSocketChannel(this, ch));
          return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an
        accepted socket.", t);
        try {
        	ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
}
创建NioSocketChannel对象
new NioSocketChannel(this, ch),通过new的⽅式进⾏创建
    调⽤super的构造⽅法
        传⼊SelectionKey.OP_READ事件标识
        创建id、unsafe、pipeline对象
        设置⾮阻塞 ch.configureBlocking(false);
    创建NioSocketChannelConfig对象
  1. 注册读事件
在io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe中的:
for (int i = 0; i < size; i ++) {
    readPending = false;
    pipeline.fireChannelRead(readBuf.get(i)); //传播读事件
}
 在io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)⽅
 法中
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
      try {
          //执⾏channelRead,需要注意的是,第⼀次执⾏是HeadHandler,第⼆次是
          ServerBootstrapAcceptor
          //通过ServerBootstrapAcceptor进⼊和 新连接接⼊的 注册selector相同的
          逻辑进⾏注册以及事件绑定
          ((ChannelInboundHandler) handler()).channelRead(this, msg);
      } catch (Throwable t) {
      	  invokeExceptionCaught(t);
      }
    } else {
    	fireChannelRead(msg);
    }
}

四 使用Netty优化点

4.1 零拷贝

1 Bytebuf 使⽤的是⽤池化的Direct Buffer类型使⽤的堆外内存,不需要进⾏字节缓冲区的⼆次拷
  ⻉,如果使⽤堆内存,JVM会先拷⻉到堆内,再写⼊Socket,就多了⼀次拷⻉。
  
2 CompositeByteBuf将多个ByteBuf封装成⼀个ByteBuf,在添加ByteBuf时不需要进程拷⻉。

3 Netty的⽂件传输类DefaultFileRegion的transferTo⽅法将⽂件发送到⽬标channel中,不需要进
⾏循环拷⻉,提升了性能。

4.2 EventLoop的任务调度

channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
    	channel.writeAndFlush(data)
    }
});

而不是使用hannel.writeAndFlush(data);
EventLoop的任务调度直接放入到channel所对应的EventLoop的执行队列,后者会导致线程切换。
备注:在writeAndFlush的底层,如果没有通过eventLoop执行的话,就会启动新的线程。

4.3 减少ChannelPipline的调⽤⻓度

public class YourHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
    //msg从整个ChannelPipline中⾛⼀遍,所有的handler都要经过。
    ctx.channel().writeAndFlush(msg);
    //从当前handler⼀直到pipline的尾部,调⽤更短。
    ctx.writeAndFlush(msg);
    }
}

4.4 减少ChannelHandler的创建(基本上不会配置)

如果channelhandler是⽆状态的(即不需要保存任何状态参数),那么使⽤Sharable注解,并在
bootstrap时只创建⼀个实例,减少GC。否则每次连接都会new出handler对象。

@ChannelHandler.Shareable
public class StatelessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {}
}
public class MyInitializer extends ChannelInitializer<Channel> {
    private static final ChannelHandler INSTANCE = new StatelessHandler();
    @Override
    public void initChannel(Channel ch) {
    	ch.pipeline().addLast(INSTANCE);
    }
}

注意:ByteToMessageDecoder之类的编解码器是有状态的,不能使⽤Sharable注解。

4.5 配置参数的设置

服务端的bossGroup只需要设置为1即可,因为ServerSocketChannel在初始化阶段,只会
注册到某⼀个eventLoop上,⽽这个eventLoop只会有⼀个线程在运⾏,所以没有必要设置为
多线程。⽽ IO 线程,为了充分利⽤ CPU,同时考虑减少线上下⽂切换的开销,通常workGroup
设置为CPU核数的两倍,这也是Netty提供的默认值。

在对于响应时间有⾼要求的场景,使⽤.childOption(ChannelOption.TCP_NODELAY, true)
和.option(ChannelOption.TCP_NODELAY, true)来禁⽤nagle算法,不等待,⽴即发送。

五 ByteBuf的api

  1. 顺序读api
    项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)
    项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)
    项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)
    项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)
  2. 顺序写操作
    项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)

本文地址:https://blog.csdn.net/flowerAndJava/article/details/109908839