Welcome to follow github.com/hsfxuebao/j… ; I hope it is helpful to you. If you find it useful, please give it a Star.
Today we wrap up the last part of the Kafka Producer. Most of the Producer internals were already covered in the previous articles, so this is a short conclusion rather than a summary of what came before. It covers three topics: the implementation of the `RecordAccumulator` class, how the Kafka Producer guarantees ordering, and the Producer's configuration options. Every Producer thread owns a `RecordAccumulator` instance, which buffers the RecordBatches to be sent, tracks their send state, and handles them accordingly; we will look in detail at how the Producer guarantees ordering within a single partition. Finally, we briefly go over the Producer's configuration parameters: only by understanding them correctly can you use the Producer well and get the most out of it.
Let's first revisit the data structure of the RecordAccumulator: each topic-partition has a corresponding deque, and the deque stores RecordBatches, the basic unit of sending. A RecordBatch for a topic-partition becomes sendable once it reaches the size or time threshold (note, however, that meeting one of these two conditions is not the *only* way a batch gets sent; this point is important to understand).
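As a rough sketch of that shape (the deque-per-partition layout and the find-or-create logic mirror the 0.10.x `RecordAccumulator`; the stub `RecordBatch` type and the `ConcurrentHashMap` here are simplifications of the real code, which uses Kafka's own types):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.kafka.common.TopicPartition;

// A minimal sketch of the buffer's shape; the real RecordAccumulator uses
// Kafka's CopyOnWriteMap and its own RecordBatch class.
class AccumulatorShape {
    static class RecordBatch { } // stub standing in for the real RecordBatch

    // one deque per topic-partition; append() writes at the tail,
    // ready()/drain() read from the head
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches =
            new ConcurrentHashMap<>();

    // mirrors RecordAccumulator#getOrCreateDeque: find the partition's deque,
    // creating it on first use
    Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
        Deque<RecordBatch> d = batches.get(tp);
        if (d != null)
            return d;
        d = new ArrayDeque<>();
        Deque<RecordBatch> previous = batches.putIfAbsent(tp, d);
        return previous == null ? d : previous;
    }
}
```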
Next, the main methods of the `RecordAccumulator` class, summarized in the figure below.

(Figure: RecordAccumulator's main methods and their descriptions)
This figure covers essentially all of RecordAccumulator's main methods. Below we pick a few of them and walk through them in detail, centered on one theme: how the Kafka Producer achieves ordering within a single partition.
First, look at `mutePartition()` and `unmutePartition()`. These two methods are one of the keys to the ordering guarantee: all they do is add the specified topic-partition to the `muted` set or remove it from that set. Their effect will become clear below.
```java
private final Set<TopicPartition> muted;

public void mutePartition(TopicPartition tp) {
    muted.add(tp);
}

public void unmutePartition(TopicPartition tp) {
    muted.remove(tp);
}
```
Before going through the other methods, it helps to know when these two are called; that will make the rest much easier to follow:

- `mutePartition()`: if ordering must be guaranteed, a topic-partition is added to the `muted` set as soon as one of its RecordBatches is about to be sent;
- `unmutePartition()`: once the topic-partition's in-flight RecordBatch completes, the topic-partition is removed from the `muted` set.

In other words, `muted` records whether a topic-partition still has an outstanding RecordBatch; a condensed sketch of the unmute call site follows below.
`ready()` is called from the Sender thread. Its job is to pick out the nodes that can be sent to: if a topic-partition has a batch that is sendable (it has met the time or size requirement), the leader of that partition is added to the ready set.
```java
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();

        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            if (leader == null && !deque.isEmpty()) {
                // This is a partition for which leader is not known, but messages are available to send.
                // Note that entries are currently not removed from batches when deque is empty.
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) { // note: a muted partition is skipped here
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    // note: still within the retry backoff window?
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    boolean full = deque.size() > 1 || batch.isFull(); // note: the batch is full
                    boolean expired = waitedTimeMs >= timeToWaitMs;   // note: the batch has waited long enough
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader); // note: add the sendable leader to the set
                    } else {
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
```
Note the condition `!readyNodes.contains(leader) && !muted.contains(part)`: if the `muted` set contains a topic-partition, its deque is skipped during the traversal. In other words, once a topic-partition has been added to `muted`, its leader will not be selected even if one of its RecordBatches is ready to send.
`drain()` iterates over the nodes that have sendable data, and for each node iterates over all of its topic-partitions; if a topic-partition's deque has data, batches are pulled from it until the request reaches its maximum size (`max.request.size`). This means that even a RecordBatch that has not yet met the size or time requirement may be pulled out early and sent, so that each request carries as much data as possible and sending stays efficient.
```java
// note: return, for each node, the RecordBatches that can be sent, removing them
// from their queues (at most maxSize bytes per node; anything beyond that is
// sent next time)
public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
                                             Set<Node> nodes,
                                             int maxSize,
                                             long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        int size = 0;
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<RecordBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // Only proceed if the partition has no in-flight batches.
            if (!muted.contains(tp)) { // note: a muted partition is skipped here as well
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // Only drain the batch if it is not during backoff period.
                            if (!backoff) {
                                if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // there is a rare case that a single batch size is larger than the request size due
                                    // to compression; in this case we will still eventually send this batch in a single
                                    // request
                                    break;
                                } else {
                                    RecordBatch batch = deque.pollFirst();
                                    batch.close();
                                    size += batch.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}
```
When iterating over a node's topic-partitions, there is again a guard: `!muted.contains(tp)`. A topic-partition in the `muted` set is skipped, so none of its batches become part of the request. This guarantees that while a topic-partition still has an unfinished RecordBatch, the other RecordBatches in its deque will not be sent even if they meet the sending conditions; that is, at any moment at most one RecordBatch per topic-partition is in flight.
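Putting `ready()`, `drain()`, and the mute/unmute calls together, one iteration of the Sender loop roughly looks like this (a condensed sketch of `Sender#run` in 0.10.x; metadata refresh, request construction, and error handling are omitted, and the helper name `sendProduceRequests` is illustrative):

```java
// One iteration of the Sender loop, condensed:
void runOnce(long now) {
    // 1. ask the accumulator which nodes have sendable data
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // 2. pull at most max.request.size bytes of batches per ready node
    Map<Integer, List<RecordBatch>> batches =
            this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

    // 3. if ordering is required, mute every partition that was just drained,
    //    so ready()/drain() skip it until its batch completes
    if (guaranteeMessageOrder) {
        for (List<RecordBatch> batchList : batches.values())
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
    }

    // 4. send one produce request per node; when the response (or a final
    //    failure) comes back, completeBatch() unmutes the partition
    sendProduceRequests(batches, now);
}
```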
Whether ordering is guaranteed is, in the end, decided in the Sender thread; `mutePartition()` and `unmutePartition()` are both called from the Sender. Let's look at how KafkaProducer initializes its Sender object.
```java
// from KafkaProducer
this.sender = new Sender(client,
        this.metadata,
        this.accumulator,
        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
        config.getInt(ProducerConfig.RETRIES_CONFIG),
        this.metrics,
        Time.SYSTEM,
        this.requestTimeoutMs); // NOTE: the Sender instance, the background thread that sends requests

// from Sender
public Sender(KafkaClient client,
              Metadata metadata,
              RecordAccumulator accumulator,
              boolean guaranteeMessageOrder,
              int maxRequestSize,
              short acks,
              int retries,
              Metrics metrics,
              Time time,
              int requestTimeout) {
    this.client = client;
    this.accumulator = accumulator;
    this.metadata = metadata;
    this.guaranteeMessageOrder = guaranteeMessageOrder;
    this.maxRequestSize = maxRequestSize;
    this.running = true; // note: true by default
    this.acks = acks;
    this.retries = retries;
    this.time = time;
    this.sensors = new SenderMetrics(metrics);
    this.requestTimeout = requestTimeout;
}
```
The above can be read as:

`this.guaranteeMessageOrder = (config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1)`

That is, if the KafkaProducer's `max.in.flight.requests.per.connection` is set to 1, ordering is guaranteed; otherwise it is not, as the following snippet also shows.
```java
// from Sender
// note: this branch is taken only when max.in.flight.requests.per.connection is 1
if (guaranteeMessageOrder) {
    // Mute all the partitions drained
    for (List<RecordBatch> batchList : batches.values()) {
        for (RecordBatch batch : batchList)
            this.accumulator.mutePartition(batch.topicPartition);
    }
}
```
In other words, to guarantee ordering within a single partition, the Producer must be configured with `max.in.flight.requests.per.connection=1`, and the mechanism that enforces it is implemented in the RecordAccumulator.
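For reference, a minimal producer configuration with this guarantee enabled might look like the following (the broker address and serializers are placeholders for illustration):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class OrderedProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // At most one in-flight request per connection: with retries enabled,
        // this is what prevents a retried batch from being reordered behind
        // a newer batch of the same partition.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // ... send records; per-partition order is now preserved ...
        producer.close();
    }
}
```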
The following is a rundown of Kafka Producer configuration. It is based on the official Producer Configs documentation plus some of my own understanding. In line with the official docs, the parameters are grouped by importance into three levels (security- and authorization-related parameters are not covered here).
Parameters described as medium importance can in practice affect Producer throughput just as much as the high-importance ones; in real deployments they deserve the same attention.
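As an illustration, here is a hedged example of tuning two of those medium-importance parameters, `batch.size` and `linger.ms`, which together control the size/latency trade-off of batching (the values are examples, not recommendations):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class ThroughputTuning {
    public static Properties tunedProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // batch.size: upper bound (in bytes) of one RecordBatch per partition.
        // Larger batches amortize per-request overhead but use more memory.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "32768"); // 32 KB, example value

        // linger.ms: how long ready() lets a non-full batch wait before it
        // counts as "expired" and becomes sendable; trades latency for batching.
        props.put(ProducerConfig.LINGER_MS_CONFIG, "10"); // example value

        return props;
    }
}
```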
This wraps up the source-code analysis of the Kafka Producer; starting next week, the series moves on to the Kafka Consumer. For each scenario, configure the relevant Producer parameters appropriately.
Reposted from: the Kafka source code analysis series