Netty 源码分析之ByteBuf

Netty 源码分析之ByteBuf


ByteBuf基础

Java Nio 的Buffer

在进行数据传输的过程当中,咱们常常会用到缓冲区。
在Java NIO 为咱们提供了原生的七种缓冲区实现,对应着Java 的七种基本类型。通常使用ByteBuffer较多。原生的Buffer虽然能知足咱们的平常使用,可是要进行复杂的应用的时候,确有点力不从心了,原生Buffer存在着如下缺点。所以Netty对其进行了封装,提供了更为友好的接口供咱们使用。java

  • 当咱们调用对应Buffer类的allocate方法来建立缓冲区实例的时候,会分配指定的空间,同时缓冲区的长度就会被固定,不能进行动态的增加或者收缩。若是咱们写入的数据大于缓冲区的capacity的时候,就会发生数组越界错误。
  • Buffer只有一个位置标志位属性Position,咱们只能flip或者rewind方法来对position进行修改来处理数据的存取位置,一不当心就可能会致使错误。
  • Buffer只提供了存取、翻转、释放、标志、比较、批量移动等缓冲区的基本操做,咱们想使用高级的功能,就得本身手动进行封装及维护,使用很是不方便。

ByteBuf工做原理

ByteBuf也是经过字节数组做为缓冲区来存取数据,经过外观模式聚合了JDK NIO元素的ByteBuffer,进行封装。
ByteBuf是经过readerIndex跟writerIndex两个位置指针来协助缓冲区的读写操做的。
在对象初始化的时候,readerIndex和writerIndex的值为0,随着读操做和写操做的进行,writerIndex和readerIndex都会增长,不过readerIndex不能超过writerIndex,在进行读取操做以后,0到readerIndex之间的空间会被视为discard,调用ByteBuf的discardReadBytes方法,能够对这部分空间进行释放重用,相似于ByteBuffer的compact操做,对缓冲区进行压缩。readerIndex到writerIndex的空间,至关于ByteBuffer的position到limit的空间,能够对其进行读取,WriterIndex到capacity的空间,则至关于ByteBuffer的limit到capacity的空间,是能够继续写入的。
readerIndex跟writerIndex让读写操做的位置指针分离,不须要对同一个位置指针进行调整,简化了缓冲区的读写操做。
一样,ByteBuf对读写操做进行了封装,提供了动态扩展的能力,当咱们对缓冲区进行写操做的时候,须要对剩余的可用空间进行校验,若是可用空间不足,同时要写入的字节数小于可写的最大字节数,会对缓冲区进行动态扩展,它会从新建立一个缓冲区,而后将之前的数据复制到新建立的缓冲区中,数组

ByteBuf基本功能

  • 顺序读
    在进行读操做以前,首先对缓冲区可用的空间进行校验。若是要读取的字节长度小于0,就会抛出IllegalArgumentException异常,若是要读取的字节长度大于已写入的字节长度,会抛出IndexOutOfBoundsException异常。经过校验以后,调用getBytes方法,从当前的readerIndex开始,读取length长度的字节数据到目标dst中,因为不一样的子类实现不同,getBytes是个抽象方法,由对应的子类去实现。若是读取数据成功,readerIndex将会增长相应的length。
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
    checkReadableBytes(length);
    getBytes(readerIndex, dst, dstIndex, length);
    readerIndex += length;
    return this;
}
protected final void checkReadableBytes(int minimumReadableBytes) {
    ensureAccessible();
    if (minimumReadableBytes < 0) {
        throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
    }
    if (readerIndex > writerIndex - minimumReadableBytes) {
        throw new IndexOutOfBoundsException(String.format(
                "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
                readerIndex, minimumReadableBytes, writerIndex, this));
    }
}
  • 顺序写
    读操做是将源字节数组从srcIndex开始,length长度的数据写入到当前的ByteBuf中的。
    一开始须要对写入数组的字节数进行校验,若是写入长度小于0,将会抛出IllegalArgumentException异常,若是写入字节数小于当前ByteBuf的可写入字节数,则经过检验。若是写入字节数大于缓冲区最大可动态扩展的容量maxCapacity,就会抛出
    IndexOutOfBoundsException异常,不然的话,就会经过动态扩展来知足写入须要的字节数。首先经过calculateNewCapacity计算出从新扩展后的容量,而后调用capacity方法进行扩展,不一样的子类有不一样实现,因此也是一个抽象方法。
    • 计算扩展容量,首先设置门阀值为4m,若是要扩展的容量等于阀值就使用阀值做为缓冲区新的容量,若是大于阀值就以4M做为步长,每次增长4M,若是扩展期间,要扩展的容量比最大可扩展容量还大的话,就以最大可扩展容量maxCapacity为新的容量。不然的话,就从64开始倍增,直到倍增以后的结果大于要扩展的容量,再把结果做为缓冲区的新容量。
    • 经过先倍增再步长来扩展容量,若是咱们只是writerIndex+length的值做为缓冲区的新容量,那么再之后进行写操做的时候,每次都须要进行容量扩展,容量扩展的过程须要进行内存复制,过多内存复制会致使系统的性能降低,之因此是倍增再部长,在最初空间比较小的时候,倍增操做并不会带来太多的内存浪费,可是内存增加到必定的时候,再进行倍增的时候,就会对内存形成浪费,所以,须要设定一个阀值,到达阀值以后就经过步长的方法进行平滑的增加。
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
    ensureWritable(length);
    setBytes(writerIndex, src, srcIndex, length);
    writerIndex += length;
    return this;
}
public ByteBuf ensureWritable(int minWritableBytes) {
    if (minWritableBytes < 0) {
        throw new IllegalArgumentException(String.format(
                "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
    }

    if (minWritableBytes <= writableBytes()) {
        return this;
    }

    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }

    // Normalize the current capacity to the power of 2.
    int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);

    // Adjust to the new capacity.
    capacity(newCapacity);
    return this;
}
private int calculateNewCapacity(int minNewCapacity) {
    final int maxCapacity = this.maxCapacity;
    final int threshold = 1048576 * 4; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}
