源码分析Kafka之Producer

Kafka是一款很棒的消息系统,能够看看我以前写的 后端好书阅读与推荐来了解一下它的总体设计。今天咱们就来深刻了解一下它的实现细节(我fork了一份代码),首先关注Producer这一方。java

要使用kafka首先要实例化一个KafkaProducer,须要有brokerIP、序列化器必要Properties以及acks(0、一、n)、compression、retries、batch.size非必要Properties,经过这个简单的接口能够控制Producer大部分行为,实例化后就能够调用send方法发送消息了。node

核心实现是这个方法:算法

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
 // intercept the record, which can be potentially modified; this method does not throw exceptions
 ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);//①
 return doSend(interceptedRecord, callback);//②
}

经过不一样的模式能够实现发送即忘(忽略返回结果)、同步发送(获取返回的future对象,回调函数置为null)、异步发送(设置回调函数)三种消息模式。sql

咱们来看看消息类ProducerRecord有哪些属性:后端

private final String topic;//主题
private final Integer partition;//分区
private final Headers headers;//头
private final K key;//键
private final V value;//值
private final Long timestamp;//时间戳

它有多个构造函数,能够适应不一样的消息类型:好比有无分区、有无key等。网络

①中ProducerInterceptors(有0 ~ 无穷多个,造成一个拦截链)对ProducerRecord进行拦截处理(好比打上时间戳,进行审计与统计等操做)架构

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
 ProducerRecord<K, V> interceptRecord = record;
 for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
 try {
 interceptRecord = interceptor.onSend(interceptRecord);
 } catch (Exception e) {
 // 不抛出异常,继续执行下一个拦截器
 if (record != null)
 log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
 else
 log.warn("Error executing interceptor onSend callback", e);
 }
 }
 return interceptRecord;
}

若是用户有定义就进行处理并返回处理后的ProducerRecord,不然直接返回自己。并发

而后②中doSend真正发送消息,而且是异步的(源码太长只保留关键):app

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
 TopicPartition tp = null;
 try {
 // 序列化 key 和 value
 byte[] serializedKey;
 try {
 serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
 } catch (ClassCastException cce) {
 }
 byte[] serializedValue;
 try {
 serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
 } catch (ClassCastException cce) {
 }
 // 计算分区得到主题与分区
 int partition = partition(record, serializedKey, serializedValue, cluster);
 tp = new TopicPartition(record.topic(), partition);
 // 回调与事务处理省略。
 Header[] headers = record.headers().toArray();
 // 消息追加到RecordAccumulator中
 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
 serializedValue, headers, interceptCallback, remainingWaitMs);
 // 该批次满了或者建立了新的批次就要唤醒IO线程发送该批次了,也就是sender的wakeup方法
 if (result.batchIsFull || result.newBatchCreated) {
 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
 this.sender.wakeup();
 }
 return result.future;
 } catch (Exception e) {
 // 拦截异常并抛出
 this.interceptors.onSendError(record, tp, e);
 throw e;
 }
}

下面是计算分区的方法:dom

private int partition(ProducerRecord<K, V> record, 
byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
 Integer partition = record.partition();
 // 消息有分区就直接使用,不然就使用分区器计算
 return partition != null ?
 partition :
 partitioner.partition(
 record.topic(), record.key(), serializedKey,
 record.value(), serializedValue, cluster);
}

默认的分区器DefaultPartitioner实现方式是若是partition存在就直接使用,不然根据key计算partition,若是key也不存在就使用round robin算法分配partition。

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {
 private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
 int numPartitions = partitions.size();
 if (keyBytes == null) {//key为空 
 int nextValue = nextValue(topic);
 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//可用的分区
 if (availablePartitions.size() > 0) {//有分区,取模就行
 int part = Utils.toPositive(nextValue) % availablePartitions.size();
 return availablePartitions.get(part).partition();
 } else {// 无分区,
 return Utils.toPositive(nextValue) % numPartitions;
 }
 } else {// key 不为空,计算key的hash并取模得到分区
 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
 }
 }
 private int nextValue(String topic) {
 AtomicInteger counter = topicCounterMap.get(topic);
 if (null == counter) {
 counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
 AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
 if (currentCounter != null) {
 counter = currentCounter;
 }
 }
 return counter.getAndIncrement();//返回并加一,在取模的配合下就是round robin
 }
}

以上就是发送消息的逻辑处理,接下来咱们再看看消息发送的物理处理。

