一,基础讲解java
本文是基于kafka 0.10讲的,kafkaProducer模型和0.8的客户端模型大体是同样的,区别是0.8版本的会为每一个Broker(有给定topic分区leader的Broker)建立一个SyncProducer,而0.10的Producer是用一个NioSelector实现实现了多连接的维护的。也是一个后台线程进行发送。基本步骤,也是按期获取元数据,将消息按照key进行分区后归类,每一类发送到正确的Broker上去。node
再写kafka文章的缘由是0.10版本后跟spark结合有了大的变更,后面会讲解多版本的sparkStreaming和StructuredStreaming 与kafka的各类结合。因此在这里会更新两篇kafka文章:一篇关于kafka 0.10版本的Producer,另外一篇固然是kafka 0.10版本的Consumer了。为后面的文章打下基础。apache
二,重要类讲解安全
Cluster微信
表明一个当前kafka集群的nodes,topics和partitions子集网络
Selectorapp
org.apache.kafka.common.network.Selector。一个nioSelector的接口,负责非阻塞多连接网络I/O操做。该类于NetworkSend和NetworkReceive协同工做,传输大小限制的网络请求和应答。一个新的连接能够被加入到该nioSelector,固然须要配上一个id,经过调用机器学习
connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)异步
内部维护了一个java NIOSelector函数
java.nio.channels.Selector nioSelector;
NetworkClient
一个针对异步请求/应答的网络IO 的网络客户端。这是一个内部类,用来实现用户层面的生产消费者客户端。非线程安全的。
Sender
一个后台线程,主要负责发送生产请求到kafka集群。该线程会更新kafka集群的metadata,将produce Request发送到正确的节点。
RecordAccumulator
该类扮演的是一个队列的角色,将records追加到MemoryRecords实例中,用于发送到server端。
RecordBatch
一批准备发送的消息。该类是线程不安全的,须要加入外部同步加入须要修改的话。
MemoryRecords
用一个byteBuffer支撑的Records的实现。
RecordAccumulator维护了一个ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
RecordBatch维护了一个MemoryRecords。
三,源码过程
1,构建必要对象的过程
用户代码里会构建一个KafkaProducer对象。
producer = new KafkaProducer<>(props);
在构造函数里活作三个重要的的事情
A),new Selector传递给NetworkClient
B),new NetworkClient
C),new Sender
D),new KafkaThread并将构建的send对象,当作该线程的runnable。并启动该线程。
E),构建了分区器和一个Metatdata。
F),构建了一个RecordAccumulator。此时须要关注的两个配置是
batch.size:批量发送的大小。
linger.ms:超时发送的时间。
合理配置两个值,有利于咱们提高kafkaProducer的性能。
2,消息加入发送队列的过程
1),用户程序里调用KafkaProducer.send发送消息
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
2),对消息按照partition策略进行分区。
//获取分区号
int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
3),将消息追加到RecordAccumulator。
//将消息追加到RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
具体追加的细节,先根据topic和partition信息获取一个recordBatch,而后在获取MemoryRecords,将消息加入其中
//根据topic和partition信息获取该partition的队列
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {//RecordBatch类非安全,须要加外部同步
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null)
return appendResult;
}
tryAppend内部
// 获取最后一个RecordBatch
RecordBatch last = deque.peekLast();
if (last != null) {
// 将消息追加到该RecordBatch里面
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
Last.TryAppend
// 首先会判断是否有充足的空间
if (!this.records.hasRoomFor(key, value)) {
return null;
} else {
// 将消息加入memoryRecords
long checksum = this.records.append(offsetCounter++, timestamp, key, value);
this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length);
if (callback != null)
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
3,消息发送的过程
1),获取Cluster
//获取当前cluster信息,
Cluster cluster = metadata.fetch();
2),获取那些有数据待发送的分区,依据是batch.size和linger.ms
//获取当前准备好发送的有数据的分区
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
3),更新那些leader未知的分区信息
if (result.unknownLeadersExist)
this.metadata.requestUpdate();
4),移除不能发送Request的node
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
//判断链接是否能发消息
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
5),转化为list格式,以node为基准,清空那些给定的node数据
清空全部给定node的数据,而后将它们放到给定适合大小的list,以单个node为基准
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
6),以node为基准,将batches转化为ProducerRequests
//以单个node为基准,将batches数据转化为ProducerRequests
List<ClientRequest> requests = createProduceRequests(batches, now);
7),发送数据
for (ClientRequest request : requests)
client.send(request, now);
8),作真正的网络读写的动做,以前会更新元数据
this.client.poll(pollTimeout, now);
四,总结
写本文的缘由是为StructuredStreaming的系列文章之kafkaSink作准备。
1,具体调优请参考kafka系列文章。
2,性能调优的参数重要的就两个
batch.size:批量发送的大小。
linger.ms:超时发送的时间。
3,具体跟0.8.2.2区别,请参考:
假如你对kafka,spark,hbase源码,spark机器学习等感兴趣,请关注浪尖公众号。一块儿探讨进步。
本文分享自微信公众号 - 浪尖聊大数据(bigdatatip)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。