欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

tcp粘包处理,netty decoder学习

程序员文章站 2022-06-02 20:12:37
...

粘包概念不多阐述,看代码:

对于clientHandler:

class SendClientHandler extends SimpleChannelInboundHandler {
    private byte[] req;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        req = ("abc").getBytes();
        for (int i = 0; i < 10; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }
}

连续发送10条"abc"。

对于server Handler:

class SimpleServerHandler extends SimpleChannelInboundHandler {
        int count = 0;

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            //这一步必须要求添加StringDecoder 
            String s = (String) msg;
            System.out.println("channelRead0 invoked,count is " + ++count);
            System.out.println(s);
        }
    }

收到一条就打印一条,按照常理来说,channelRead0会被触发10次,然而结果是:

channelRead0 invoked,count is 1
abcabcabcabcabcabcabcabcabcabc

可以看到一次性收到了所有abc,这就是所谓的粘包。我们需要做的就是拆开来一个个分离的报文。另外为什么channelRead0中的msg可以直接强转成String,后文会提到,需要知道的是我们必须在我们的自定义handler之前添加netty提供的StringDecoder,不添加的话是会报错强转异常的。

回到粘包的问题,怎么解决呢?我们可以在自定义Handler中自己去切分,做内部缓冲,然而更加优雅的处理方案是增加多个ChannelHandlerChannelPipeline 把一整个ChannelHandler拆分成多个模块以减少应用的复杂程度。

自定义一个解码器,继承netty提供的ByteToMessageDecoder,简单重写decode方法:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
             if (in.readableBytes() < 3) {
                 return;
             }
             out.add(in.readBytes(3));
         }

在pipeline中加入这个handler。这里有个问题:应该添加在StringDecoder之前还是之后?后文分析ByteToMessageDecoder源码会提到。这里直接看输出结果:

channelRead0 invoked,count is 1
abc
channelRead0 invoked,count is 2
abc
channelRead0 invoked,count is 3
abc
channelRead0 invoked,count is 4
abc
channelRead0 invoked,count is 5
abc
channelRead0 invoked,count is 6
abc
channelRead0 invoked,count is 7
abc
channelRead0 invoked,count is 8
abc
channelRead0 invoked,count is 9
abc
channelRead0 invoked,count is 10
abc

可以看到消息被正确的切分了。如果我们想一次打印2个abc呢?只需要将handler里decode中的2个3改成6即可。其实decode方法很好理解,return的意思就是现在的数据还不是我想要(能够进行切分的),我需要等待更多的数据进来。out.add则是完成了一次切分。

以上的切分属于字符串的切分,比较简单,接下来设想一种自定义报文,分为header+data,header简单的由2部分构成,固定的标志位+len(2byte),len代表data的长度,一个完整的报文长度就是1byte+1byte+len(data),这里暂定len不超过byte能表达的最大值。那decode方法应该怎么写?

从最简单的情况开始考虑,我们设定标志位是a,客户端以如下逻辑发送报文:

for (int i = 1; i < 21; i++) {
            if (i % 5 != 0) {
                req = "a4bcde".getBytes();
            } else {
                req = "a8bcdefghi".getBytes();
            }
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
            //System.out.println("send" + Arrays.toString(req));
        }

每当i是5的倍数时,发送data长度是8的报文。

其实假定客户端发送的报文都是正确的,永远是以a开头,那么我们处理粘包的逻辑将会非常简单,只需要简单的判断长度够不够,够的话解析出len的长度,读取对应长度的字节然后切分就好了,因为不论你有没有成功的完成一次切分,每次进行decode你读取的第一个字节一定是a,不可能会是其他的字节。假定你连续发送a4bcde a4bcde2个报文,当你粘在一起发过了,那么我会进行2次切分,假定你第一次发了a4b,那么我会判断出长度不够等待接下来的字节,直到剩下的字节填充完毕进行切分。所以服务端的decoder可以这样写:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
           if (in.readableBytes() < 2) { //header至少有2个字节,假定可以发送空data
               return;
           }
           in.markReaderIndex();
           //标志位
           in.readByte();
           byte len = in.readByte();
           //len = 4,字符4转成byte对应52
           if (len == 52) {
               if (in.readableBytes() < 4) {
                   in.resetReaderIndex();
                   return;
               }
               out.add(in.readBytes(4));
           } else {
               if (in.readableBytes() < 8) {
                   in.resetReaderIndex();
                   return;
               }
               out.add(in.readBytes(8));
           }
        }

注意长度不够的话需要resetReaderIndex以便下次decode重新进行读取。在客户端符合要求,并且不出错的情况下,我们这样的decoder是完全可行的,可以正确的拆分出一个个报文数据。然而,可能会出现一些情况,客户端出现问题,发送了不符合要求的报文,例如中间突然发了一次rtyu的报文,那么我们的decoder就会出现问题,或者说可能会有报文攻击,发送大量非法发文,那么我们的decoder也是会出错的,考虑这两种情况的话,我们要做的就是假如出现非法报文,丢弃它。看下面的decoder:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
           if (in.readableBytes() < 2) { //header至少有2个字节,假定可以发送空data
               return;
           }
           while (true) {
               if (!in.isReadable()) {
                   //当一直读完还读不到标志位,那么return,不reset,此时之前的报文被抛弃
                   return;
               }
               //此处进行mark操作,确保当发送break时能够定位到标志报文处
               in.markReaderIndex();
               byte b = in.readByte();
               System.out.println("b is"+ b);
               if (b == 97) {
                   break;
               }
           }
           in.resetReaderIndex();
           in.readByte();
           byte len = in.readByte();
           //len = 4,字符4转成byte对应52
           if (len == 52) {
               if (in.readableBytes() < 4) {
                   in.resetReaderIndex();
                   return;
               }
               out.add(in.readBytes(4));
           } else {
               if (in.readableBytes() < 8) {
                   in.resetReaderIndex();
                   return;
               }
               out.add(in.readBytes(8));
           }
        }

我们用一个while循环来解决这个问题,此时就算你发送大量垃圾报文,只要我读不到标志位,那么垃圾报文会被舍弃,一旦读到标志位,就会定位到标志位然后进行正常的读取数据逻辑。client代码改成如下:

for (int i = 1; i < 30; i++) {
            req = "plmplm".getBytes();
            if (i == 25) {
                req = "a4bcde".getBytes();
            } else if(i==28) {
                req = "a8bcdefghi".getBytes();
            }
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
            System.out.println("send" + Arrays.toString(req));
        }

只有当i是25/28时发送正确格式的报文,看服务端的输出:

4 ---msg len
server receive order : bcde;the counter is: 1
8 ---msg len
server receive order : bcdefghi;the counter is: 2

服务端接收的代码很简单:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String)msg;
        System.out.println(body.length() + " ---msg len");
        System.out.println("server receive order : " + body + ";the counter is: " + ++counter);
    }

 

相关标签: n