什么是ByteBuf?html
ByteBuf在Netty中充当着很是重要的角色;它是在数据传输中负责装载字节数据的一个容器;其内部结构和数组相似,初始化默认长度为256,默认最大长度为Integer.MAX_VALUE。
ByteBuf数据结构java
* <pre> * +-------------------+------------------+------------------+ * | discardable bytes | readable bytes | writable bytes | * | | (CONTENT) | | * +-------------------+------------------+------------------+ * | | | | * 0 <= readerIndex <= writerIndex <= capacity * </pre>
ByteBuf字节缓冲区主要由discardable
、readable
、writable
三种类型的字节组成的;数组
ByteBuf字节缓冲区能够操控readerIndex
、writerIndex
二个下标;这两个下标都是单独维护
的缓存
名词 | 解释 | 方法 |
---|---|---|
discardable bytes | 丢弃的字节;ByteBuf中已经读取的字节 | discardReadBytes(); |
readable bytes | 剩余的可读的字节 | |
writable bytes | 已经写入的字节 | |
readerIndex | 字节读指针(数组下标) | readerIndex() |
writerIndex | 字节写指针(数组下标) | writerIndex() |
ByteBuf中主要的类-UML图数据结构
ByteBuf怎么建立的?ide
ByteBuf是经过Unpooled来进行建立;默认长度为256,可自定义指定长度,最大长度为Integer.MAX_VALUE;
ByteBuf建立的类型有哪几种?工具
1. 基于内存管理分类源码分析
类型 | 解释 | 对应的字节缓冲区类 |
---|---|---|
Pooled | 池化; 简单的理解就是pooled拥有一个pool 池空间 (poolArea),凡是建立过的字节缓冲区都会被缓存进去, 有新的链接须要字节缓冲区会先从缓存中 get ,取不到则在进行建立; |
1.PooledDirectByteBuf 2.PooledHeapByteBuf 3.PooledUnsafeDirectByteBuf 4.PooledUnsafeHeapByteBuf |
Unpooled | 非池化; 每次都会建立一个字节缓冲区 |
1.UnpooledDirectByteBuf 2.UnpooledHeapByteBuf 3.UnpooledUnsafeDirectByteBuf 4.UnpooledUnsafeHeapByteBuf |
优缺点:性能
2. 基于内存分类this
类型 | 解释 | 特色 | 构造方法 |
---|---|---|---|
heapBuffer(经常使用) | 堆字节缓冲区; | 底层就是JVM的堆内存,只是IO读写须要从堆内存拷贝到内核中(相似以前学过的IO多路复用) | buffer(128) |
directBuffer(经常使用) | 直接内存字节缓冲区; | 直接存于操做系统内核空间(堆外内存) | directBuffer(256) |
优缺点:
每一种都有本身优点的地方,咱们要根据实际的业务来灵活的运用;若是涉及到大量的文件操做建议使用directBuffer(搬来搬去确实挺耗性能);大部分业务仍是推荐使用heapBuffer(heapBuffer,普通的业务搬来搬去相比在内核申请一块内存和释放内存来讲要更加优)。
ByteBuf是怎么样回收的
1. heapBuffer
heapBuffer是基于堆内存来进行建立的,回收天然而然经过JVM的回收机制进行回收
2. directBuffer回收内存的方法
能够经过DirectByteBuffer中的Cleaner来进行清除 或者依靠unsafe的释放内存(freeMemory方法)也能够进行回收
ByteBuf内存分配
ByteBuf的内存分配主要分为heap(堆内存)和direct(堆外内存);
1.1 建立堆缓冲区
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { // 是否支持unsafe return PlatformDependent.hasUnsafe() ? // 建立unsafe非池化堆字节缓冲区 new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : // 建立非池化堆字节缓冲区 new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity); }
1.2 unsafe和非unsafe都是经过实例 UnpooledHeapByteBuf 来分配内存
// InstrumentedUnpooledUnsafeHeapByteBuf/InstrumentedUnpooledHeapByteBuf // 最终都是经过这个方法来建立分配内存;后面会讲讲unsafe和普通的非unsafe的区别 protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { // 设置最大容量 super(maxCapacity); // 检查内存分配类是否为空 checkNotNull(alloc, "alloc"); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; // allocateArray初始化一个initialCapacity长度的字节数组 setArray(allocateArray(initialCapacity)); // 初始化读写索引为0 setIndex(0, 0); } // 初始化一个initialCapacity长度的字节数组 byte[] allocateArray(int initialCapacity) { return new byte[initialCapacity]; } // 初始化读写索引为0 @Override public ByteBuf setIndex(int readerIndex, int writerIndex) { if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) { throw new IndexOutOfBoundsException(String.format( "readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))", readerIndex, writerIndex, capacity())); } setIndex0(readerIndex, writerIndex); return this; } final void setIndex0(int readerIndex, int writerIndex) { this.readerIndex = readerIndex; this.writerIndex = writerIndex; }
从源码能够得知,堆内存ByteBuf经过判断系统环境是否支持unsafe来判断是建立UnsafeHeapByteBuf仍是heapByteBuf; 若是支持unsafe则返回 InstrumentedUnpooledUnsafeHeapByteBuf
实例,反之则返回 InstrumentedUnpooledHeapByteBuf
实例;但它们都是分配一个byte数组来进行存储字节数据。
1.3 unsafe和非unsafe建立的ByteBuf有什么区别呢
unsafe和非unsafe建立的heapByteBuf区别在于获取数据;非unsafe获取数据直接是经过数组索引来进行获取的;而unsafe获取数据则是经过UNSAFE操控内存来获取;咱们能够经过源码来看看
heapByteBuf获取数据
@Override public byte getByte(int index) { ensureAccessible(); return _getByte(index); } @Override protected byte _getByte(int index) { return HeapByteBufUtil.getByte(array, index); } // 直接返回数组对应索引的值 static byte getByte(byte[] memory, int index) { return memory[index]; }
unsafeHeapByteBuf获取数据
@Override public byte getByte(int index) { checkIndex(index); return _getByte(index); } @Override protected byte _getByte(int index) { return UnsafeByteBufUtil.getByte(array, index); } static byte getByte(byte[] array, int index) { return PlatformDependent.getByte(array, index); } public static byte getByte(byte[] data, int index) { return PlatformDependent0.getByte(data, index); } static byte getByte(byte[] data, int index) { // 经过unsafe来获取 return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index); }
2.1 建立directBuffer
// PlatformDependent检测运行环境的变量属性,好比java环境,unsafe是否支持等 @Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { final ByteBuf buf; // 支持unsafe if (PlatformDependent.hasUnsafe()) { // 运行环境是否使用不清空的direct内存 buf = PlatformDependent.useDirectBufferNoCleaner() ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) : new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { // 建立非unsafe实例ByteBuf buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return disableLeakDetector ? buf : toLeakAwareBuffer(buf); }
2.2 unsafe返回 UnpooledUnsafeDirectByteBuf
实例,非unsafe返回 UnpooledDirectByteBuf
实例
// 建立unsafe direct字节缓冲区 protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { // 设置最大容量 super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; // allocateDirect建立DirectByteBuffer(java nio)分配内存 setByteBuffer(allocateDirect(initialCapacity), false); } // 非unsafe建立direct内存 protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setByteBuffer(ByteBuffer.allocateDirect(initialCapacity)); }
2.3 分配 direct内存,返回 java nio ByteBuffer实例
protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); } /** * Allocates a new direct byte buffer. 分配一个新的直接内存字节缓冲区 * * <p> The new buffer's position will be zero, its limit will be its * capacity, its mark will be undefined, and each of its elements will be * initialized to zero. Whether or not it has a * {@link #hasArray backing array} is unspecified. * * @param capacity * The new buffer's capacity, in bytes * * @return The new byte buffer * * @throws IllegalArgumentException * If the <tt>capacity</tt> is a negative integer */ public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); } // allocateDirect建立DirectByteBuffer(java nio) DirectByteBuffer(int cap) { // package-private // 设置文件描述, 位置等信息 super(-1, 0, cap, cap); boolean pa = VM.isDirectMemoryPageAligned(); int ps = Bits.pageSize(); long size = Math.max(1L, (long)cap + (pa ? ps : 0)); Bits.reserveMemory(size, cap); long base = 0; try { // 经过unsafe类来分配内存 base = unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { Bits.unreserveMemory(size, cap); throw x; } unsafe.setMemory(base, size, (byte) 0); if (pa && (base % ps != 0)) { // Round up to page boundary address = base + ps - (base & (ps - 1)); } else { address = base; } // 实例化cleaner,用于后续回收 cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); att = null; }
2.4 设置ByteBuffer的属性(unsafe和非unsafe)
unsafe设置ByteBuffer
/** * buffer 数据字节 * tryFree 尝试释放,默认为false */ final void setByteBuffer(ByteBuffer buffer, boolean tryFree) { if (tryFree) { // 全局buffer设置成旧buffer ByteBuffer oldBuffer = this.buffer; if (oldBuffer != null) { if (doNotFree) { doNotFree = false; } else { // 释放旧缓冲区的内存 freeDirect(oldBuffer); } } } // 将当前传入的buffer设置成全局buffer this.buffer = buffer; // 记录内存地址 memoryAddress = PlatformDependent.directBufferAddress(buffer); // 将临时buff设置为null tmpNioBuf = null; // 记录容量大小 capacity = buffer.remaining(); } // 获取对象在内存中的地址 static long directBufferAddress(ByteBuffer buffer) { return getLong(buffer, ADDRESS_FIELD_OFFSET); } // 经过unsafe操控系统内存,获取对象在内存中的地址 private static long getLong(Object object, long fieldOffset) { return UNSAFE.getLong(object, fieldOffset); }
非unsafe设置ByteBuffer
// 非unsafe设置属性 private void setByteBuffer(ByteBuffer buffer) { ByteBuffer oldBuffer = this.buffer; if (oldBuffer != null) { if (doNotFree) { doNotFree = false; } else { freeDirect(oldBuffer); } } this.buffer = buffer; tmpNioBuf = null; capacity = buffer.remaining(); }
根据源码,direct经过判断运行系统环境是否使用useDirectBufferNoCleaner
来实例不一样的ByteBufferedReader(unsafe和非unsafe),可是他们最终都是经过ByteBuffer来分配内存,底层都是经过在不一样的ByteBuf实例中构建一个ByteBuffer来进行存储字节数据的(具体能够看看UnpooledDirectByteBuf
的set方法)
2.5 unsafe和非unsafe建立的directByteBuf的区别
unsafe获取数据:UNSAFE经过索引的内存地址来获取对应的值
@Override protected byte _getByte(int index) { // UnsafeByteBufUtil unsafe工具类获取 return UnsafeByteBufUtil.getByte(addr(index)); } // addr 获取索引的内存地址 long addr(int index) { return memoryAddress + index; } static byte getByte(long address) { return PlatformDependent.getByte(address); } public static byte getByte(long address) { return PlatformDependent0.getByte(address); } // 经过UNSAFE获取内存地址的值 static byte getByte(long address) { return UNSAFE.getByte(address); }
非unsafe获取数据: 直接经过对应索引的ByteBuffer获取值
@Override public byte getByte(int index) { // 检查受权,及ByteBuffer对象是否还有引用 ensureAccessible(); return _getByte(index); } @Override protected byte _getByte(int index) { // 经过索引获取值 return buffer.get(index); }
每次咱们再往字节缓冲区中写入数据的时候都会判断当前容量是否还能写入数据,当发现容量不够时,此时ByteBuf会总动进行扩容;固然咱们也能够手动更改ByteBuf的容量;详细见代码分析。
public static void main(String[] args) { // 利用非池化Unpooled类建立字节缓冲区 ByteBuf byteBuf = Unpooled.buffer(2); System.out.println("initCapacity: " + byteBuf.capacity()); byteBuf.writeByte(66); byteBuf.writeByte(67); byteBuf.readBytes(1); System.out.println("readerIndex: " + byteBuf.readerIndex()); System.out.println("writerIndex: " + byteBuf.writerIndex()); // 丢弃已经阅读的字节 byteBuf.discardReadBytes(); byteBuf.writeByte(68); byteBuf.writeByte(69); System.out.println("readerIndex: " + byteBuf.readerIndex()); System.out.println("writerIndex: " + byteBuf.writerIndex()); System.out.println("capacity: " + byteBuf.capacity()); } // 运行结果 initCapacity: 2 readerIndex: 1 writerIndex: 2 readerIndex: 0 writerIndex: 3 capacity: 64
上面代码的操做步骤:初始化ByteBuf --- 写入数据 --- 读取数据 --- 丢弃数据 --- 再写入数据;
丢弃了一个字节数的数据又写入了2个字节数的数据,初始化容量的缓冲区明显不够发生了自动扩容,扩容后的容量:64;它是怎么进行扩容的呢?何时扩容的呢?看下源码
public ByteBuf writeByte(int value) { // 确保能够写入(判断是否容量够不够写入) ensureWritable0(1); // 设置写索引、存值 _setByte(writerIndex++, value); return this; } // minWritableBytes默认为1 由于writeByte每次只能写入一个字节数 final void ensureWritable0(int minWritableBytes) { // 检查是否还有占有权和是否还有引用 ensureAccessible(); // writableBytes() = capacity - writerIndex 剩余可写容量 if (minWritableBytes <= writableBytes()) { return; } // ByteBuf虽然支持自动扩容可是也有上限(Integer.MAX_VALUE) if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } // 开始进行扩容 newCapacity = writerIndex + minWritableBytes int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity); // 将新的容量写入到ByteBuf capacity(newCapacity); } /** * 计算新的容量 * minNewCapacity 写入的最小容量 * maxCapacity 最大容量及Integer.MAX_VALUE 2147483647 */ public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { if (minNewCapacity < 0) { throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)"); } if (minNewCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity)); } // 4兆大小 4194304 final int threshold = CALCULATE_THRESHOLD; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } // 若是超过了4兆, if (minNewCapacity > threshold) { // 新的容量扩容为超过的倍数的容量 int newCapacity = minNewCapacity / threshold * threshold; // 若是超过了最大的容量则直接设置为最大容量 if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } // 默认扩容大小为64 int newCapacity = 64; while (newCapacity < minNewCapacity) { // 左移一位 newCapacity = newCapacity*2 newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); }
从上面的源码可知,自动扩容在4兆
的范围内变化的话,每次扩容都是64 * 2的N字方
(N >= 1); 一旦超过了4兆
则递增倍数为(newCapacity / 4194304) * 4194304
即表示的是基于4兆增加的倍数。
读写
两个数组下标,ByteBuffer只有一个下标索引,读写的时候须要手动设置(调用flip和rewind)虽然Netty中使用的ByteBuf来进行缓存字节数据,可是最后在Channel中仍是以ByteBuffer(java nio)来进行传输
参考文献:
https://www.cnblogs.com/stateis0/p/9062152.html https://www.jianshu.com/p/1585e32cf6b4 https://blog.csdn.net/ZBylant/article/details/83037421