欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

netty分析(二) -- 数据接收及报文处理

程序员文章站 2022-07-12 18:54:50
...

上篇博文我们分析了netty的启动流程。
详细见netty分析(一) -- 服务启动流程。这篇文章,我们来分析下netty的数据处理。

上篇讲到在bossGroup的NioEventLoop中的processSelectedKey函数中会调用unsafe.read()来执行NioServerSocketChannel的的accept操作。
在workerGroup中,NioEventLoop的processSelectedKey函数中会执行socket的数据读取操作,让我们来看一下。

1.读取客户端数据过程

processSelectedKey

 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

NioSocketChannel的read方法

        public void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
            if (allocHandle == null) {
                this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
            }
            if (!config.isAutoRead()) {
                removeReadOp();
            }

            ByteBuf byteBuf = null;
            int messages = 0;
            boolean close = false;
            try {   
                int byteBufCapacity = allocHandle.guess();
                int totalReadAmount = 0;
                do {
                    byteBuf = allocator.ioBuffer(byteBufCapacity);
                    int writable = byteBuf.writableBytes();
                    int localReadAmount = doReadBytes(byteBuf);
                    if (localReadAmount <= 0) {
                        // not was read release the buffer
                        byteBuf.release();
                        close = localReadAmount < 0;
                        break;
                    }

                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;

                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                        // Avoid overflow.
                        totalReadAmount = Integer.MAX_VALUE;
                        break;
                    }

                    totalReadAmount += localReadAmount;
                    if (localReadAmount < writable) {
                        // Read less than what the buffer can hold,
                        // which might mean we drained the recv buffer completely.
                        break;
                    }
                } while (++ messages < maxMessagesPerRead);

                pipeline.fireChannelReadComplete();
                allocHandle.record(totalReadAmount);

                if (close) {
                    closeOnRead(pipeline);
                    close = false;
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close);
            }
        }
    }

逻辑还是比较清晰的,核心操作是这两句"int localReadAmount = doReadBytes(byteBuf);"以及"pipeline.fireChannelReadComplete();"。
前者负责将数据从底层读入ByteBuf,后者负责将数据转交pipline处理。

DefaultChannelPipeline的fireChannelRead实现

    public ChannelPipeline fireChannelRead(Object msg) {
        head.fireChannelRead(msg);
        return this;
    }

ChannelPipeline

每个socket绑定了一个pipline,pipline内部维护了一个可迭代的ChannelHandlerContext链表,用于处理io数据,以head标识头部,tail标识尾部。
当我们初始化通道,调用pipline的addLast方法塞入一个ChannelHandler时,实际对handler各个方法是否有@Skip注解做了标记,封装成一个ChannelHandlerContext放入链表的尾部。
这样有数据触发的时候,pipline会从链表首部开始迭代,找到对应能处理相应逻辑的handler进行处理。如果当前的handler需要将数据丢给下一层handler进行处理,需要调用ChannelHandlerContext的fireXXX方法将数据传递下去。

进一步查看DefaultChannelHandlerContext的fireChannelRead方法。

    @Override
    public ChannelHandlerContext fireChannelRead(Object msg) {
        DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
        next.invoker.invokeChannelRead(next, msg);
        return this;
    }

这里的next上下文是根据handler中未携带@Skip的注解来查找最近的链表节点。进一步查看invokeChannelRead方法。

    public void invokeChannelRead(final ChannelHandlerContext ctx, final Object msg) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        if (executor.inEventLoop()) {
            invokeChannelReadNow(ctx, msg);
        } else {
            safeExecuteInbound(new Runnable() {
                @Override
                public void run() {
                    invokeChannelReadNow(ctx, msg);
                }
            }, msg);
        }
    }

由于当前是workerGroup中的EventLoop线程,走进invokeChannelReadNow。

    public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) {
        try {
            ctx.handler().channelRead(ctx, msg);
        } catch (Throwable t) {
            notifyHandlerException(ctx, t);
        }
    }

走到这里,终于和用户代码联系起来了。当接收到客户端数据会调用用户设置的ChannelHandler的channelRead方法。这个很重要。下面我们分析下常见的ChannelHandler。

2 常见的ChannelHandler

由于数据处理的复杂性,Netty针对常见的应用场景给我们封装了一系列的ChannelHandler。先介绍ByteToMessageDecoder的子类。

2.1 ByteToMessageDecoder

2.1.1.LineBasedFrameDecoder

LineBasedFrameDecoder用于以换行符做解码分割符的场景。我们来查看其实现。根据上面的分析,当数据来了首先调用到channelRead方法。我们查看其实现

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                        expandCumulation(ctx, data.readableBytes());
                    }
                    cumulation.writeBytes(data);
                    data.release();
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();
                decodeWasNull = size == 0;

                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