//UnpooledHeapByteBuf的capacity实现
public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        System.arraycopy(array, 0, newArray, 0, array.length);
        setArray(newArray);
    } else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
    }
    return this;
}
  • Clear操做
    clear操做只是把readerIndex和writerIndex设置为0,不会对存储的数据进行修改。
public ByteBuf clear() {
    readerIndex = writerIndex = 0;
    return this;
}
  • 索引操做安全

    • 读写位置索引设置:主要是对边界条件进行校验,设置readerIndex的时候,newReaderIndex不能小于0跟大于writerIndex;设置writerIndex的时候,newWriterIndex必须大于readerIndex和小于当前的capacity。若是不能经过校验的话,就会抛出IndexOutOfBoundsException异常。
    • mark和reset操做:因为有readerIndex和writerIndex,所以进行mark或者reset须要指定相应的操做位置索引,mark操做会把当前的readerIndex或者writerIndex设置为markedReaderIndex或者markedWriterIndex;reset操做的话,它是参入对应的mark值调用对应readerIndex()或者writerIndex();
  • 缓冲区重用
    能够经过discardReadByte方法去重用已经读取过的缓冲区。
    首先对readerIndex进行判断:app

    • 若是readerIndex等于0,就说明没有读取数据,没有能够用来重用的空间,直接返回;
    • 若是readerIndex大于0且不等于writerIndex的话,说明有进行数据读取被丢弃的缓冲区,也有尚未被读取的缓冲区。调用setBytes方法进行字节数组的复制,将没被读取的数据移动到缓冲区的起始位置,从新去设置readerIndex和writerIndex,readerIndex为0,writerIndex为原writerIndex-readerIndex;同时,也须要对mark进行从新设置。
      • 首先对markedReaderIndex进行备份而后跟decrement进行比较,若是markedReaderIndex比decrement小的话,markedReaderIndex设置为0,再用markedWriterIndex跟decrement比较,若是小于的话,markedWriterIndex也设置为0,不然的话markedWriterIndex较少decrement;
      • 若是markedReaderIndex比decrement大的话,markedReaderIndex和markedReaderIndex都减去decrement就能够了。
    • 若是readerIndex等于writerIndex的话,说明没有能够进行重用的缓冲区,直接对mark从新设置就能够了,不须要内存复制。
public ByteBuf discardReadBytes() {
    ensureAccessible();
    if (readerIndex == 0) {
        return this;
    }

    if (readerIndex != writerIndex) {
        setBytes(0, this, readerIndex, writerIndex - readerIndex);
        writerIndex -= readerIndex;
        adjustMarkers(readerIndex);
        readerIndex = 0;
    } else {
        adjustMarkers(readerIndex);
        writerIndex = readerIndex = 0;
    }
    return this;
}
protected final void adjustMarkers(int decrement) {
    int markedReaderIndex = this.markedReaderIndex;
    if (markedReaderIndex <= decrement) {
        this.markedReaderIndex = 0;
        int markedWriterIndex = this.markedWriterIndex;
        if (markedWriterIndex <= decrement) {
            this.markedWriterIndex = 0;
        } else {
            this.markedWriterIndex = markedWriterIndex - decrement;
        }
    } else {
        this.markedReaderIndex = markedReaderIndex - decrement;
        markedWriterIndex -= decrement;
    }
}
  • skipBytes

