Netty源码分析 (八)----- write过程 源码分析
上一篇文章主要讲了netty的read过程,本文主要分析一下write和writeandflush。
主要内容
本文分以下几个部分阐述一个java对象最后是如何转变成字节流,写到socket缓冲区中去的
- pipeline中的标准链表结构
- java对象编码过程
- write:写队列
- flush:刷新写队列
- writeandflush: 写队列并刷新
pipeline中的标准链表结构
一个标准的pipeline链式结构如下
java对象编码过程
为什么我们在pipeline中添加了encoder节点,java对象就转换成netty可以处理的bytebuf,写到管道里?
我们先看下调用write
的code
businesshandler
protected void channelread0(channelhandlercontext ctx, request request) throws exception { response response = dobusiness(request); if (response != null) { ctx.channel().write(response); } }
业务处理器接受到请求之后,做一些业务处理,返回一个response
,然后,response在pipeline中传递,落到 encoder
节点,我们来跟踪一下 ctx.channel().write(response);
public channelfuture write(object msg) { return this.pipeline.write(msg); }
调用了channel中的pipeline中的write方法,我们接着看
public final channelfuture write(object msg) { return this.tail.write(msg); }
pipeline中有属性tail,调用tail中的write,由此我们知道write消息的时候,从tail开始,接着往下看
private void write(object msg, boolean flush, channelpromise promise) { abstractchannelhandlercontext next = this.findcontextoutbound(); object m = this.pipeline.touch(msg, next); eventexecutor executor = next.executor(); if (executor.ineventloop()) { if (flush) { next.invokewriteandflush(m, promise); } else { next.invokewrite(m, promise); } } else { object task; if (flush) { task = abstractchannelhandlercontext.writeandflushtask.newinstance(next, m, promise); } else { task = abstractchannelhandlercontext.writetask.newinstance(next, m, promise); } safeexecute(executor, (runnable)task, promise, m); } }
中间我省略了几个重载的方法,我们来看看第一行代码,next = this.findcontextoutbound();
private abstractchannelhandlercontext findcontextoutbound() { abstractchannelhandlercontext ctx = this; do { ctx = ctx.prev; } while(!ctx.outbound); return ctx; }
通过 ctx = ctx.prev; 我们知道从tail开始找到pipeline中的第一个outbound的handler,然后调用 invokewrite(m, promise),此时找到的第一个outbound的handler就是我们自定义的编码器encoder
我们接着看 next.invokewrite(m, promise);
private void invokewrite(object msg, channelpromise promise) { if (this.invokehandler()) { this.invokewrite0(msg, promise); } else { this.write(msg, promise); } } private void invokewrite0(object msg, channelpromise promise) { try { ((channeloutboundhandler)this.handler()).write(this, msg, promise); } catch (throwable var4) { notifyoutboundhandlerexception(var4, promise); } }
一路代码跟下来,我们可以知道是调用了第一个outbound类型的handler中的write方法,也就是第一个调用的是我们自定义编码器encoder的write方法
我们来看看自定义encoder
public class encoder extends messagetobyteencoder<response> { @override protected void encode(channelhandlercontext ctx, response response, bytebuf out) throws exception { out.writebyte(response.getversion()); out.writeint(4 + response.getdata().length); out.writebytes(response.getdata()); } }
自定义encoder继承 messagetobyteencoder ,并且重写了 encode方法,这就是编码器的核心,我们先来看 messagetobyteencoder
public abstract class messagetobyteencoder<i> extends channeloutboundhandleradapter {
我们看到 messagetobyteencoder 继承了 channeloutboundhandleradapter,说明了 encoder 是一个 outbound的handler
我们来看看 encoder 的父类 messagetobyteencoder中的write方法
messagetobyteencoder
@override public void write(channelhandlercontext ctx, object msg, channelpromise promise) throws exception { bytebuf buf = null; try { // 判断当前handelr是否能处理写入的消息 if (acceptoutboundmessage(msg)) { @suppresswarnings("unchecked") // 强制换换 i cast = (i) msg; // 分配一段butebuf buf = allocatebuffer(ctx, cast, preferdirect); try { // 调用encode,这里就调回到 `encoder` 这个handelr中 encode(ctx, cast, buf); } finally { // 既然自定义java对象转换成bytebuf了,那么这个对象就已经无用了,释放掉 // (当传入的msg类型是bytebuf的时候,就不需要自己手动释放了) referencecountutil.release(cast); } // 如果buf中写入了数据,就把buf传到下一个节点 if (buf.isreadable()) { ctx.write(buf, promise); } else { // 否则,释放buf,将空数据传到下一个节点 buf.release(); ctx.write(unpooled.empty_buffer, promise); } buf = null; } else { // 如果当前节点不能处理传入的对象,直接扔给下一个节点处理 ctx.write(msg, promise); } } catch (encoderexception e) { throw e; } catch (throwable e) { throw new encoderexception(e); } finally { // 当buf在pipeline中处理完之后,释放 if (buf != null) { buf.release(); } } }
这里,我们详细阐述一下encoder是如何处理传入的java对象的
1.判断当前handler是否能处理写入的消息,如果能处理,进入下面的流程,否则,直接扔给下一个节点处理
2.将对象强制转换成encoder
可以处理的 response
对象
3.分配一个bytebuf
4.调用encoder,即进入到 encoder
的 encode
方法,该方法是用户代码,用户将数据写入bytebuf
5.既然自定义java对象转换成bytebuf了,那么这个对象就已经无用了,释放掉,(当传入的msg类型是bytebuf的时候,就不需要自己手动释放了)
6.如果buf中写入了数据,就把buf传到下一个节点,否则,释放buf,将空数据传到下一个节点
7.最后,当buf在pipeline中处理完之后,释放节点
总结一点就是,encoder
节点分配一个bytebuf,调用encode
方法,将java对象根据自定义协议写入到bytebuf,然后再把bytebuf传入到下一个节点,在我们的例子中,最终会传入到head节点,因为head节点是一个outbount类型的handler
headcontext
public void write(channelhandlercontext ctx, object msg, channelpromise promise) throws exception { unsafe.write(msg, promise); }
这里的msg就是前面在encoder
节点中,载有java对象数据的自定义bytebuf对象,进入下一节
write:写队列
我们来看看channel中unsafe的write方法,先来看看其中的一个属性
abstractunsafe
protected abstract class abstractunsafe implements unsafe { private volatile channeloutboundbuffer outboundbuffer = new channeloutboundbuffer(abstractchannel.this);
我们来看看 channeloutboundbuffer 这个类
public final class channeloutboundbuffer { private final channel channel; private channeloutboundbuffer.entry flushedentry; private channeloutboundbuffer.entry unflushedentry; private channeloutboundbuffer.entry tailentry;
channeloutboundbuffer内部维护了一个entry链表,并使用entry封装msg。其中的属性我们下面会详细讲
我们回到正题,接着看 unsafe.write(msg, promise);
abstractunsafe
@override public final void write(object msg, channelpromise promise) { asserteventloop(); channeloutboundbuffer outboundbuffer = this.outboundbuffer; int size; try { msg = filteroutboundmessage(msg); size = pipeline.estimatorhandle().size(msg); if (size < 0) { size = 0; } } catch (throwable t) { safesetfailure(promise, t); referencecountutil.release(msg); return; } outboundbuffer.addmessage(msg, size, promise); }
1.调用 filteroutboundmessage()
方法,将待写入的对象过滤,把非bytebuf
对象和fileregion
过滤,把所有的非直接内存转换成直接内存directbuffer
@override protected final object filteroutboundmessage(object msg) { if (msg instanceof bytebuf) { bytebuf buf = (bytebuf) msg; if (buf.isdirect()) { return msg; } return newdirectbuffer(buf); } if (msg instanceof fileregion) { return msg; } throw new unsupportedoperationexception( "unsupported message type: " + stringutil.simpleclassname(msg) + expected_types); }
2.接下来,估算出需要写入的bytebuf的size
3.最后,调用 channeloutboundbuffer
的addmessage(msg, size, promise)
方法,所以,接下来,我们需要重点看一下这个方法干了什么事情
channeloutboundbuffer
public void addmessage(object msg, int size, channelpromise promise) { // 创建一个待写出的消息节点 entry entry = entry.newinstance(msg, size, total(msg), promise); if (tailentry == null) { flushedentry = null; tailentry = entry; } else { entry tail = tailentry; tail.next = entry; tailentry = entry; } if (unflushedentry == null) { unflushedentry = entry; } incrementpendingoutboundbytes(size, false); }
想要理解上面这段代码,必须得掌握写缓存中的几个消息指针,如下图
channeloutboundbuffer 里面的数据结构是一个单链表结构,每个节点是一个 entry
,entry
里面包含了待写出bytebuf
以及消息回调 promise
,下面分别是三个指针的作用
1.flushedentry 指针表示第一个被写到操作系统socket缓冲区中的节点
2.unflushedentry 指针表示第一个未被写入到操作系统socket缓冲区中的节点
3.tailentry指针表示channeloutboundbuffer缓冲区的最后一个节点
初次调用 addmessage
之后,各个指针的情况为
fushedentry
指向空,unfushedentry
和 tailentry
都指向新加入的节点
第二次调用 addmessage
之后,各个指针的情况为
第n次调用 addmessage
之后,各个指针的情况为
可以看到,调用n次addmessage
,flushedentry指针一直指向null,表示现在还未有节点需要写出到socket缓冲区,而unfushedentry
之后有n个节点,表示当前还有n个节点尚未写出到socket缓冲区中去
flush:刷新写队列
不管调用channel.flush()
,还是ctx.flush()
,最终都会落地到pipeline中的head节点
headcontext
@override public void flush(channelhandlercontext ctx) throws exception { unsafe.flush(); }
之后进入到abstractunsafe
abstractunsafe
public final void flush() { asserteventloop(); channeloutboundbuffer outboundbuffer = this.outboundbuffer; if (outboundbuffer == null) { return; } outboundbuffer.addflush(); flush0(); }
flush方法中,先调用 outboundbuffer.addflush();
channeloutboundbuffer
public void addflush() { entry entry = unflushedentry; if (entry != null) { if (flushedentry == null) { flushedentry = entry; } do { flushed ++; if (!entry.promise.setuncancellable()) { int pending = entry.cancel(); decrementpendingoutboundbytes(pending, false, true); } entry = entry.next; } while (entry != null); unflushedentry = null; } }
可以结合前面的图来看,首先拿到 unflushedentry
指针,然后将 flushedentry
指向unflushedentry
所指向的节点,调用完毕之后,三个指针的情况如下所示
相当于所有的节点都即将开始推送出去
接下来,调用 flush0();
abstractunsafe
protected void flush0() { dowrite(outboundbuffer); }
发现这里的核心代码就一个 dowrite,继续跟
abstractniobytechannel
protected void dowrite(channeloutboundbuffer in) throws exception { int writespincount = -1; boolean setopwrite = false; for (;;) { // 拿到第一个需要flush的节点的数据 object msg = in.current(); if (msg instanceof bytebuf) { // 强转为bytebuf,若发现没有数据可读,直接删除该节点 bytebuf buf = (bytebuf) msg; boolean done = false; long flushedamount = 0; // 拿到自旋锁迭代次数 if (writespincount == -1) { writespincount = config().getwritespincount(); } // 自旋,将当前节点写出 for (int i = writespincount - 1; i >= 0; i --) { int localflushedamount = dowritebytes(buf); if (localflushedamount == 0) { setopwrite = true; break; } flushedamount += localflushedamount; if (!buf.isreadable()) { done = true; break; } } in.progress(flushedamount); // 写完之后,将当前节点删除 if (done) { in.remove(); } else { break; } } } }
这里略微有点复杂,我们分析一下
1.第一步,调用current()
先拿到第一个需要flush的节点的数据
channeloutboundbuffer
public object current() { entry entry = flushedentry; if (entry == null) { return null; } return entry.msg; }
2.第二步,拿到自旋锁的迭代次数
if (writespincount == -1) { writespincount = config().getwritespincount(); }
3.自旋的方式将bytebuf写出到jdk nio的channel
for (int i = writespincount - 1; i >= 0; i --) { int localflushedamount = dowritebytes(buf); if (localflushedamount == 0) { setopwrite = true; break; } flushedamount += localflushedamount; if (!buf.isreadable()) { done = true; break; } }
dowritebytes
方法跟进去
protected int dowritebytes(bytebuf buf) throws exception { final int expectedwrittenbytes = buf.readablebytes(); return buf.readbytes(javachannel(), expectedwrittenbytes); }
我们发现,出现了 javachannel()
,表明已经进入到了jdk nio channel的领域,我们来看看 buf.readbytes(javachannel(), expectedwrittenbytes);
public int readbytes(gatheringbytechannel out, int length) throws ioexception { this.checkreadablebytes(length); int readbytes = this.getbytes(this.readerindex, out, length); this.readerindex += readbytes; return readbytes; }
我们来看关键代码 this.getbytes(this.readerindex, out, length)
private int getbytes(int index, gatheringbytechannel out, int length, boolean internal) throws ioexception { this.checkindex(index, length); if (length == 0) { return 0; } else { bytebuffer tmpbuf; if (internal) { tmpbuf = this.internalniobuffer(); } else { tmpbuf = ((bytebuffer)this.memory).duplicate(); } index = this.idx(index); tmpbuf.clear().position(index).limit(index + length); //将tmpbuf中的数据写到out中 return out.write(tmpbuf); } }
我们来看看out.write(tmpbuf)
public int write(bytebuffer src) throws ioexception { ensureopen(); if (!writable) throw new nonwritablechannelexception(); synchronized (positionlock) { int n = 0; int ti = -1; try { begin(); ti = threads.add(); if (!isopen()) return 0; do { n = ioutil.write(fd, src, -1, nd); } while ((n == iostatus.interrupted) && isopen()); return iostatus.normalize(n); } finally { threads.remove(ti); end(n > 0); assert iostatus.check(n); } } }
和read实现一样,socketchannelimpl的write方法通过ioutil的write实现:关键代码 n = ioutil.write(fd, src, -1, nd);
static int write(filedescriptor var0, bytebuffer var1, long var2, nativedispatcher var4) throws ioexception { //如果是directbuffer,直接写,将堆外缓存中的数据拷贝到内核缓存中进行发送 if (var1 instanceof directbuffer) { return writefromnativebuffer(var0, var1, var2, var4); } else { //非directbuffer //获取已经读取到的位置 int var5 = var1.position(); //获取可以读到的位置 int var6 = var1.limit(); assert var5 <= var6; //申请一个原buffer可读大小的directbytebuffer int var7 = var5 <= var6 ? var6 - var5 : 0; bytebuffer var8 = util.gettemporarydirectbuffer(var7); int var10; try { var8.put(var1); var8.flip(); var1.position(var5); //通过directbuffer写,将堆外缓存的数据拷贝到内核缓存中进行发送 int var9 = writefromnativebuffer(var0, var8, var2, var4); if (var9 > 0) { var1.position(var5 + var9); } var10 = var9; } finally { //回收分配的directbytebuffer util.offerfirsttemporarydirectbuffer(var8); } return var10; } }
代码逻辑我们就不再讲了,代码注释已经很清楚了,这里我们关注一点,我们可以看看我们前面的一个方法 filteroutboundmessage()
,将待写入的对象过滤,把非bytebuf
对象和fileregion
过滤,把所有的非直接内存转换成直接内存directbuffer
说明到了这一步所有的 var1 意境是直接内存directbuffer,就不需要走到
else,就不需要write两次了
4.删除该节点
节点的数据已经写入完毕,接下来就需要删除该节点
channeloutboundbuffer
public boolean remove() { entry e = flushedentry; object msg = e.msg; channelpromise promise = e.promise; int size = e.pendingsize; removeentry(e); if (!e.cancelled) { referencecountutil.saferelease(msg); safesuccess(promise); } // recycle the entry e.recycle(); return true; }
首先拿到当前被flush掉的节点(flushedentry所指),然后拿到该节点的回调对象 channelpromise
, 调用 removeentry()
方法移除该节点
private void removeentry(entry e) { if (-- flushed == 0) { flushedentry = null; if (e == tailentry) { tailentry = null; unflushedentry = null; } } else { flushedentry = e.next; } }
这里的remove是逻辑移除,只是将flushedentry指针移到下个节点,调用完毕之后,节点图示如下
writeandflush: 写队列并刷新
理解了write和flush这两个过程,writeandflush
也就不难了
public final channelfuture writeandflush(object msg) { return tail.writeandflush(msg); } public channelfuture writeandflush(object msg) { return writeandflush(msg, newpromise()); } public channelfuture writeandflush(object msg, channelpromise promise) { write(msg, true, promise); return promise; } private void write(object msg, boolean flush, channelpromise promise) { abstractchannelhandlercontext next = findcontextoutbound(); eventexecutor executor = next.executor(); if (executor.ineventloop()) { if (flush) { next.invokewriteandflush(m, promise); } else { next.invokewrite(m, promise); } } }
可以看到,最终,通过一个boolean变量,表示是调用 invokewriteandflush
,还是 invokewrite
,invokewrite
便是我们上文中的write
过程
private void invokewriteandflush(object msg, channelpromise promise) { invokewrite0(msg, promise); invokeflush0(); }
可以看到,最终调用的底层方法和单独调用 write
和 flush
是一样的
private void invokewrite(object msg, channelpromise promise) { invokewrite0(msg, promise); } private void invokeflush(object msg, channelpromise promise) { invokeflush0(msg, promise); }
由此看来,invokewriteandflush
基本等价于write
方法之后再来一次flush
总结
1.pipeline中的编码器原理是创建一个bytebuf,将java对象转换为bytebuf,然后再把bytebuf继续向前传递
2.调用write方法并没有将数据写到socket缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出
3.writeandflush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功
4.netty中的缓冲区中的bytebuf为directbytebuf
上一篇: C++ const 引用 指针
下一篇: python网络进程
推荐阅读
-
分布式定时任务(xxl-job执行器的启动过程源码分析)
-
深入解析vue 源码目录及构建过程分析
-
PHP源码分析之变量的存储过程分解
-
Netty源码分析 (四)----- ChannelPipeline
-
Netty源码分析 (三)----- 服务端启动源码分析
-
Tomcat源码分析 (九)----- HTTP请求处理过程(二)
-
Tomcat源码分析三:Tomcat启动加载过程(一)的源码解析
-
Qt事件分发机制源码分析之QApplication对象构建过程
-
Netty源码分析之ChannelPipeline(二)—ChannelHandler的添加与删除
-
Mybaits 源码解析 (六)----- 全网最详细:Select 语句的执行过程分析(上篇)(Mapper方法是如何调用到XML中的SQL的?)