注意:Netty框架并不缓存数据,所以当有未处理完的半包,我们需要自己存起来。
ByteToMessageDecoder如其名,只处理ByteBuf类型的输入数据,内部有个cumulation用于缓存半包(确切的说是若干个已读的全包加一个半包).如果每次解码以后,恰好处理完读入的字节没有剩余半包,那么清空cumulation。读入的新数据会附加到cumulation的尾部,如果cumulation剩下的空间不够写入了,则会对cumulation重新分配内存,新的内存大小正好是需要的字节数。
接着对读入的数据进行分包,详细代码如下:

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while(true) {
                if (in.isReadable()) {
                    int outSize = out.size();
                    int oldInputLength = in.readableBytes();
                    this.decode(ctx, in, out);
                    if (!ctx.isRemoved()) {
                        if (outSize == out.size()) {
                            if (oldInputLength != in.readableBytes()) {
                                continue;
                            }
                        } else {
                            if (oldInputLength == in.readableBytes()) {
                                throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() did not read anything but decoded a message.");
                            }

                            if (!this.isSingleDecode()) {
                                continue;
                            }
                        }
                    }
                }

                return;
            }
        } catch (DecoderException var6) {
            throw var6;
        } catch (Throwable var7) {
            throw new DecoderException(var7);
        }
    }

接下来会调用子类的decode方法进行进一步解码,查看LineBasedFrameDecoder的实现:

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        int eol = findEndOfLine(buffer);
        int length;
        int length;
        if (!this.discarding) {
            if (eol >= 0) {
                length = eol - buffer.readerIndex();
                int delimLength = buffer.getByte(eol) == 13 ? 2 : 1;
                if (length > this.maxLength) {
                    buffer.readerIndex(eol + delimLength);
                    this.fail(ctx, length);
                    return null;
                } else {
                    ByteBuf frame;
                    if (this.stripDelimiter) {
                        frame = buffer.readBytes(length);
                        buffer.skipBytes(delimLength);
                    } else {
                        frame = buffer.readBytes(length + delimLength);
                    }

                    return frame;
                }
            } else {
                length = buffer.readableBytes();
                if (length > this.maxLength) {
                    this.discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());
                    this.discarding = true;
                    if (this.failFast) {
                        this.fail(ctx, "over " + this.discardedBytes);
                    }
                }

                return null;
            }
        } else {
            if (eol >= 0) {
                length = this.discardedBytes + eol - buffer.readerIndex();
                length = buffer.getByte(eol) == 13 ? 2 : 1;
                buffer.readerIndex(eol + length);
                this.discardedBytes = 0;
                this.discarding = false;
                if (!this.failFast) {
                    this.fail(ctx, length);
                }
            } else {
                this.discardedBytes = buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
            }

            return null;
        }
    }

以"\r\n" 或"\n"做分隔符,如果超过了配置的最大长度还没有读到结束符,将调用fireExceptionCaught告知后续节点解码发生异常。

2.1.2.DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder支持用户以多种特定的分割符来对原始ByteBuf解码。如果是换行符"\n"和"\r\n",且不是DelimiterBasedFrameDecoder的子类,功能和LineBasedFrameDecoder一模一样,内部创建一个LineBasedFrameDecoder对象来处理。

2.1.3.FixedLengthFrameDecoder

定长解码器,没什么好说的。

2.1.4.LengthFieldBasedFrameDecoder

这个比较经常用到,先看构造函数。

public LengthFieldBasedFrameDecoder(
            ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip, boolean failFast)

第一个参数是网络传输大小端,默认大端,最后一个参数是解码异常了是否快速报告。先跳过这两个参数,着重讲下其他

  • maxFrameLength 似有协议最大的包长度

  • lengthFieldOffset 私有协议都有一个header,header中有若干个字节是代表数据总长度的,这个字段代表header报文长度字段相对于第一个字节的偏移量。

  • lengthFieldLength 用多少个字节表示Header报文的长度,即从包头开始lengthFieldOffset ~lengthFieldOffset+lengthFieldLength 的数据代表报文总长度。

  • lengthAdjustment header中的报文总长度与body长度的差值,设body+header的实际长度是L,body长度是Lbody那么应该有公式,valueof(lengthFieldOffset ~lengthFieldOffset+lengthFieldLength) - lengthAdjustment= Lbody;

2.2.MessageToMessageDecoder

功能和ByteToMessageDecoder类似,但是支持了泛型的输入数据。

2.3.IdleStateHandler

IdleStateHandler在超过指定的时间通道上未发生读/写/(读和写)事件时,将借助NioEventLoop的定时器功能触发定时任务,满足条件时发出fireUserEventTriggered事件。通常我们可以用这个特性用来控制心跳等逻辑。