当咱们须要跳过某些不须要的字节的时候,能够调用skipBytes方法来跳过指定长度的字节来读取后面的数据。
首先对跳跃长度进行判断,若是跳跃长度小于0的话,会抛出IllegalArgumentException异常,或者跳跃长度大于当前缓冲区可读长度的话,会抛出IndexOutOfBoundsException异常。若是校验经过,新的readerindex为原readerIndex+length,若是新的readerIndex大于writerIndex的话,会抛出IndexOutOfBoundsException异常,不然就更新readerIndex。函数

public ByteBuf skipBytes(int length) {
    checkReadableBytes(length);
    int newReaderIndex = readerIndex + length;
    if (newReaderIndex > writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "length: %d (expected: readerIndex(%d) + length <= writerIndex(%d))",
                length, readerIndex, writerIndex));
    }
    readerIndex = newReaderIndex;
    return this;
}

ByteBuf源码分析

ByteBuf

AbstractReferenceCountedByteBuf

AbstractReferenceCountedByteBuf是ByteBuf实现对引用进行计数的基类,用来跟踪对象的分配和销毁,实现自动内存回收。工具

  • 成员变量
    • refCntUpdater refCntUpdater是一个AtomicIntegerFieldUpdater类型的成员变量,它能够对成员变量进行原子性更新操做,达到线程安全。
    • REFCNT_FIELD_OFFSET REFCNT_FIELD_OFFSET是标识refCnt字段在AbstractReferenceCountedByteBuf的内存地址,在UnpooledDirectByteBuf和PooledDirectByteBuf两个子类中都会使用到这个偏移量。
    • refCnt volatile修饰保证变量的线程可见性,用来跟踪对象的引用次数
  • 对象引用计数器
    每调用retain方法一次,引用计数器就会加一。retain方法经过自旋对引用计数器进行加一操做,引用计数器的初始值为1,只要程序是正确执行的话,它的最小值应该为1,当申请和释放次数相等的时候,对应的ByteBuf就会被回收。当次数为0时,代表对象被错误的引用,就会抛出IllegalReferenceCountException异常,若是次数等于Integer类型的最大值,就会抛出
    IllegalReferenceCountException异常。retain经过refCntUpdater的compareAndSet方法进行原子操做更新,compareAndSet会使用获取的值与指望值进行比较,若是在比较器件,有其余线程对变量进行修改,那么比较失败,会再次自旋,获取引用计数器的值再次进行比较,不然的话,就会进行加一操做,退出自旋。
    release方法的话与retain方法相似,也是经过自旋循环进行判断和更新,不过当refCnt的值等于1的时候,代表引用计数器的申请跟释放次数同样,对象引用已经不可达了,对象应该要被垃圾收集回收掉了,调用deallocate方法释放ByteBuf对象
public ByteBuf retain() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, 1);
        }
        if (refCnt == Integer.MAX_VALUE) {
            throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
        }
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
            break;
        }
    }
    return this;
}
    
public final boolean release() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, -1);
        }

        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
            if (refCnt == 1) {
                deallocate();
                return true;
            }
            return false;
        }
    }
}

UnpooledHeapByteBuf

UnpooledHeapByteBuf是一个非线程池实现的在堆内存进行内存分配的字节缓冲区,在每次IO操做的都会去建立一个UnpooledHeapByteBuf对象,若是频繁地对内存进行分配或者释放会对性能形成影响。源码分析

  • 成员变量
    • ByteBufAllocator 用于内存分配
    • array 字节数组做为缓冲区,用于存储字节数据
    • ByteBuffer 用来实现Netty ByteBuf 到Nio ByteBuffer的变换
  • 动态扩展缓冲区
    调用capacity方法动态扩展缓冲区,首先要对扩展容量进行校验,若是新容量的大小小于0或者大于最大可扩展容量maxCapacity的话,抛出IllegalArgumentException异常。
    经过校验以后,若是新扩展容量比原来大的话,则建立一个新的容量为新扩展容量的字节数组缓冲区,而后调用System.arraycopy进行内存复制,将旧的数据复制到新数组中去,而后用setArray进行数组替换。动态扩展以后须要原来的视图tmpNioBuffer设置为控。
    若是新的容量小于当前缓冲区容量的话,不须要进行动态扩展,可是须要截取部分数据做为子缓冲区。
    • 首先对当前的readerIndex是否小于newCapacity,若是小于的话继续对writerIndex跟newCapacity进行比较,若是writerIndex大于newCapacity的话,就将writerIndex设置为newCapacity,更新完索引以后就经过System.arrayCopy内存复制将当前可读的数据复制到新的缓冲区字节数组中。
    • 若是newCapacity小于readerIndex的话,说明没有新的可读数据要复制到新的字节数组缓冲区中,只须要把writerIndex跟readerIndex都更新为newCapacity既可,最后调用setArray更换字节数组。
