深入研究Netty框架之ByteBuf功能原理及源码分析
ByteBuf功能原理
ByteBuf是一个byte数组的缓冲区,通过两个位置指针完成缓冲区的读写操作,读操作使用readerIndex,写操作使用writeIndex。
readerIndex和writeIndex初始取值均为0,写入数据,writeIndex增加;读取数据则readerIndex增加。0~readerIndex之间的数据是已经读取的,调用discardReadBytes()可释放这部分空间,其作用类似于JDK ByteBuffer的compact()方法;readerIndex~writeIndex之间的数据是可读取的,等价于ByteBuffer position和limit之间的数据;writeIndex和capacity之间的空间是可写入的,等价于ByteBuffer limit和capacity之间的可用空间;调用clear()可重置readerIndex和writeIndex为0,但该操作不会清理buffer中的内容。
初始分配的ByteBuf:
+-------------------------------------------------------+ | writable bytes | +-------------------------------------------------------+ | | 0 = readerIndex = writerIndex capacity
写入N个字节后的ByteBuf:
+-------------------------------------+------------------+ | readable bytes | writable bytes | | (CONTENT) | | +-------------------------------------+------------------+ | | | 0 = readerIndex N = writerIndex <= capacity
读取M(<=N)个字节后的ByteBuf:
+-------------------+------------------+------------------+ | discardable bytes | readable bytes | writable bytes | | | (CONTENT) | | +-------------------+------------------+------------------+ | | | | 0 M = readerIndex <= N = writerIndex <= capacity
调用discardReadBytes()方法之后的ByteBuf:
+-------------------+---------------------+ | readable bytes | writable bytes | +-------------------+---------------------+ | | | 0 = readerIndex N-M = writerIndex <= capacity
调用clear()方法之后的ByteBuf:
+-------------------------------------------------------+ | writable bytes | +-------------------------------------------------------+ | | 0 = readerIndex = writerIndex capacity
ByteBuf 动态扩展
通常情况下,当对JDK ByteBuffer进行put操作时,如果缓冲区可写空间不够,就会抛出BufferOverflowException异常。为了避免这个问题,在进行put操作时,需要对可写空间进行判断,如果剩余可写空间不足,需要创建一个新ByteBuffer,并将之前ByteBuffer的内容复制到新创建的ByteBuffer中,然后释放老的ByteBuffer。
//needSize为需要写入的字节数
if(this.buffer.remaining()<needSize){
int realAllocateSize=needSize>128 ? needSize:128;
ByteBuffer newBuffer=ByteBuffer.allocate(this.buffer.capacity()+realAllocateSize);
this.buffer.flip();
newBuffer.put(this.buffer);
this.buffer=newBuffer;
}
为防止ByteBuffer溢出,每次进行put操作都需要进行可写空间校验,这导致了代冗余。
为了解决这个问题,ByteBuf对write方法进行了封装,由write操作负责进行剩余可用空间的校验,当空间不足时,由ByteBuf自动进行动态扩展(不超过maxCapacity),使用者无需关心底层的校验和动态扩展细节。
源码如下:
@Override
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}
当执行writeBytes时,先调用ensureWritable(length)进行可写空间的校验。
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
if (minWritableBytes <= writableBytes()) {
return this;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);
// Adjust to the new capacity.
capacity(newCapacity);
return this;
}
当需要写入的字节数大于缓冲区最大可写字节数时,ByteBuf自动进行动态扩展。calculateNewCapacity(writerIndex + minWritableBytes)方法用于计算缓冲区新的容量,capacity(newCapacity)则用于实现动态扩展,后面会详细介绍其源码。
ByteBuf 主要API
顺序读操作(read)
方法名称 | 返回值 | 功能说明 | 抛出异常 |
readBoolean() | boolean |
从readerIndex开始读取1字节的数据 |
throws IndexOutOfBoundsException readableBytes<1 |
readByte() | byte | 从readerIndex开始读取1字节的数据 |
throws IndexOutOfBoundsException readableBytes<1 |
readUnsignedByte() | short | 从readerIndex开始读取1字节的数据(无符号字节值) |
throws IndexOutOfBoundsException: readableBytes<1 |
readShort() | short | 从readerIndex开始读取16位的短整形值 |
throws IndexOutOfBoundsException: readableBytes<2 |
readUnsignedShort() | int | 从readerIndex开始读取16位的无符号短整形值 |
throws IndexOutOfBoundsException: readableBytes<2 |
readMedium() | int | 从readerIndex开始读取24位的整形值,(该类型并非java基本类型,通常不用) |
throws IndexOutOfBoundsException: readableBytes<3 |
readUnsignedMedium() | int | 从readerIndex开始读取24位的无符号整形值,(该类型并非java基本类型,通常不用) |
throws IndexOutOfBoundsException: readableBytes<3 |
readInt() | int | 从readerIndex开始读取32位的整形值 |
throws IndexOutOfBoundsException: readableBytes<4 |
readUnsignedInt() | long | 从readerIndex开始读取32位的无符号整形值 |
throws IndexOutOfBoundsException: readableBytes<4 |
readLong() | long | 从readerIndex开始读取64位的整形值 |
throws IndexOutOfBoundsException: readableBytes<8 |
readChar() | char | 从readerIndex开始读取2字节的字符值 |
throws IndexOutOfBoundsException: readableBytes<2 |
readFloat() | float | 从readerIndex开始读取32位的浮点值 |
throws IndexOutOfBoundsException: readableBytes<4 |
readDouble() | double | 从readerIndex开始读取64位的浮点值 |
throws IndexOutOfBoundsException: readableBytes<8 |
readBytes(int length) | ByteBuf |
将当前ByteBuf中的数据读取到新创建的ByteBuf中,从readerIndex开始读取length字节的数据。返回的ByteBuf readerIndex 为0,writeIndex为length。 |
throws IndexOutOfBoundsException: readableBytes<length |
readSlice(int length) | ByteBuf | 返回当前ByteBuf新创建的子区域,子区域和原ByteBuf共享缓冲区的内容,但独立维护自己的readerIndex和writeIndex,新创建的子区域readerIndex 为0,writeIndex为length。 |
throws IndexOutOfBoundsException: readableBytes<length |
readBytes(ByteBuf dst) | ByteBuf |
将当前ByteBuf中的数据读取到目标ByteBuf (dst)中,从当前ByteBuf readerIndex开始读取,直到目标ByteBuf无可写空间,从目标ByteBuf writeIndex开始写入数据。读取完成后,当前ByteBuf的readerIndex+=读取的字节数。目标ByteBuf的writeIndex+=读取的字节数。 |
throws IndexOutOfBoundsException: this.readableBytes<dst.writableBytes |
readBytes(ByteBuf dst, int length) | ByteBuf | 将当前ByteBuf中的数据读取到目标ByteBuf (dst)中,从当前ByteBuf readerIndex开始读取,长度为length,从目标ByteBuf writeIndex开始写入数据。读取完成后,当前ByteBuf的readerIndex+=length,目标ByteBuf的writeIndex+=length |
throws IndexOutOfBoundsException: this.readableBytes<length or dst.writableBytes<length |
readBytes(ByteBuf dst, int dstIndex, int length) | ByteBuf | 将当前ByteBuf中的数据读取到目标ByteBuf (dst)中,从readerIndex开始读取,长度为length,从目标ByteBuf dstIndex开始写入数据。读取完成后,当前ByteBuf的readerIndex+=length,目标ByteBuf的writeIndex+=length |
throws IndexOutOfBoundsException: dstIndex<0 or this.readableBytes<length or dst.capacity<dstIndex + length |
readBytes(byte[] dst) | ByteBuf | 将当前ByteBuf中的数据读取到byte数组dst中,从当前ByteBuf readerIndex开始读取,读取长度为dst.length,从byte数组dst索引0处开始写入数据。 |
throws IndexOutOfBoundsException: this.readableBytes<dst.length |
readBytes(byte[] dst, int dstIndex, int length) | ByteBuf | 将当前ByteBuf中的数据读取到byte数组dst中,从当前ByteBuf readerIndex开始读取,读取长度为length,从byte数组dst索引dstIndex处开始写入数据。 |
throws IndexOutOfBoundsException: dstIndex<0 or this.readableBytes<length or dst.length<dstIndex + length |
readBytes(ByteBuffer dst) | ByteBuf | 将当前ByteBuf中的数据读取到ByteBuffer dst中,从当前ByteBuf readerIndex开始读取,直到dst的位置指针到达ByteBuffer 的limit。读取完成后,当前ByteBuf的readerIndex+=dst.remaining() |
throws IndexOutOfBoundsException: this.readableBytes<dst.remaining() |
readBytes(OutputStream out, int length) | ByteBuf | 将当前ByteBuf readerIndex读取数据到输出流OutputStream中,读取的字节长度为length |
throws IndexOutOfBoundsException: this.readableBytes<length throws IOException |
readBytes(GatheringByteChannel out, int length) | int | 将当前ByteBuf readerIndex读取数到GatheringByteChannel 中,写入out的最大字节长度为length。GatheringByteChannel为非阻塞Channel,调用其write方法不能够保存将全部需要写入的数据均写入成功,存在半包问题。因此其写入的数据长度为【0,length】,如果操作成功,readerIndex+=实际写入的字节数,返回实际写入的字节数 |
throws IndexOutOfBoundsException: this.readableBytes<length throws IOException |
顺序写操作(write)
方法名称 | 返回值 | 功能说明 | 抛出异常 |
writeBoolean(boolean value) | ByteBuf |
将value写入到当前ByteBuf中。写入成功,writeIndex+=1 |
throws IndexOutOfBoundsException: this.writableBytes<1 |
writeByte(int value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=1 |
throws IndexOutOfBoundsException: this.writableBytes<1 |
writeShort(int value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=2 |
throws IndexOutOfBoundsException: this.writableBytes<2 |
writeMedium(int value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=3 |
throws IndexOutOfBoundsException: this.writableBytes<3 |
writeInt(int value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=4 |
throws IndexOutOfBoundsException: this.writableBytes<4 |
writeLong(long value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=8 |
throws IndexOutOfBoundsException: this.writableBytes<8 |
writeChar(int value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=2 |
throws IndexOutOfBoundsException: this.writableBytes<2 |
writeFloat(float value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=4 |
throws IndexOutOfBoundsException: this.writableBytes<4 |
writeDouble(double value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=8 |
throws IndexOutOfBoundsException: this.writableBytes<8 |
writeBytes(ByteBuf src) | ByteBuf | 将源ByteBuf src中从readerIndex开始的所有可读字节写入到当前ByteBuf。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=src.readableBytes |
throws IndexOutOfBoundsException: this.writableBytes<src.readableBytes |
writeBytes(ByteBuf src, int length) | ByteBuf | 将源ByteBuf src中从readerIndex开始,长度length的可读字节写入到当前ByteBuf。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=length |
throws IndexOutOfBoundsException: this.writableBytes<length or src.readableBytes<length |
writeBytes(ByteBuf src, int srcIndex, int length) | ByteBuf | 将源ByteBuf src中从srcIndex开始,长度length的可读字节写入到当前ByteBuf。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=length |
throws IndexOutOfBoundsException: srcIndex<0 or this.writableBytes<length or src.capacity<srcIndex + length |
writeBytes(byte[] src) | ByteBuf | 将源字节数组src中所有可读字节写入到当前ByteBuf。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=src.length |
throws IndexOutOfBoundsException: this.writableBytes<src.length |
writeBytes(byte[] src, int srcIndex, int length) | ByteBuf | 将源字节数组src中srcIndex开始,长度为length可读字节写入到当前ByteBuf。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=length |
throws IndexOutOfBoundsException: srcIndex<0 or this.writableBytes<src.length or src.length<srcIndex + length |
writeBytes(ByteBuffer mignsrc) | ByteBuf | 将源ByteBuffer src中所有可读字节写入到当前ByteBuf。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=src.remaining() |
throws IndexOutOfBoundsException: this.writableBytes<src.remaining() |
writeBytes(InputStream in, int length) | int | 将源InputStream in中的内容写入到当前ByteBuf,写入的最大长度为length,实际写入的字节数可能少于length。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=实际写入的字节数。返回实际写入的字节数 |
throws IndexOutOfBoundsException: this.writableBytes<length |
writeBytes(ScatteringByteChannel in, int length) | int | 将源ScatteringByteChannel in中的内容写入到当前ByteBuf,写入的最大长度为length,实际写入的字节数可能少于length。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=实际写入的字节数。返回实际写入的字节数 |
throws IndexOutOfBoundsException: this.writableBytes<length |
writeZero(int length) | ByteBuf | 将当前缓冲区的内容填充为NUL(0x00),当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=length |
throws IndexOutOfBoundsException: this.writableBytes<length |
readerIndex 和 writeIndex
调用ByteBuf的read操作时,从readerIndex开始读取数据,调用ByteBuf的write操作时,从writeIndex开始写入数据,readerIndex和writeInde关系如下:
+-------------------+------------------+------------------+ | discardable bytes | readable bytes | writable bytes | | | (CONTENT) | | +-------------------+------------------+------------------+ | | | | 0 <= readerIndex <= writerIndex <= capacity
方法名称 | 返回值 | 功能说明 | 抛出异常 |
readerIndex() | int | 返回当前ByteBuf的readerIndex | |
readerIndex(int readerIndex) | ByteBuf | 修改当前ByteBuf的readerIndex |
throws IndexOutOfBoundsException this.writerIndex<readerIndex |
writerIndex() | int | 返回当前ByteBuf的writeIndex | |
writerIndex(int writerIndex) | ByteBuf | 修改当前ByteBuf的writeIndex |
throws IndexOutOfBoundsException writeIndex<this.readerIndex or this.capacity<writerIndex |
readableBytes() | int |
获取当前ByteBuf的可读字节数 this.writerIndex -this.readerIndex |
|
writableBytes() | int |
获取当前ByteBuf的可写字节数 this.capacity - this.writerIndex |
|
setIndex(int readerIndex, int writerIndex) | ByteBuf | 快捷设置当前ByteBuf的readerIndex和writerIndex |
throws IndexOutOfBoundsException readerIndex<0 or this.writerIndex<readerIndex or this.capacity<writerIndex |
skipBytes(int length) | ByteBuf | 更新当前ByteBuf的readerIndex,更新后将跳过length字节的数据读取。 |
throws IndexOutOfBoundsException this.readableBytes<length |
释放空间和clear操作
方法名称 | 返回值 | 功能说明 |
discardReadBytes() | ByteBuf | 释放0到readerIndex之间已经读取的空间;同时复制readerIndex和writerIndex之间的数据到0到writerIndex-readerIndex之间;修改readerIndex和writerIndex的值。该操作会发生字节数据的内存复制,频繁调用会导致性能下降。此外,相比其他java对象,缓冲区的分配和释放是个耗时的操作,缓冲区的动态扩张需要进行进行字节数据的复制,也是耗时的操作,因此应尽量提高缓冲区的重用率 |
discardSomeReadBytes() | ByteBuf | 功能和discardReadBytes()相似,不同之处在于可定制要释放的空间,依赖于具体实现 |
clear() | ByteBuf | 与JDK 的ByteBuffer clear操作相同,该操作不会清空缓冲区内容本身,其主要是为了操作位置指针,将readerIndex和writerIndex重置为0 |
mark和rest
当对缓冲区进行读写操作时,可能需要对之前的操作进行回滚。ByteBuf可通过调用mark操作将当前的位置指针备份到mark变量中,调用rest操作后,重新将指针的当前位置恢复为备份在mark变量的值。ByteBuf主要有以下相关方法:
markReaderIndex():将当前的readerIndex备份到markedReaderIndex中;
resetReaderIndex():将当前的readerIndex重置为markedReaderIndex的值;
markWriterIndex() :将当前的writerIndex备份到markedWriterIndex中;
resetWriterIndex():将当前的writerIndex重置为markedWriterIndex的值;
相关源码:
@Override
public ByteBuf markReaderIndex() {
markedReaderIndex = readerIndex;
return this;
}
@Override
public ByteBuf resetReaderIndex() {
readerIndex(markedReaderIndex);
return this;
}
@Override
public ByteBuf markWriterIndex() {
markedWriterIndex = writerIndex;
return this;
}
@Override
public ByteBuf resetWriterIndex() {
writerIndex = markedWriterIndex;
return this;
}
查找操作
方法名称 | 返回值 | 功能说明 | 抛出异常 |
indexOf(int fromIndex, int toIndex, byte value) | int | 从当前ByteBuf中查找首次出现value的位置,fromIndex<=查找范围<toIndex;查找成功返回位置索引,否则返回-1 | |
bytesBefore(byte value) | int | 从当前ByteBuf中查找首次出现value的位置,readerIndex<=查找范围<writerIndex;查找成功返回位置索引,否则返回-1 | |
bytesBefore(int length, byte value) | int | 从当前ByteBuf中查找首次出现value的位置,readerIndex<=查找范围<readerIndex+length;查找成功返回位置索引,否则返回-1 |
IndexOutOfBoundsException: this.readableBytes<length |
bytesBefore(int index, int length, byte value) | int | 从当前ByteBuf中查找首次出现value的位置,index<=查找范围<index+length;查找成功返回位置索引,否则返回-1 |
IndexOutOfBoundsException: this.readableBytes<index+length |
forEachByte(ByteBufProcessor processor); | int | 遍历当前ByteBuf的可读字节数组,与ByteBufProcessor中设置的查找条件进行比对,从readerIndex开始遍历直到writerIndex。如果满足条件,返回位置索引,否则返回-1 | |
forEachByte(int index, int length, ByteBufProcessor processor) | 遍历当前ByteBuf的可读字节数组,与ByteBufProcessor中设置的查找条件进行比对,从index开始遍历直到index+length。如果满足条件,返回位置索引,否则返回-1 | ||
forEachByteDesc(ByteBufProcessor processor) | 逆序遍历当前ByteBuf的可读字节数组,与ByteBufProcessor中设置的查找条件进行比对,从writerIndex-1开始遍历直到readerIndex。如果满足条件,返回位置索引,否则返回-1 | ||
forEachByteDesc(int index, int length, ByteBufProcessor processor) | 逆序遍历当前ByteBuf的可读字节数组,与ByteBufProcessor中设置的查找条件进行比对,从index+length-1开始遍历直到index。如果满足条件,返回位置索引,否则返回-1 |
Buffer视图
Derived Buffers类似于数据库视图,ByteBuf提供了多个接口用于创建某个ByteBuf的视图或者复制ByteBuf。主要操作如下:
方法名称 | 返回值 | 功能说明 |
duplicate() | ByteBuf | 返回当前ByteBuf的复制对象,复制后的ByteBuf对象与当前ByteBuf对象共享缓冲区的内容,但是维护自己独立的readerIndex和writerIndex。该操作不修改原ByteBuf的readerIndex和writerIndex。 |
copy() | ByteBuf | 从当前ByteBuf复制一个新的ByteBuf对象,复制的新对象缓冲区的内容和索引均是独立的。该操作不修改原ByteBuf的readerIndex和writerIndex。(复制readerIndex到writerIndex之间的内容,其他属性与原ByteBuf相同,如maxCapacity,ByteBufAllocator) |
copy(int index, int length) | ByteBuf | 从当前ByteBuf 指定索引index开始,字节长度为length,复制一个新的ByteBuf对象,复制的新对象缓冲区的内容和索引均是独立的。该操作不修改原ByteBuf的readerIndex和writerIndex。(其他属性与原ByteBuf相同,,如maxCapacity,ByteBufAllocator) |
slice() | ByteBuf | 返回当前ByteBuf的可读子区域,起始位置从readerIndex到writerIndex,返回的ByteBuf对象与当前ByteBuf对象共享缓冲区的内容,但是维护自己独立的readerIndex和writerIndex。该操作不修改原ByteBuf的readerIndex和writerIndex。返回ByteBuf对象的长度为readableBytes() |
slice(int index, int length) | ByteBuf | 返回当前ByteBuf的可读子区域,起始位置从index到index+length,返回的ByteBuf对象与当前ByteBuf对象共享缓冲区的内容,但是维护自己独立的readerIndex和writerIndex。该操作不修改原ByteBuf的readerIndex和writerIndex。返回ByteBuf对象的长度为length |
转换为JDK ByteBuffer
当通过NIO的SocketChannel进行网络读写时,操作的对象为JDK的ByteBuffer,因此须在接口层支持netty ByteBuf到JDK的ByteBuffer的相互转换。
方法名称 | 返回值 | 功能说明 | 抛出异常 |
nioBuffer() | ByteBuffer | 将当前ByteBuf的可读缓冲区(readerIndex到writerIndex之间的内容)转换为ByteBuffer,两者共享共享缓冲区的内容。对ByteBuffer的读写操作不会影响ByteBuf的读写索引。注意:ByteBuffer无法感知ByteBuf的动态扩展操作。ByteBuffer的长度为readableBytes() | UnsupportedOperationException |
nioBuffer(int index, int length) | ByteBuffer | 将当前ByteBuf的可读缓冲区(index到index+length之间的内容)转换为ByteBuffer,两者共享共享缓冲区的内容。对ByteBuffer的读写操作不会影响ByteBuf的读写索引。注意:ByteBuffer无法感知ByteBuf的动态扩展操作。ByteBuffer的长度为length | UnsupportedOperationException |
随机读写(set和get)
除顺序读写之外,ByteBuf还支持随机读写,其最大的区别在于可随机指定读写的索引位置。
关于随机读写的API这里不再详述。无论set或get,执行前都会进行索引和长度的合法性验证,此外,set操作不同于write的是不支持动态扩展。部分源码:
@Override
public ByteBuf getBytes(int index, byte[] dst) {
getBytes(index, dst, 0, dst.length);
return this;
}
//
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
checkDstIndex(index, length, dstIndex, dst.length);
System.arraycopy(array, index, dst, dstIndex, length);
return this;
}
protected final void checkDstIndex(int index, int length, int dstIndex, int dstCapacity) {
checkIndex(index, length);
if (dstIndex < 0 || dstIndex > dstCapacity - length) {
throw new IndexOutOfBoundsException(String.format(
"dstIndex: %d, length: %d (expected: range(0, %d))", dstIndex, length, dstCapacity));
}
}
protected final void checkIndex(int index, int fieldLength) {
ensureAccessible();
if (fieldLength < 0) {
throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)");
}
if (index < 0 || index > capacity() - fieldLength) {
throw new IndexOutOfBoundsException(String.format(
"index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
}
}
@Override
public ByteBuf setByte(int index, int value) {
checkIndex(index);
_setByte(index, value);
return this;
}
//索引合法性验证
protected final void checkIndex(int index) {
ensureAccessible();
if (index < 0 || index >= capacity()) {
throw new IndexOutOfBoundsException(String.format(
"index: %d (expected: range(0, %d))", index, capacity()));
}
}
//确认ByteBuf对象可访问,引用计数器不为0
protected final void ensureAccessible() {
if (refCnt() == 0) {
throw new IllegalReferenceCountException(0);
}
}
//UnpooledHeapByteBuf 实现
@Override
protected void _setByte(int index, int value) {
array[index] = (byte) value;
}
ByteBuf类继承关系图如下:
ReferenceCounted:对象引用计数器,初始化ReferenceCounted对象时,引用数量refCnt为1,调用retain()可增加refCnt,release()用于减少refCnt。refCnt为1时,说明对象实际不可达,release()方法将立即调用deallocate()释放对象。如果refCnt为0,说明对象被错误的引用。在AbstractReferenceCountedByteBuf源码分析小节将详细介绍ReferenceCounted的原理。
ByteBuf:实现接口ReferenceCounted和Comparable,实现ReferenceCounted使得ByteBuf具备引用计数的能力,方便跟踪ByteBuf对象分配和释放。
- ByteBuf直接子类
EmptyByteBuf:用于构建空ByteBuf对象,capacity和maxCapacity均为0。
ReplayingDecoderBuffer:用于构建在IO阻塞条件下实现无阻塞解码的特殊ByteBuf对象,当要读取的数据还未接收完全时,抛出异常,交由ReplayingDecoder处理。
SwappedByteBuf:用于构建具有切换字节顺序功能的ByteBuf对象,默认ByteBuf对象使用BIG_ENDIAN(大字节序)存储数据,SwappedByteBuf可以在BIG_ENDIAN和LITTLE_ENDIAN之间*切换。TCP/IP各层协议均采用网络字节序(BIG_ENDIAN),关于字节序的更多内容不详细介绍。
WrappedByteBuf:用于装饰ByteBuf对象,主要有AdvancedLeakAwareByteBuf、SimpleLeakAwareByteBuf和UnreleasableByteBuf三个子类。这里WrappedByteBuf使用装饰者模式装饰ByteBuf对象,AdvancedLeakAwareByteBuf用于对所有操作记录堆栈信息,方便监控内存泄漏;SimpleLeakAwareByteBuf只记录order(ByteOrder endianness)的堆栈信息;UnreleasableByteBuf用于阻止修改对象引用计数器refCnt的值。
AbstractByteBuf:提供ByteBuf的默认实现,同时组合ResourceLeakDetector和SwappedByteBuf的能力,ResourceLeakDetector是内存泄漏检测工具,SwappedByteBuf用于字节序不同时转换字节序。
- AbstractByteBuf直接子类
AbstractDerivedByteBuf:提供派生ByteBuf的默认实现,主要有DuplicatedByteBuf、ReadOnlyByteBuf和SlicedByteBuf。
DuplicatedByteBuf使用装饰者模式创建ByteBuf的复制对象,使得复制后的对象与原对象共享缓冲区的内容,但是独立维护自己的readerIndex和writerIndex。部分源码如下:
private final ByteBuf buffer;
public DuplicatedByteBuf(ByteBuf buffer) {
super(buffer.maxCapacity());
//共享缓冲区内容
if (buffer instanceof DuplicatedByteBuf) {
this.buffer = ((DuplicatedByteBuf) buffer).buffer;
} else {
this.buffer = buffer;
}
//调用自身的setIndex方法维护readerIndex和writerIndex
setIndex(buffer.readerIndex(), buffer.writerIndex());
}
//所有操作都是通过调用被装饰对象buffer的相应方法实现
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
buffer.getBytes(index, dst, dstIndex, length);
return this;
}
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
buffer.getBytes(index, dst, dstIndex, length);
return this;
}
ReadOnlyByteBuf使用装饰者模式创建ByteBuf的只读对象,该只读对象与原对象共享缓冲区的内容,但是独立维护自己的readerIndex和writerIndex,之后所有的写操作都被限制;部分源码如下:
private final ByteBuf buffer;
public ReadOnlyByteBuf(ByteBuf buffer) {
super(buffer.maxCapacity());
if (buffer instanceof ReadOnlyByteBuf || buffer instanceof DuplicatedByteBuf) {
this.buffer = buffer.unwrap();
} else {
this.buffer = buffer;
}
setIndex(buffer.readerIndex(), buffer.writerIndex());
}
@Override
protected void _setLong(int index, long value) {
throw new ReadOnlyBufferException();
}
@Override
public int setBytes(int index, InputStream in, int length) {
throw new ReadOnlyBufferException();
}
SlicedByteBuf使用装饰者模式创建ByteBuf的一个子区域ByteBuf对象,返回的ByteBuf对象与当前ByteBuf对象共享缓冲区的内容,但是维护自己独立的readerIndex和writerIndex,允许写操作。
AbstractReferenceCountedByteBuf:提供修改对象引用计数器相关操作的默认实现。
- AbstractReferenceCountedByteBuf直接子类
CompositeByteBuf:用于将多个ByteBuf组合在一起,形成一个虚拟的ByteBuf对象,支持读写和动态扩展。内部使用List<Component>组合多个ByteBuf。推荐使用ByteBufAllocator的compositeBuffer()方法,Unpooled的工厂方法compositeBuffer()或wrappedBuffer(ByteBuf... buffers)创建CompositeByteBuf对象。
FixedCompositeByteBuf:用于将多个ByteBuf组合在一起,形成一个虚拟的只读ByteBuf对象,不允许写入和动态扩展。内部使用Object[]将多个ByteBuf组合在一起,一旦FixedCompositeByteBuf对象构建完成,则不会被更改。
PooledByteBuf<T>:基于内存池的ByteBuf,主要为了重用ByteBuf对象,提升内存的使用效率;适用于高负载,高并发的应用中。主要有PooledDirectByteBuf,PooledHeapByteBuf,PooledUnsafeDirectByteBuf三个子类,PooledDirectByteBuf是在堆外进行内存分配的内存池ByteBuf,PooledHeapByteBuf是基于堆内存分配内存池ByteBuf,PooledUnsafeDirectByteBuf也是在堆外进行内存分配的内存池ByteBuf,区别在于PooledUnsafeDirectByteBuf内部使用基于PlatformDependent相关操作实现ByteBuf,具有平台相关性。
ReadOnlyByteBufferBuf:只读ByteBuf,内部持有ByteBuffer对象,相关操作委托给ByteBuffer实现,该ByteBuf限内部使用,ReadOnlyByteBufferBuf还有一个子类ReadOnlyUnsafeDirectByteBuf。
UnpooledDirectByteBuf:在堆外进行内存分配的非内存池ByteBuf,内部持有ByteBuffer对象,相关操作委托给ByteBuffer实现。
UnpooledHeapByteBuf:基于堆内存分配非内存池ByteBuf,即内部持有byte数组。
UnpooledUnsafeDirectByteBuf:与UnpooledDirectByteBuf相同,区别在于UnpooledUnsafeDirectByteBuf内部使用基于PlatformDependent相关操作实现ByteBuf,具有平台相关性。
到此,ByteBuf继承家族的各个成员对应的相关功能已介绍完成。
总结:
从内存分配角度看,ByteBuf主要分为两类:
- 堆内存(HeapByteBuf)字节缓冲区:特点是内存的分配和回收速度快,可以被JVM自动回收;缺点是进行Socket的I/O读写需要额外进行一次内存复制,即将内存对应的缓冲区复制到内核Channel中,性能会有一定程度下降。
- 直接内存(DirectByteBuf)字节缓冲区:在堆外进行内存分配,相比堆内存,分配和回收速度稍慢。但用于Socket的I/O读写时,少一次内存复制,速度比堆内存字节缓冲区快。
经验表明,在I/O通信线程的读写缓冲区使用DirectByteBuf,后端业务消息的编解码模块使用HeapByteBuf,这样组合可以达到性能最优。
从内存回收角度看,ByteBuf也分为两类:
- 基于内存池的ByteBuf:优点是可以重用ByteBuf对象,通过自己维护一个内存池,可以循环利用创建的ByteBuf,提升内存的使用效率,降低由于高负载导致的频繁GC。适用于高负载,高并发的应用中。推荐使用基于内存池的ByteBuf。
- 非内存池的ByteBuf:优点是管理和维护相对简单。
ByteBuf - 字节数据的容器
因为所有的网络通信最终都是基于底层的字节流传输,因此一个高效、方便、易用的数据接口是必要的,而 Netty 的 ByteBuf 满足这些需求。
ByteBuf 是一个很好的经过优化的数据容器,我们可以将字节数据有效的添加到 ByteBuf 中或从 ByteBuf 中获取数据。ByteBuf 有2部分:一个用于读,一个用于写。我们可以按顺序的读取数据,也可以通过调整读取数据的索引或者直接将读取位置索引作为参数传递给get方法来重复读取数据。
ByteBuf 如何在工作?
写入数据到 ByteBuf 后,writerIndex(写入索引)增加。开始读字节后,readerIndex(读取索引)增加。你可以读取字节,直到写入索引和读取索引处在相同的位置,ByteBuf 变为不可读。当访问数据超过数组的最后位,则会抛出 IndexOutOfBoundsException。
调用 ByteBuf 的 "read" 或 "write" 开头的任何方法都会提升 相应的索引。另一方面,"set" 、 "get"操作字节将不会移动索引位置;他们只会操作相关的通过参数传入方法的相对索引。
可以给ByteBuf指定一个最大容量值,这个值限制着ByteBuf的容量。任何尝试将写入索引超过这个值的行为都将导致抛出异常。ByteBuf 的默认最大容量限制是 Integer.MAX_VALUE。
ByteBuf 类似于一个字节数组,最大的区别是读和写的索引可以用来控制对缓冲区数据的访问。下图显示了一个容量为16的空的 ByteBuf 的布局和状态,writerIndex 和 readerIndex 都在索引位置 0 :
Figure 5.1 A 16-byte ByteBuf with its indices set to 0
ByteBuf 使用模式
HEAP BUFFER(堆缓冲区)
最常用的模式是 ByteBuf 将数据存储在 JVM 的堆空间,这是通过将数据存储在数组的实现。堆缓冲区可以快速分配,当不使用时也可以快速释放。它还提供了直接访问数组的方法,通过 ByteBuf.array() 来获取 byte[]数据。 这种方法,正如清单5.1中所示的那样,是非常适合用来处理遗留数据的。
Listing 5.1 Backing array
ByteBuf heapBuf = ...;
if (heapBuf.hasArray()) { //1
byte[] array = heapBuf.array(); //2
int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); //3
int length = heapBuf.readableBytes();//4
handleArray(array, offset, length); //5
}
1.检查 ByteBuf 是否有支持数组。
2.如果有的话,得到引用数组。
3.计算第一字节的偏移量。
4.获取可读的字节数。
5.使用数组,偏移量和长度作为调用方法的参数。
注意:
- 访问非堆缓冲区 ByteBuf 的数组会导致UnsupportedOperationException, 可以使用 ByteBuf.hasArray()来检查是否支持访问数组。
- 这个用法与 JDK 的 ByteBuffer 类似
DIRECT BUFFER(直接缓冲区)
“直接缓冲区”是另一个 ByteBuf 模式。对象的所有内存分配发生在 堆,对不对?好吧,并非总是如此。在 JDK1.4 中被引入 NIO 的ByteBuffer 类允许 JVM 通过本地方法调用分配内存,其目的是
- 通过免去中间交换的内存拷贝, 提升IO处理速度; 直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外。
- DirectBuffer 在 -XX:MaxDirectMemorySize=xxM大小限制下, 使用 Heap 之外的内存, GC对此”无能为力”,也就意味着规避了在高负载下频繁的GC过程对应用线程的中断影响.(详见http://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html.)
这就解释了为什么“直接缓冲区”对于那些通过 socket 实现数据传输的应用来说,是一种非常理想的方式。如果你的数据是存放在堆中分配的缓冲区,那么实际上,在通过 socket 发送数据之前,JVM 需要将先数据复制到直接缓冲区。
但是直接缓冲区的缺点是在内存空间的分配和释放上比堆缓冲区更复杂,另外一个缺点是如果要将数据传递给遗留代码处理,因为数据不是在堆上,你可能不得不作出一个副本,如下:
Listing 5.2 Direct buffer data access
ByteBuf directBuf = ...
if (!directBuf.hasArray()) { //1
int length = directBuf.readableBytes();//2
byte[] array = new byte[length]; //3
directBuf.getBytes(directBuf.readerIndex(), array); //4
handleArray(array, 0, length); //5
}
1.检查 ByteBuf 是不是由数组支持。如果不是,这是一个直接缓冲区。
2.获取可读的字节数
3.分配一个新的数组来保存字节
4.字节复制到数组
5.将数组,偏移量和长度作为参数调用某些处理方法
显然,这比使用数组要多做一些工作。因此,如果你事前就知道容器里的数据将作为一个数组被访问,你可能更愿意使用堆内存。
COMPOSITE BUFFER(复合缓冲区)
最后一种模式是复合缓冲区,我们可以创建多个不同的 ByteBuf,然后提供一个这些 ByteBuf 组合的视图。复合缓冲区就像一个列表,我们可以动态的添加和删除其中的 ByteBuf,JDK 的 ByteBuffer 没有这样的功能。
Netty 提供了 ByteBuf 的子类 CompositeByteBuf 类来处理复合缓冲区,CompositeByteBuf 只是一个视图。
警告
CompositeByteBuf.hasArray() 总是返回 false,因为它可能既包含堆缓冲区,也包含直接缓冲区
例如,一条消息由 header 和 body 两部分组成,将 header 和 body 组装成一条消息发送出去,可能 body 相同,只是 header 不同,使用CompositeByteBuf 就不用每次都重新分配一个新的缓冲区。下图显示CompositeByteBuf 组成 header 和 body:
Figure 5.2 CompositeByteBuf holding a header and body
下面代码显示了使用 JDK 的 ByteBuffer 的一个实现。两个 ByteBuffer 的数组创建保存消息的组件,第三个创建用于保存所有数据的副本。
Listing 5.3 Composite buffer pattern using ByteBuffer
// 使用数组保存消息的各个部分
ByteBuffer[] message = { header, body };
// 使用副本来合并这两个部分
ByteBuffer message2 = ByteBuffer.allocate(
header.remaining() + body.remaining());
message2.put(header);
message2.put(body);
message2.flip();
这种做法显然是低效的;分配和复制操作不是最优的方法,操纵数组使代码显得很笨拙。
下面看使用 CompositeByteBuf 的改进版本
Listing 5.4 Composite buffer pattern using CompositeByteBuf
CompositeByteBuf messageBuf = ...;
ByteBuf headerBuf = ...; // 可以支持或直接
ByteBuf bodyBuf = ...; // 可以支持或直接
messageBuf.addComponents(headerBuf, bodyBuf);
// ....
messageBuf.removeComponent(0); // 移除头 //2
for (int i = 0; i < messageBuf.numComponents(); i++) { //3
System.out.println(messageBuf.component(i).toString());
}
1.追加 ByteBuf 实例的 CompositeByteBuf
2.删除 索引1的 ByteBuf
3.遍历所有 ByteBuf 实例。
清单5.4 所示,你可以简单地把 CompositeByteBuf 当作一个可迭代遍历的容器。 CompositeByteBuf 不允许访问其内部可能存在的支持数组,也不允许直接访问数据,这一点类似于直接缓冲区模式,如图5.5所示。
Listing 5.5 Access data
CompositeByteBuf compBuf = ...;
int length = compBuf.readableBytes(); //1
byte[] array = new byte[length]; //2
compBuf.getBytes(compBuf.readerIndex(), array); //3
handleArray(array, 0, length); //4
1.得到的可读的字节数。
2.分配一个新的数组,数组长度为可读字节长度。
3.读取字节到数组
4.使用数组,把偏移量和长度作为参数
Netty 尝试使用 CompositeByteBuf 优化 socket I/O 操作,消除 原生 JDK 中可能存在的的性能低和内存消耗问题。虽然这是在Netty 的核心代码中进行的优化,并且是不对外暴露的,但是作为开发者还是应该意识到其影响。
CompositeByteBuf API
CompositeByteBuf 提供了大量的附加功能超出了它所继承的 ByteBuf。请参阅的 Netty 的 Javadoc 文档 API。
rel: https://my.oschina.net/7001/blog/743240
上一篇: Netty之有效规避内存泄漏
下一篇: Scrapy网络爬虫----初识