Sender(是一个Runnable,被包含在一个IO线程ioThread中,该线程不断从RecordAccumulator队列中的读取消息并经过Selector将数据发送给Broker)的wakeup方法,其实是KafkaClient接口的wakeup方法,由NetworkClient类实现,采用了NIO,也就是java.nio.channels.Selector.wakeup()方法实现。

Sender的run中主要逻辑是不停执行准备消息和等待消息:

long pollTimeout = sendProducerData(now);//③
client.poll(pollTimeout, now);//④

③完成消息设置并保存到信道中,而后监听感兴趣的key,由KafkaChannel实现。

public void setSend(Send send) {
 if (this.send != null)
 throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
 this.send = send;
 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
// transportLayer的一种实现中的相关方法
public void addInterestOps(int ops) {
 key.interestOps(key.interestOps() | ops);
}

④主要是Selector的poll,其select被wakeup唤醒:

public void poll(long timeout) throws IOException {
 /* check ready keys */
 long startSelect = time.nanoseconds();
 int numReadyKeys = select(timeout);//wakeup使其中止阻塞
 long endSelect = time.nanoseconds();
 this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
 if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
 Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
 // Poll from channels that have buffered data (but nothing more from the underlying socket)
 if (dataInBuffers) {
 keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
 Set<SelectionKey> toPoll = keysWithBufferedRead;
 keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
 pollSelectionKeys(toPoll, false, endSelect);
 }
 // Poll from channels where the underlying socket has more data
 pollSelectionKeys(readyKeys, false, endSelect);
 // Clear all selected keys so that they are included in the ready count for the next select
 readyKeys.clear();
 pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
 immediatelyConnectedKeys.clear();
 } else {
 madeReadProgressLastPoll = true; //no work is also "progress"
 }
 long endIo = time.nanoseconds();
 this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
}

其中pollSelectionKeys方法会调用以下方法完成消息发送:

public Send write() throws IOException {
 Send result = null;
 if (send != null && send(send)) {
 result = send;
 send = null;
 }
 return result;
}
private boolean send(Send send) throws IOException {
 send.writeTo(transportLayer);
 if (send.completed())
 transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
 return send.completed();
}

Send是一次数据发包,通常由ByteBufferSend或者MultiRecordsSend实现,其writeTo调用transportLayer的write方法,通常由PlaintextTransportLayer或者SslTransportLayer实现,区分是否使用ssl

public long writeTo(GatheringByteChannel channel) throws IOException {
 long written = channel.write(buffers);
 if (written < 0)
 throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
 remaining -= written;
 pending = TransportLayers.hasPendingWrites(channel);
 return written;
}
public int write(ByteBuffer src) throws IOException {
 return socketChannel.write(src);
}

到此就把Producer业务相关逻辑处理非业务相关的网络 2方面的主要流程梳理清楚了。其余额外的功能是经过一些配置保证的。

好比顺序保证就是max.in.flight.requests.per.connection,InFlightRequests的doSend会进行判断(由NetworkClient的canSendRequest调用),只要该参数设为1便可保证当前包未确认就不能发送下一个包从而实现有序性

public boolean canSendMore(String node) {
 Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
 return queue == null || queue.isEmpty() ||
 (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}

再好比可靠性,经过设置acks,Sender中sendProduceRequest的clientRequest加入了回调函数:

RequestCompletionHandler callback = new RequestCompletionHandler() {
 public void onComplete(ClientResponse response) {
 handleProduceResponse(response, recordsByPartition, time.milliseconds());//调用completeBatch
 }
 };
 /**
 * 完成或者重试投递,这里若是acks不对就会重试
 *
 * @param batch The record batch
 * @param response The produce response
 * @param correlationId The correlation id for the request
 * @param now The current POSIX timestamp in milliseconds
 */
 private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
 long now, long throttleUntilTimeMs) {
 }
 public class ProduceResponse extends AbstractResponse {
 /**
 * Possible error code:
 * INVALID_REQUIRED_ACKS (21)
 */
 }

kafka源码一层一层包装不少,错综复杂,若有错误请你们不吝赐教。

 

欢迎工做一到五年的Java工程师朋友们加入Java架构开发: 855835163 群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用本身每一分每一秒的时间来学习提高本身,不要再用"没有时间“来掩饰本身思想上的懒惰!趁年轻,使劲拼,给将来的本身一个交代!

相关文章
相关标签/搜索