public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        System.arraycopy(array, 0, newArray, 0, array.length);
        setArray(newArray);
    } else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
    }
    return this;
}
  • setBytes
    字节数组复制,首先对数据进行合法性检验,若是srcIndex或者index的值小于0,就会抛出IllegalArgumentException,若是index+length的值大于capacity的值或者srcIndex+length的值大于src.length的话,就会抛出IndexOutOfBoundsException异常。经过校验以后,就调用System.arraycopy进行字节数组复制。
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.length);
    System.arraycopy(src, srcIndex, array, index, length);
    return this;
}
protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) {
    checkIndex(index, length);
    if (srcIndex < 0 || srcIndex > srcCapacity - length) {
        throw new IndexOutOfBoundsException(String.format(
                "srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity));
    }
}
  • Netty ByteBuf与Nio ByteBuffer转换
    要将Netty的ByteBuf转化为Nio ByteBuffer,在ByteBuffer中有wrap静态方法,只须要传入对应的字节数组便可建立转化为ByteBuffer,在nioBuffer方法还调用了slice方法,它能够建立一个从原ByteBuffer的position开始缓冲区,与原缓冲区共享同一段数据元素。nioBuffer方法不会重用缓冲区,只能保证writerIndex跟readerIndex的独立性。
public ByteBuffer nioBuffer(int index, int length) {
    ensureAccessible();
    return ByteBuffer.wrap(array, index, length).slice();
}

PooledByteBuf

在Netty4以后加入内存池管理,经过内存池管理比以前ByteBuf的建立性能获得了极大提升。性能

  • PoolChunk
    • Page 能够用来分配的最小内存块单位
    • Chunk page的集合

PoolChunk主要负责内存块的分配及释放,chunk中的page会构建成一颗二叉树,默认状况下page的大小是8K,chunk的大小是2^11 page,即16M,构成了11层的二叉树,最下面一层的叶子节点有8192个,与page的数目同样,每一次内存的分配必须保证连续性,方便内存操做。每一个节点会记录本身在Memory Area的偏移地址,当一个节点表示的内存区域被分配以后,那么该节点会被标志为已分配,该节点的全部子节点的内存请求都会忽略。每次内存分配的都是8k(2^n)大小的内存块,当须要分配大小为chunkSize/(2^k)的内存端时,为了找到可用的内存段,会从第K层左边开始寻找可用节点。this

  • PoolArena

在内存分配中,为了可以集中管理内存的分配及释放,同时提供分配和释放内存的性能,通常都是会先预先分配一大块连续的内存,不须要重复频繁地进行内存操做,那一大块连续的内存就叫作memory Arena,而PoolArena是Netty的内存池实现类。
在Netty中,PoolArena是由多个Chunk组成的,而每一个Chunk则由多个Page组成。PoolArena是由Chunk和Page共同组织和管理的。线程

  • PoolSubpage

当对于小于一个Page的内存分配的时候,每一个Page会被划分为大小相等的内存块,它的大小是根据第一次申请内存分配的内存块大小来决定的。一个Page只能分配与第一次内存内存的内存块的大小相等的内存块,若是想要想要申请大小不想等的内存块,只能在新的Page上申请内存分配了。
Page中的存储区域的使用状况是经过一个long数组bitmap来维护的,每一位表示一个区域的占用状况。

PooledDirectByteBuf

  • 建立字节缓冲区
    因为内存池实现,每次建立字节缓冲区的时候,不是直接new,而是从内存池中去获取,而后设置引用计数器跟读写Index,跟缓冲区最大容量返回。
static PooledHeapByteBuf newInstance(int maxCapacity) {
    PooledHeapByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}
final void reuse(int maxCapacity) {
    maxCapacity(maxCapacity);
    setRefCnt(1);
    setIndex0(0, 0);
    discardMarks();
}
  • 复制字节缓冲区实例
    copy方法能够复制一个字节缓冲区实例,与原缓冲区独立。
    首先要对index和length进行合法性判断,而后调用PooledByteBufAllocator的directBuffer方法分配一个新的缓冲区。newDirectBuffer方法是一个抽象方法,对于不一样的子类有不一样的实现。若是是unpooled的话,会直接建立一个新的缓冲区,若是是pooled的话,它会从内存池中获取一个可用的缓冲区。
public ByteBuf copy(int index, int length) {
    checkIndex(index, length);
    ByteBuf copy = alloc().directBuffer(length, maxCapacity());
    copy.writeBytes(this, index, length);
    return copy;
}
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newDirectBuffer(initialCapacity, maxCapacity);
}
// PooledByteBufAllocator 
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        if (PlatformDependent.hasUnsafe()) {
            buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }

    return toLeakAwareBuffer(buf);
}
//UnpooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

