其中消息的追加包含如下几个组件。咱们在KafkaProducer中调用send方法发送一个消息,在消息追加步骤,最终是将消息添加到了ByteBuffer中。java
##1、KafkaProducer #####1.一、拦截器的实现 咱们发如今send的时候,若是存在拦截器,则调用onSend方法。apache
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
onSend方法的实现很是简单,实际上这就是将注册到该Producer的拦截器进行轮询,并进行调用,从源码中咱们也能够知道,这个拦截器是有顺序要求的,解析配置文件时是依半角逗号来隔开的。安全
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) { ProducerRecord<K, V> interceptRecord = record; for (ProducerInterceptor<K, V> interceptor : this.interceptors) { try { interceptRecord = interceptor.onSend(interceptRecord); } catch (Exception e) { // do not propagate interceptor exception, log and continue calling other interceptors // be careful not to throw exception from here if (record != null) log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e); else log.warn("Error executing interceptor onSend callback", e); } } return interceptRecord; } // 解析配置时: case LIST: if (value instanceof List) return (List<?>) value; else if (value instanceof String) if (trimmed.isEmpty()) return Collections.emptyList(); else return Arrays.asList(trimmed.split("\\s*,\\s*", -1)); else throw new ConfigException(name, value, "Expected a comma separated list.");
因此咱们的配置文件能够这么写,来注册拦截器,注意,拦截器必须继承ProducerInterceptor。咱们在InterceptorPlus 中,把咱们即将发送的value强转为了int,而后为其++;app
Properties props = new Properties(); String classNames = InterceptorPlus.class.getName() + "," + InterceptorMultiply.class.getName(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.CLIENT_ID_CONFIG, "ProducerTest"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, classNames); // 拦截器: /** * Created by Anur IjuoKaruKas on 2018/9/5 */ public class InterceptorPlus implements ProducerInterceptor { @Override public ProducerRecord onSend(ProducerRecord record) { Integer val = Integer.valueOf(record.value() .toString()); String result = String.valueOf(val + 1); return new ProducerRecord(record.topic(), record.key(), result); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
#####1.二、消息分区的实现 首先,先从元数据拿到集群的信息,集群信息中的partitions是以 Map<\String, List<\PartitionInfo>>来存储的。这里根据咱们指定的topic名字来获取partitions。这里经过两种方式来肯定消息发往哪一个分区:ide
/** * Compute the partition for the given record. * 为消息计算分区 * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 先从集群中取出全部分区 int numPartitions = partitions.size(); if (keyBytes == null) { // 没有key的状况走这个分支 int nextValue = counter.getAndIncrement(); // 计数服务+1 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);// 取出可用的分区 if (availablePartitions.size() > 0) { int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part) .partition(); } else { // no partitions are available, give a non-available partition // 没有可用的分区,只能返回一个不可用的分区 return DefaultPartitioner.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
##2、RecordAccumulator #####2.一、双重检查锁与分段锁 ######2.1.一、分段锁 这是存在RecordAccumulator中的一段逻辑,咱们在追加消息时须要根据Topic来获取到Deque<\RecordBatch>,这后面的一系列操做,都是非线程安全的,因此在操做dq对象时,这里采用了分段锁的概念。咱们每个Topic都维护了一个Deque,它们的append操做并不互相影响,因此没必要为整个<\String /* topic */ , Deque<\RecordBatch>>加锁,只需对某个topic下的dq加锁便可。oop
######2.1.二、双重检查锁 双重检查锁,适用于先检查,后执行。咱们发现上下两段临界区的代码有一部分很像,实际上就是使用了双重检查锁,好比像下面这个简单的单例模式的建立。this
public static Singleton getInstanceDC() { if (_instance == null) { // Single Checked synchronized (Singleton.class) { if (_instance == null) { // Double checked _instance = new Singleton(); } } } return _instance; }
// check if we have an in-progress batch Deque<RecordBatch> dq = getOrCreateDeque(tp); synchronized (dq) { if (closed) { throw new IllegalStateException("Cannot send after the producer is closed."); } RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); if (appendResult != null) { return appendResult; } } int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) { throw new IllegalStateException("Cannot send after the producer is closed."); } RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); if (appendResult != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer); return appendResult; } MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); }
##3、RecordBatch RecordBatch(一个批发送对象,能够存放不少条消息,它有必定的存储空间上限,当新的消息放不下时,Kafka就会在RecordAccumulator中申请空间 ByteBuffer buffer = free.allocate(size, maxTimeToBlock) )spa
咱们在调用KafkaProducer的send方法时,能够指定一个回调。这里不过多讲述RecordBatch。在消息追加时这个回调就会与RecordBatch进行绑定。.net
这个回调就是在RecordBatch层面被调用,会在正常响应、超时、或关闭生产者时调用这个。好比咱们能够在非正常响应时将消息保存在本地或者将异常日志打印出来,以便恢复之类的。线程
producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr));
每一条消息追加进来,都会生成一个新的Thunk对象,Kafka中应用了不少这种设计,例如 【仿照Kafka,实现一个简单的监听器 + 适配器吧!】。原理都是在对象中维护一个相似List<方法>之类的列表,而后在适当的时候(异常、正常、或者执行到某个步骤时)循环取出列表,并调用里面的方法。
// FutureRecordMetadata主要包含了 RecordBatch里的 ProduceRequestResult FutureRecordMetadata future = new FutureRecordMetadata( this.produceFuture, this.recordCount, timestamp, checksum, key == null ? -1 : key.length, value == null ? -1 : value.length); if (callback != null) { thunks.add(new Thunk(callback, future)); }
##4、MemoryRecord、Compressor 这两个对象在消息追加中算是最底层的角色了。咱们在前面申请到的空间ByteBuffer对象,就是由这二者进行维护。Compressor负责消息的写入,它生成了一个bufferStream用于向ByteBuffer写入数据。(还有一个appendStream负责压缩数据,这块还没看太明白)
建立 ByteBufferOutputStream bufferStream 十分简单,实际上就是将ByteBuffer的引用交给 bufferStream。
public class ByteBufferOutputStream extends OutputStream { private ByteBuffer buffer; public ByteBufferOutputStream(ByteBuffer buffer) { this.buffer = buffer; }
ByteBufferOutputStream 中巧妙的即是它的ByteBuffer自动扩容方法。咱们来看看最基础的一个写入
public void write(byte[] bytes, int off, int len) { if (buffer.remaining() < len) expandBuffer(buffer.capacity() + len); buffer.put(bytes, off, len); }
若是要写入的内容过大,会进行一次ByteBuffer的扩容,好比说个人RecordBatch默认都为10M,如今已经9.9M了,最后一条消息进来,若是正好大了一点,那么就依靠这个扩容方法,临时扩充一下ByteBuffer的大小。
由于前面在判断可否继续追加消息的时候,只是对消息大小进行了预估,尤为是指定了压缩方式后,这个预估可能会没那么准确,这个时候,就须要ByteBuffer的扩容机制来进行兜底。
private void expandBuffer(int size) { int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size); ByteBuffer temp = ByteBuffer.allocate(expandSize); temp.put(buffer.array(), buffer.arrayOffset(), buffer.position()); buffer = temp; }
##5、Kafka的ByteBuffer内存管理 第二步中说道的RecordAccumulator中,有一个内存申请的操做,实际上就是Kafka BufferPool来完成的。
先看看BufferPool的构造方法,咱们在建立RecordAccumulator时(KafkaProducer),会传入一个参数,叫作totalMemorySize。这个totalMemorySize,是在KafkaProducer的构造方法里面获取的。
咱们能够在配置中:
指定ProducerConfig.BUFFER_MEMORY_CONFIG来配置BufferPool的大小, 指定ProducerConfig.BATCH_SIZE_CONFIG 来配置每个ByteBuffer的大小。
从构造方法和上面的分析咱们能够知道,内存的消耗随着Producer的增多而增多,ByteBuffer默认大小若是不合理,将可能致使RecordAccumulator常常性的阻塞。
-------------------- KafkaProducer 构造方法: this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); // accumulator相关配置的建立与更新 this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.totalMemorySize, this.compressionType, config.getLong(ProducerConfig.LINGER_MS_CONFIG), retryBackoffMs, metrics, time); -------------------- RecordAccumulator 构造方法: this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName); -------------------- BufferPool 构造方法: public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) { this.poolableSize = poolableSize; this.lock = new ReentrantLock(); this.free = new ArrayDeque<ByteBuffer>(); this.waiters = new ArrayDeque<Condition>(); this.totalMemory = memory; this.availableMemory = memory; this.metrics = metrics; this.time = time; this.waitTime = this.metrics.sensor("bufferpool-wait-time"); MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation."); this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); }
咱们来看看ByteBuffer和核心方法,allocate。进来的第一个判断,size > this.totalMemory,若是要申请的大小直接大于ByteBuffer可持全部大小,直接抛出异常。
/** 代码1.1 */ /** * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool * is configured with blocking mode. * * @param size The buffer size to allocate in bytes * @param maxTimeToBlockMs The maximum time in milliseconds to block for buffer memory to be available * * @return The buffer * @throws InterruptedException If the thread is interrupted while blocked * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block * forever) */ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { if (size > this.totalMemory) { throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); } //...... }
这上面的分支,实际上都是【有足够内存可分配的状况】
一、若是申请的内存为 poolableSize(最小分配大小) 且 private final Deque<ByteBuffer> free 列表(重复利用ByteBuffer)不为空
则直接返回重复利用的ByteBuffer
二、若是申请的大小不是poolableSize,但如今能够当即知足,则freeUp一下。
/** 代码1.2 紧接着1.1 */ this.lock.lock(); try { // check if we have a free buffer of the right size pooled // 校验是否有合适的小的空闲的buffer if (size == poolableSize && !this.free.isEmpty()) { return this.free.pollFirst(); } // now check if the request is immediately satisfiable with the // memory on hand or if we need to block // 校验如今的内存是否能够当即知足请求,或者是否须要阻塞 int freeListSize = this.free.size() * this.poolableSize; // 若是可用内存+freeList大小大于申请大小 if (this.availableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request // 有足够的被释放或存放在池中的内存来当即知足请求 freeUp(size); this.availableMemory -= size; lock.unlock(); return ByteBuffer.allocate(size); } else {// 如今可用的内存大小没法知足 //.....
freeUp的逻辑十分简单,实际上就是当申请的内存大于可用内存availableMemory时,从free这个ByteBuffer列表循环 将free:List<ByteBuffer>中的元素poll出来,而且将其容量赋值给availableMemory。
/** * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled * buffers (if needed) */ private void freeUp(int size) { // 当free列表不为空,而且可用内存小于申请内存时 // :: 仍是从free中取,这个取法,能够取多个 while (!this.free.isEmpty() && this.availableMemory < size) { this.availableMemory += this.free.pollLast() .capacity(); } }
除了以上两种状况(一、够用,并且有现成的ByteBuffer可供调用。二、够用,但没有现成的ByteBuffer可供调用,须要申请),只剩下第三种状况了,即不够用。这种状况可想而知,只能等待更多的,已经使用的ByteBuffer释放内存。
它能够复用freeUp嘛?固然能够。但freeUp是不够的,由于咱们知道freeUp是释放掉free:List<ByteBuffer>中的内存。而这第三种状况,即便把空余的ByteBuffer都释放掉,加上如今的可用内存availableMemory,也不够。
怎么解决?最简单的固然是写个循环,等待其余的线程把正在使用的ByteBuffer释放掉,内存就够用了。Kafka就是这么作的。
咱们来看看kafka是如何实现的:
声明一个int accumulated,这个是如今已声明的内存大小。new一个Condition,进行condition内的阻塞(await),并释放锁,且每当已声明大小还不足以达到要求,就在condition上阻塞。
假如这是咱们waiter里面的最后一个(也是第一个线程),它难道将永远阻塞?
并不,实际上咱们在释放内存时,会peek队列中第一个等待的线程进行signal,并且会将poolable的ByteBuffer塞进List<ByteBuffer>中,来实现ByteBuffer的重复利用,要知道,这里是没有释放掉ByteBuffer的,只是将ByteBuffer clear掉了。
/** 代码1.3 紧接着1.2 */ } else {// 如今可用的内存大小没法知足 // we are out of memory and will have to block int accumulated = 0; ByteBuffer buffer = null; Condition moreMemory = this.lock.newCondition(); long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); // loop over and over until we have a buffer or have reserved // enough memory to allocate one while (accumulated < size) { long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try { // 当前condition进行等待 remainingTimeToBlockNs 毫秒 waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { this.waiters.remove(moreMemory); throw e; } finally { long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); // 统计阻塞时间 this.waitTime.record(timeNs, time.milliseconds()); } //.....
// 通知waiter队列的队头 public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { this.availableMemory += size; } Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) { moreMem.signal(); } } finally { lock.unlock(); } }
在被通知以后,线程又会作什么呢?
一、首先若是超时了,直接报出异常。
二、再次判断申请的空间是否是 poolableSize,若是是,则直接从free中取一个(通常也只有一个,由于deallocate是加锁了的,释放完内存就会通知waiter),固然也可能free中为空(由于释放的ByteBuffer不是Poolable大小,它会被释放到availableMemory中,进入下一步)
三、走到了这个else分支里,首先会尝试从free中释放所需内存,而后看看剩余的availableMemory是否比剩余所要申请的大。若是是,申请的空间就算已经所有知足了。 int got = (int) Math.min(size - accumulated, this.availableMemory)。但若是没申请完,就会继续进入while循环,回到await状态,继续等有人通知。
四、若是已经申请完了,出队。此时若是还有空间,会通知waiter的第一个去拿内存,并返回申请的内存。
/** 代码1.4 紧接着1.3 */ if (waitingTimeElapsed) {// 第一步 this.waiters.remove(moreMemory); throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } // 没超时,剩余时间等于本身减去耗时 remainingTimeToBlockNs -= timeNs; // check if we can satisfy this request from the free list, // otherwise allocate memory // 检查free list是否能够知足请求,不知足则申请内存 if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {// 第二步 // just grab a buffer from the free list // 和前面是同样的,从free list中取一个出来 buffer = this.free.pollFirst(); accumulated = size; } else {// 第三步 //todo: 走到这里说明申请的大小要大于poolableSize,或者free为空 // we'll need to allocate memory, but we may only get // part of what we need on this iteration // 须要申请内存,可是在这个循环可能只能从中获取须要内存的一部分,也就是说太大了,会再获取一次 // size:要申请的大小。 freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.availableMemory); this.availableMemory -= got; accumulated += got; } } // remove the condition for this thread to let the next thread // in line start getting memory Condition removed = this.waiters.removeFirst(); if (removed != moreMemory) { throw new IllegalStateException("Wrong condition: this shouldn't happen."); } // signal any additional waiters if there is more memory left // over for them // 第四步、通知其余waiters去拿内存 if (this.availableMemory > 0 || !this.free.isEmpty()) { if (!this.waiters.isEmpty()) { this.waiters.peekFirst() .signal(); } } // unlock and return the buffer lock.unlock(); if (buffer == null) {// buffer = null 表明内存时直接从free中轮询释放的 return ByteBuffer.allocate(size); } else {// buffer不为空,是直接复用free中的内存 return buffer; } }
经过上面的分析咱们知道,一旦有线程想要申请一块很大的内存的话,并且这个线程到了队头,它就成了大哥。全部释放的内存,都被分配给它,因此咱们要尽可能地去避免这种状况!
好比说咱们忽然发送一条须要占用很大内存的消息,那么对于kafka的效率来讲,将是毁灭性的!
另,若是发送的消息比较平均,且ByteBuffer的poolableSize分配合理,则能够极大地提高kafka的效率!
《Kafka技术内幕》 郑奇煌著 《Apache Kafka源码剖析》 徐郡明著