欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty的Nio写优化  

程序员文章站 2022-05-06 12:59:48
...

  今天又看了一遍boyan的《Nip trick and trip》,又有一些新的收获。比以前看地更深刻了。其中有一处写到Nio的写优化,当Nio的channel设置为nonblocking时,写操作不再阻塞,而是直接返回0,表示当时socket的写缓冲区已满不可写。通常的处理办法是返回本次loop并继续注册op_read或者op_write。这样会带来线程切换的开销,使当前select阻塞,直到有可写发生再次唤醒。Netty是怎么解决的?以前虽然看过Netty的代码但并没有深入到细节中去,看了boyanNetty采用writeSpinCount的介绍才深有感悟。好多知识都在细节里,就看我们对这个领域掌握地深不深,如果蜻蜓点水肯定发现了这些看似简单的代码的玄妙之处。话归正题,writeSpinCount如何解决这个问题的呢?类似自旋锁,还是看看Netty的代码吧,比较形象。

for (;;) {
                MessageEvent evt = channel.currentWriteEvent;
                SendBuffer buf;
                if (evt == null) {
                    if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                       //写事件队列为空,此时应该去除op_write
                        removeOpWrite = true;
                        channel.writeSuspended = false;
                        break;
                    }

                    channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                } else {
                    buf = channel.currentWriteBuffer;
                }

                ChannelFuture future = evt.getFuture();
                try {
                    long localWrittenBytes = 0;
                    for (int i = writeSpinCount; i > 0; i --) {//此处为writeSpinCount的解决方式
                       //向channel写数据,为了使其fileChannel的transferTo接口一致。这样从file写向socket时避免四次copy,sendfile的功能
                        localWrittenBytes = buf.transferTo(ch);
                        //写入的数据大于0,跳出本次循环,如为零则循环writeSpinCount,如再次期间localWrittenBytes>0,说明再次可写。
                        if (localWrittenBytes != 0) {
                            writtenBytes += localWrittenBytes;
                            break;
                        }
                        if (buf.finished()) {
                            break;
                        }
                    }

                    if (buf.finished()) {
                        // Successful write - proceed to the next message.
                        //完整地写完,则继续处理下一个消息
                        buf.release();
                        channel.currentWriteEvent = null;
                        channel.currentWriteBuffer = null;
                        evt = null;
                        buf = null;
                        future.setSuccess();
                    } else {
                        // Not written fully - perhaps the kernel buffer is full.
                        //没能完整地写完当前的message,则继续设置op_write,并跳出外层循环
                        addOpWrite = true;
                        channel.writeSuspended = true;

                        if (localWrittenBytes > 0) {
                            // Notify progress listeners if necessary.
                            future.setProgress(
                                    localWrittenBytes,
                                    buf.writtenBytes(), buf.totalBytes());
                        }
                        break;
                    }
                } catch (AsynchronousCloseException e) {
                    // Doesn't need a user attention - ignore.
                } catch (Throwable t) {
                    if (buf != null) {
                        buf.release();
                    }
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    buf = null;
                    evt = null;
                    future.setFailure(t);
                    if (iothread) {
                        fireExceptionCaught(channel, t);
                    } else {
                        fireExceptionCaughtLater(channel, t);
                    }
                    if (t instanceof IOException) {
                        open = false;
                        close(channel, succeededFuture(channel));
                    }
                }
            }
            channel.inWriteNowLoop = false;

            // Initially, the following block was executed after releasing
            // the writeLock, but there was a race condition, and it has to be
            // executed before releasing the writeLock:
            //
            //     https://issues.jboss.org/browse/NETTY-410
            //
            if (open) {
                if (addOpWrite) {
                    setOpWrite(channel);
                } else if (removeOpWrite) {
                    clearOpWrite(channel);
                }
            }
        }