ByteBuf辅助类分析

ByteBufHolder

ByteBufHolder是ByteBuf的一个容器,它能够更方便地访问ByteBuf中的数据,在使用不一样的协议进行数据传输的时候,不一样的协议消息体包含的数据格式和字段不同,因此抽象一个ByteBufHolder对ByteBuf进行包装,不一样的子类有不一样的实现,使用者能够根据本身的须要进行实现。Netty提供了一个默认实现DefaultByteBufHolder。

ByteBufAllocator

ByteBufAllocator是字节缓冲区分配器,根据Netty字节缓冲区的实现不一样,分为两种不一样的分配器PooledByteBufAllocator和UnpooledByteBufAllocator。他们提供了不一样ByteBuf的分配方法。

CompositeByteBuf

CompositeByteBuf是一个虚拟的Buffer,它能够将多个ByteBuf组装为一个ByteBuf视图。
在Java NIO中,咱们有两种实现的方法

  • 将其余ByteBuffer的数据复制到一个ByteBuffer中,或者从新建立一个新的ByteBuffer,将其余的ByteBuffer复制到新建的ByteBuffer中。
  • 经过容器将多个ByteBuffer存储在一块儿,进行统一的管理和维护。

在Netty中,CompositeByByteBuf中维护了一个Component类型的集合。Component是ByteBuf的包装类,它聚合了ByteBuf.维护在集合中的位置偏移量等信息。通常状况下,咱们应该使用ByteBufAllocator.compositeBuffer()和Unpooled.wrappedBuffer(ByteBuf...)方法来建立CompositeByteBuf,而不是直接经过构造函数去实例化一个CompositeByteBuf对象。

private int addComponent0(int cIndex, ByteBuf buffer) {
    checkComponentIndex(cIndex);
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }

    int readableBytes = buffer.readableBytes();

    // No need to consolidate - just add a component to the list.
    Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
    if (cIndex == components.size()) {
        components.add(c);
        if (cIndex == 0) {
            c.endOffset = readableBytes;
        } else {
            Component prev = components.get(cIndex - 1);
            c.offset = prev.endOffset;
            c.endOffset = c.offset + readableBytes;
        }
    } else {
        components.add(cIndex, c);
        if (readableBytes != 0) {
            updateComponentOffsets(cIndex);
        }
    }
    return cIndex;
}
private void consolidateIfNeeded() {
    final int numComponents = components.size();
    if (numComponents > maxNumComponents) {
        final int capacity = components.get(numComponents - 1).endOffset;
    
        ByteBuf consolidated = allocBuffer(capacity);
    
        for (int i = 0; i < numComponents; i ++) {
            Component c = components.get(i);
            ByteBuf b = c.buf;
            consolidated.writeBytes(b);
            c.freeIfNecessary();
        }
        Component c = new Component(consolidated);
        c.endOffset = c.length;
        components.clear();
        components.add(c);
    }
}

public CompositeByteBuf removeComponent(int cIndex) {
    checkComponentIndex(cIndex);
    Component comp = components.remove(cIndex);
    comp.freeIfNecessary();
    if (comp.length > 0) {
        updateComponentOffsets(cIndex);
    }
    return this;
}

private static final class Component {
    final ByteBuf buf;
    final int length;
    int offset;
    int endOffset;

    Component(ByteBuf buf) {
        this.buf = buf;
        length = buf.readableBytes();
    }

    void freeIfNecessary() {
        buf.release(); // We should not get a NPE here. If so, it must be a bug.
    }
}

ByteBufUtil

ByteBufUtil是ByteBuf的工具类,它提供了一系列的静态方法来操做ByteBuf。

相关文章
相关标签/搜索