分析Netty直接内存原理及应用
一、通常的内存模型概述
一般地,系统为了保证系统本身的安全性和健壮性,会将内存从逻辑上隔离成内核区域和用户区域,这很容易理解。因为用户行为不可控性太强,暴露得太多,就容易导致各种神奇的用法,超出系统的控制范围。当然,有的语言是支持直接控制内存的,比如c, 你可以用一个指针,访问内存中的几乎任意位置的数据(除了一些硬件地址)。而像汇编,则可以访问任意地址。而这些底层的语言,已经离我们越来越远了,它基本上和普通程序员关系不大了。
用户很多时候的编程控制,都是在用户区域进行的,比如我做一些加减乘除,如 integer a = 2; integer b = 3; integer c = a * b; 这种操作, 所有操作就是在用户空间上完成的。这些操作,不会有内核区域的介入。但是有些操作,则必须由内核进行,比如对文件的读写,就是不同设备之间的数据交换,也就是io类操作。这类操作因为有非常的难度实现,所以一定是由操作系统来完成底层的操作的。那么,第一手的数据必定要经过内核区域。然而我们的代码是跑在用户区的,那么,通常情况下,就会存在内核区数据,拷贝到用户区数据的这么一个过程。这是一个读的过程,而写的过程则是一个相反的操作,从用户区拷贝数据到内核区,然后再由内核完成io操作。
直接将内存划分为内核区与用户区,实在是太泛了,不能说错,但有一种说了等于没说的感觉。
所以,对内存的划分,还需要再细点,即所谓的内存模型或者内存区域。各语言各场景各实现自然是百家争鸣,无可厚非。但大致就是按照一定的规则,切分成不同用途的区域,然后在需要的时候向该区域进行内存分配,并保存到相应的表或者标识中,以便后续可读或不可再分配。而这其中,还有个非常重要的点是,除了知道如何分配内存之外,还要知道如何回收内存。另外,如何保证内存的可见性,也是一个内存模型需要考虑的重要话题。
具体实现就不用说了,因为没有一个放之四海而皆准的说法,我也没那能耐讲清楚这事情。大家自行脑补吧。
二、java中的直接内存原理
首先,来说说为什么java中会有直接内存这个概念?我们知道,java中有很重要的一个内存区域,即堆内存,几乎所有的对象都堆上进行分配,所以,大部分的gc工作,也是针对堆进行的。关联上一节所讲的事,堆内存我们可以划分到用户空间内存区域去。应该说,java只要将这一块内存管理好了,基本上就可以管理好java的对象的生命周期了。那么,到底什么直接内存?和堆内存又有啥关系?
直接内存是脱离掉堆空间的,它不属于java的堆,其他区域也不属于,即直接内存不受jvm管控。它属于受系统直接控制的一段内存区域。
为什么直接内存要脱离jvm的管控呢?因为jvm管控的是用户空间,而有的场景则必须要内核空间的介入,整个过程才能完成。而如果用户空间想要获取数据,则必须要像内核中请求复制数据,数据才对用户空间可见。而很多这种场景,复制数据的目的,仅仅是为了使用一次其数据,做了相应的转换后,就不再使用有关系,比如流数据的接入过程。这个复制的过程,则必定有不少的性能损耗,所以就有直接内存的出现。它的目的在于避免内核空间和用户空间之间进行无意义的数据复制,从而提升程序性能。
直接内存不受jvm管控,那么它受谁的管控呢?实际上,是由操作系统的底层进行管控的,在进行内存分配请求时,系统会申请一段共享区域。由内核和用户代码共享这里的数据写入,即内核写入的数据,用户代码可以直接访问,用户代码写入的数据,内核可以直接使用。在底层,是由mmap这种函数接口来实现的共享内存的。
而在java层面,则是使用directbytebuffer来呈现的,它的创建、使用、删除如下:
// 创建直接内存空间实例 bytebuffer buffer = bytebuffer.allocatedirect(1600); for (int i = 0; i < 90_0000; i++) { for (int j = 0; j < 199; j++) { // 数据的写入 buffer.putint(j); } buffer.flip(); for (int j = 0; j < 199; j++) { // 数据的读取 buffer.get(); } // 数据清理 buffer.clear(); }
三、netty中使用直接内存
知道了直接内存的使用过程,那么如何找到更好的场景,则是需要我们去发现的。netty作为一个高性能网络通信框架,重要的工作就是在处理网络io问题。那么,在它的场景里,使用上直接内存这一大杀器,则是再好不过了。那么,netty是如何利用它的呢?
两个场景:1. 向应用传递网络数据时(读过程); 2. 应用向远端传递数据时(写过程);
// 写过程,将msg转换为直接内存存储的二进制数据 // io.netty.handler.codec.messagetobyteencoder#write @override public void write(channelhandlercontext ctx, object msg, channelpromise promise) throws exception { bytebuf buf = null; try { if (acceptoutboundmessage(msg)) { @suppresswarnings("unchecked") i cast = (i) msg; // 默认 preferdirect = true; buf = allocatebuffer(ctx, cast, preferdirect); try { // 调用子类的实现,编码数据,以便实现私有协议 encode(ctx, cast, buf); } finally { referencecountutil.release(cast); } if (buf.isreadable()) { // 写数据到远端 ctx.write(buf, promise); } else { buf.release(); ctx.write(unpooled.empty_buffer, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (encoderexception e) { throw e; } catch (throwable e) { throw new encoderexception(e); } finally { if (buf != null) { buf.release(); } } } // io.netty.handler.codec.messagetobyteencoder#allocatebuffer /** * allocate a {@link bytebuf} which will be used as argument of {@link #encode(channelhandlercontext, i, bytebuf)}. * sub-classes may override this method to return {@link bytebuf} with a perfect matching {@code initialcapacity}. */ protected bytebuf allocatebuffer(channelhandlercontext ctx, @suppresswarnings("unused") i msg, boolean preferdirect) throws exception { if (preferdirect) { // pooledbytebufallocator return ctx.alloc().iobuffer(); } else { return ctx.alloc().heapbuffer(); } } // io.netty.buffer.abstractbytebufallocator#iobuffer() @override public bytebuf iobuffer() { if (platformdependent.hasunsafe()) { return directbuffer(default_initial_capacity); } return heapbuffer(default_initial_capacity); } // io.netty.buffer.abstractbytebufallocator#directbuffer(int) @override public bytebuf directbuffer(int initialcapacity) { return directbuffer(initialcapacity, default_max_capacity); } @override public bytebuf directbuffer(int initialcapacity, int maxcapacity) { if (initialcapacity == 0 && maxcapacity == 0) { return emptybuf; } validate(initialcapacity, maxcapacity); return newdirectbuffer(initialcapacity, maxcapacity); } // io.netty.buffer.pooledbytebufallocator#newdirectbuffer @override protected bytebuf newdirectbuffer(int initialcapacity, int maxcapacity) { poolthreadcache cache = threadcache.get(); poolarena<bytebuffer> directarena = cache.directarena; final bytebuf buf; if (directarena != null) { buf = directarena.allocate(cache, initialcapacity, maxcapacity); } else { buf = platformdependent.hasunsafe() ? unsafebytebufutil.newunsafedirectbytebuf(this, initialcapacity, maxcapacity) : new unpooleddirectbytebuf(this, initialcapacity, maxcapacity); } return toleakawarebuffer(buf); } // io.netty.buffer.poolarena#allocate(io.netty.buffer.poolthreadcache, int, int) pooledbytebuf<t> allocate(poolthreadcache cache, int reqcapacity, int maxcapacity) { pooledbytebuf<t> buf = newbytebuf(maxcapacity); allocate(cache, buf, reqcapacity); return buf; } // io.netty.buffer.poolarena.directarena#newbytebuf @override protected pooledbytebuf<bytebuffer> newbytebuf(int maxcapacity) { if (has_unsafe) { return pooledunsafedirectbytebuf.newinstance(maxcapacity); } else { return pooleddirectbytebuf.newinstance(maxcapacity); } } private void allocate(poolthreadcache cache, pooledbytebuf<t> buf, final int reqcapacity) { final int normcapacity = normalizecapacity(reqcapacity); if (istinyorsmall(normcapacity)) { // capacity < pagesize int tableidx; poolsubpage<t>[] table; boolean tiny = istiny(normcapacity); if (tiny) { // < 512 if (cache.allocatetiny(this, buf, reqcapacity, normcapacity)) { // was able to allocate out of the cache so move on return; } tableidx = tinyidx(normcapacity); table = tinysubpagepools; } else { if (cache.allocatesmall(this, buf, reqcapacity, normcapacity)) { // was able to allocate out of the cache so move on return; } tableidx = smallidx(normcapacity); table = smallsubpagepools; } final poolsubpage<t> head = table[tableidx]; /** * synchronize on the head. this is needed as {@link poolchunk#allocatesubpage(int)} and * {@link poolchunk#free(long)} may modify the doubly linked list as well. */ synchronized (head) { final poolsubpage<t> s = head.next; if (s != head) { assert s.donotdestroy && s.elemsize == normcapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initbufwithsubpage(buf, handle, reqcapacity); inctinysmallallocation(tiny); return; } } synchronized (this) { allocatenormal(buf, reqcapacity, normcapacity); } inctinysmallallocation(tiny); return; } if (normcapacity <= chunksize) { if (cache.allocatenormal(this, buf, reqcapacity, normcapacity)) { // was able to allocate out of the cache so move on return; } synchronized (this) { allocatenormal(buf, reqcapacity, normcapacity); ++allocationsnormal; } } else { // huge allocations are never served via the cache so just call allocatehuge allocatehuge(buf, reqcapacity); } } // io.netty.util.internal.platformdependent0#newdirectbuffer static bytebuffer newdirectbuffer(long address, int capacity) { objectutil.checkpositiveorzero(capacity, "capacity"); try { return (bytebuffer) direct_buffer_constructor.newinstance(address, capacity); } catch (throwable cause) { // not expected to ever throw! if (cause instanceof error) { throw (error) cause; } throw new error(cause); } }
向bytebuffer中写入数据过程, 即是向直接内存中写入数据的过程,它可能不像普通的堆对象一样简单咯。
// io.netty.buffer.abstractbytebuf#writebytes(byte[]) @override public bytebuf writebytes(byte[] src) { writebytes(src, 0, src.length); return this; } @override public bytebuf writebytes(byte[] src, int srcindex, int length) { ensurewritable(length); setbytes(writerindex, src, srcindex, length); writerindex += length; return this; } // io.netty.buffer.pooledunsafedirectbytebuf#setbytes(int, byte[], int, int) @override public bytebuf setbytes(int index, byte[] src, int srcindex, int length) { // addr() 将会得到一个内存地址 unsafebytebufutil.setbytes(this, addr(index), index, src, srcindex, length); return this; } // io.netty.buffer.pooledunsafedirectbytebuf#addr private long addr(int index) { return memoryaddress + index; } // io.netty.buffer.unsafebytebufutil#setbytes(io.netty.buffer.abstractbytebuf, long, int, byte[], int, int) static void setbytes(abstractbytebuf buf, long addr, int index, byte[] src, int srcindex, int length) { buf.checkindex(index, length); if (length != 0) { // 将字节数据copy到directbytebuffer中 platformdependent.copymemory(src, srcindex, addr, length); } } // io.netty.util.internal.platformdependent#copymemory(byte[], int, long, long) public static void copymemory(byte[] src, int srcindex, long dstaddr, long length) { platformdependent0.copymemory(src, byte_array_base_offset + srcindex, null, dstaddr, length); } // io.netty.util.internal.platformdependent0#copymemory(java.lang.object, long, java.lang.object, long, long) static void copymemory(object src, long srcoffset, object dst, long dstoffset, long length) { //unsafe.copymemory(src, srcoffset, dst, dstoffset, length); while (length > 0) { long size = math.min(length, unsafe_copy_threshold); // 最终由jvm的本地方法,进行内存的copy, 此处dst为null, 即数据只会copy到对应的 dstoffset 中 // 偏移基数就是: 各种基础地址 array_object_base_offset... unsafe.copymemory(src, srcoffset, dst, dstoffset, size); length -= size; srcoffset += size; dstoffset += size; } }
可以看到,最后直接内存的写入,是通过 unsafe 类,对操作系统进行内存数据的写入的。
最后,来看下它如何将写数据到远端:
// io.netty.channel.abstractchannelhandlercontext#write(java.lang.object, io.netty.channel.channelpromise) @override public channelfuture write(final object msg, final channelpromise promise) { if (msg == null) { throw new nullpointerexception("msg"); } try { if (isnotvalidpromise(promise, true)) { referencecountutil.release(msg); // cancelled return promise; } } catch (runtimeexception e) { referencecountutil.release(msg); throw e; } write(msg, false, promise); return promise; } private void write(object msg, boolean flush, channelpromise promise) { abstractchannelhandlercontext next = findcontextoutbound(); final object m = pipeline.touch(msg, next); eventexecutor executor = next.executor(); if (executor.ineventloop()) { if (flush) { next.invokewriteandflush(m, promise); } else { next.invokewrite(m, promise); } } else { abstractwritetask task; if (flush) { task = writeandflushtask.newinstance(next, m, promise); } else { task = writetask.newinstance(next, m, promise); } safeexecute(executor, task, promise, m); } } private void invokewrite(object msg, channelpromise promise) { if (invokehandler()) { invokewrite0(msg, promise); } else { write(msg, promise); } } private void invokewrite0(object msg, channelpromise promise) { try { ((channeloutboundhandler) handler()).write(this, msg, promise); } catch (throwable t) { notifyoutboundhandlerexception(t, promise); } } // io.netty.channel.defaultchannelpipeline.headcontext#write @override public void write(channelhandlercontext ctx, object msg, channelpromise promise) throws exception { unsafe.write(msg, promise); } // io.netty.channel.abstractchannel.abstractunsafe#write @override public final void write(object msg, channelpromise promise) { asserteventloop(); channeloutboundbuffer outboundbuffer = this.outboundbuffer; if (outboundbuffer == null) { // if the outboundbuffer is null we know the channel was closed and so // need to fail the future right away. if it is not null the handling of the rest // will be done in flush0() // see https://github.com/netty/netty/issues/2362 safesetfailure(promise, write_closed_channel_exception); // release message now to prevent resource-leak referencecountutil.release(msg); return; } int size; try { // 转换msg为直接内存,如有必要 msg = filteroutboundmessage(msg); size = pipeline.estimatorhandle().size(msg); if (size < 0) { size = 0; } } catch (throwable t) { safesetfailure(promise, t); referencecountutil.release(msg); return; } // 将msg放入outboundbuffer中,即相当于写完了数据 outboundbuffer.addmessage(msg, size, promise); } // io.netty.channel.nio.abstractniobytechannel#filteroutboundmessage @override protected final object filteroutboundmessage(object msg) { if (msg instanceof bytebuf) { bytebuf buf = (bytebuf) msg; if (buf.isdirect()) { return msg; } return newdirectbuffer(buf); } if (msg instanceof fileregion) { return msg; } throw new unsupportedoperationexception( "unsupported message type: " + stringutil.simpleclassname(msg) + expected_types); } // io.netty.channel.channeloutboundbuffer#addmessage /** * add given message to this {@link channeloutboundbuffer}. the given {@link channelpromise} will be notified once * the message was written. */ public void addmessage(object msg, int size, channelpromise promise) { entry entry = entry.newinstance(msg, size, total(msg), promise); if (tailentry == null) { flushedentry = null; } else { entry tail = tailentry; tail.next = entry; } tailentry = entry; if (unflushedentry == null) { unflushedentry = entry; } // increment pending bytes after adding message to the unflushed arrays. // see https://github.com/netty/netty/issues/1619 // 如有必要,立即触发 firechannelwritabilitychanged 事件,从而使立即向网络写入数据 incrementpendingoutboundbytes(entry.pendingsize, false); }
大概就是说,通过直接内存写好的数据,只需要再调用下内核的接入接口,将直接内存的数据放入缓冲,就可以被发送到远端了。
最后,我们来看下简要netty对于网络数据的接入读取过程,以辨别是否使用了直接内存,以及是如何使用的。
// io.netty.channel.nio.abstractniobytechannel.niobyteunsafe#read @override public final void read() { final channelconfig config = config(); final channelpipeline pipeline = pipeline(); final bytebufallocator allocator = config.getallocator(); final recvbytebufallocator.handle allochandle = recvbufallochandle(); allochandle.reset(config); bytebuf bytebuf = null; boolean close = false; try { do { // 分配创建bytebuffer, 此处实际就是直接内存的体现 bytebuf = allochandle.allocate(allocator); // 将数据读取到bytebuffer中 allochandle.lastbytesread(doreadbytes(bytebuf)); if (allochandle.lastbytesread() <= 0) { // nothing was read. release the buffer. bytebuf.release(); bytebuf = null; close = allochandle.lastbytesread() < 0; break; } allochandle.incmessagesread(1); readpending = false; // 读取到一部分数据,就向pipeline的下游传递,而非全部完成后再传递 pipeline.firechannelread(bytebuf); bytebuf = null; } while (allochandle.continuereading()); allochandle.readcomplete(); pipeline.firechannelreadcomplete(); if (close) { closeonread(pipeline); } } catch (throwable t) { handlereadexception(pipeline, bytebuf, t, close, allochandle); } finally { // check if there is a readpending which was not processed yet. // this could be for two reasons: // * the user called channel.read() or channelhandlercontext.read() in channelread(...) method // * the user called channel.read() or channelhandlercontext.read() in channelreadcomplete(...) method // // see https://github.com/netty/netty/issues/2254 if (!readpending && !config.isautoread()) { removereadop(); } } } } // io.netty.channel.defaultmaxmessagesrecvbytebufallocator.maxmessagehandle#allocate @override public bytebuf allocate(bytebufallocator alloc) { return alloc.iobuffer(guess()); } // io.netty.buffer.abstractbytebufallocator#iobuffer(int) @override public bytebuf iobuffer(int initialcapacity) { if (platformdependent.hasunsafe()) { return directbuffer(initialcapacity); } return heapbuffer(initialcapacity); }
可见同样,在接入数据时,仍然使用直接内存进行数据接收,从而达到内核与用户共享,无需拷贝的目的。
以上,就是netty对整个直接内存的操作方式了。看起来有点复杂,主要netty到处都是其设计哲学的体现,无论是一个写事件、读事件、或者是状态变更事件,都是一长串的流水线操作。当然了,我们此处讨论的是,其如何使用直接内存的。它通过使用一个 pooledunsafedirectbytebuf , 最终引用jdk的 direct = bytebuffer.allocatedirect(1); 使用 directbytebuffer 实现直接内存的使用。并使用其构造方法 directbytebuffer(long addr, int cap) 进行直接内存对象创建。
四、总结
从整体上来说,直接内存减少了进行io时的内存复制操,但其仅为内核与用户空间的内存复制,因为用户空间的数据复制是并不可少的,因为最终它们都必须要转换为二进制流,才能被不同空间的程序读取。但创建直接内存对象的开销要高于创建普通内存对象,因为它可能需要维护更复杂的关系环境。事实上,直接内存可以做到不同进程间的内存共享,而这在普通对象内存中是无法做到的(不过java是单进程的,不care此场景)。java的直接内存的使用,仅为使用系统提供的一个便捷接口,适应更好的场景。
直接内存实际上也可以叫共享内存,它可以实现不同进程之间的通信,即不同进程可以看到其他进程对本块内存地址的修改。这是一种高效的进程间通信方式,这对于多进程应用很有帮助。但对于多线程应用则不是必须,因为多线程本身就是共享内存的。而类似于nginx之类的应用,则非常有用了。因为对于一些全局计数器,必然需要多进程维护,通过共享内存完美解决。
而netty作为一个网络通信框架,则是为了更好处理具体场景,更合理的使用了直接内存,从而成就了所谓的零拷贝,高性能的基石之一。所以,一个好的框架,一定是解决某类问题的翘楚,它不一定是功能开创者,但一定是很好的继承者。
另外,内存管理是个非常复杂的问题。 但又很重要,值得我们花大量时间去研究。
以上就是分析netty直接内存原理及应用的详细内容,更多关于netty 直接内存原理的资料请关注其它相关文章!