上节解析了生产者发送消息时RecordAccumulator的相关操做,本节解析下RecordAccumulator用到的其余组件BufferPool,CopyOnWriteMap数据库
上节讲到RecordAccumulator在append的时候会申请缓冲区,每一个批次ProducerBatch在封装消息时MemoryRecordsBuilder会用到缓冲区ByteBuffer(MemoryRecordsBuilder和消息压缩这一块我以为太边缘了,策略调整之后只解析关键流程和收获比较多的部分,其余的有机会再仔细分享)。
若是高频率的建立ByteBuffer有可能会形成虚拟机的频繁GC,这里kafka使用了BufferPool,BufferPool和数据库链接池的设计理念同样,是一个池子的概念,池子受一个最大使用内存的限制,须要就从池子申请一个ByteBuffer,不够用了就阻塞等待,用好了再还给池子并清空。api
private final long totalMemory; //buffer.memory buffer容量 private final int poolableSize; //batch.size 批量大小 private final ReentrantLock lock; //防止多线程申请空间出现并发问题的重入锁 private final Deque<ByteBuffer> free; //空闲buffer队列 private final Deque<Condition> waiters; //等待分配buffer的队列 /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */ private long nonPooledAvailableMemory;//没分配到空闲队列的可用容量,totalMemory = poolableSize * free
既然是个缓冲池,那么BufferPool主要提供两个api:借(allocate)和还(deallocate),这两个api流程比较简单。多线程
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { //申请的容量比pool总容量还大,抛异常 if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); ByteBuffer buffer = null; this.lock.lock(); try { // 有分配好的直接返回 if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // 看看剩余空闲容量多少 int freeListSize = freeSize() * this.poolableSize; if (this.nonPooledAvailableMemory + freeListSize >= size) { // 还够的话释放掉空闲的 freeUp(size); // 没分配的容量先预扣掉 this.nonPooledAvailableMemory -= size; } else { // 剩余也不够 // accumulated 用来标记如今已经准备好了多少,若是异常了已经准备多少了得在finally里加回去,要不就泄露了 int accumulated = 0; // 设置一个条件变量,用来线程间同步用 Condition moreMemory = this.lock.newCondition(); try { //一直不够也不会一直等,设置一个最大等待时间 long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); // loop over and over until we have a buffer or have reserved // 循环等待 while (accumulated < size) { long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try { //不够就先释放锁,而后阻塞 waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } finally { long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); this.waitTime.record(timeNs, time.milliseconds()); } if (waitingTimeElapsed) { throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } remainingTimeToBlockNs -= timeNs; // check if we can satisfy this request from the free list, // otherwise allocate memory if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { // 若是有线程正好释放掉了,那就直接从free弹出来一个 buffer = this.free.pollFirst(); // accumulated也得标记一下拿到了 accumulated = size; } else { // free不够,可是总剩余够了,先释放掉free里的 freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory); // 剩余的先扣除掉 this.nonPooledAvailableMemory -= got; // 标记一下,拿到了须要的容量 accumulated += got; } } // Don't reclaim memory on throwable since nothing was thrown //重置为0 accumulated = 0; } finally { // When this loop was not able to successfully terminate don't loose available memory //异常状况每分出去,准备好的要加回去,若是每一场的话accumulated是0 this.nonPooledAvailableMemory += accumulated; //把放在队列的条件变量弹出去,该下一个条件变量准备分配了 this.waiters.remove(moreMemory); } } } finally { // signal any additional waiters if there is more memory left // over for them try { //要是如今剩余容量没了那也别费劲signal下一个了,继续阻塞着吧 //这样会不会致使其余阻塞线程一直等下去呢?不会,由于归还的时候也会signal的 if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise find bugs complains lock.unlock(); } } if (buffer == null) return safeAllocateByteBuffer(size); else return buffer; }
还实在是没啥可解析的,简单点。并发
public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { this.nonPooledAvailableMemory += size; } Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { lock.unlock(); } }
卧槽有点累了,这节分个上下吧,CopyOnWriteMap下节讲。app