Kafka添加了事务机制之后,consumer端有个须要解决的问题就是怎么样从收到的消息中滤掉aborted的消息。Kafka经过broker和consumer端的协做,利用一系列优化手段极大地下降了这部分工做的开销。服务器
首先来看一下这部分工做的难点在哪。app
对于isolation.level为read_committed的消费者来讲,它只想获取committed的消息。可是在服务器端的存储中,committed的消息、aborted的消息、以及正在进行中的事务的消息在Log里是紧挨在一块儿的,并且这些状态的消息可能源于不一样的producerId。因此,若是broker对FetchRequest的处理和加入事务机制前同样,那么consumer就须要作不少地清理工做,并且须要buffer消息直到control marker的到来。那么,就无端浪费了不少流量,并且consumer端的内存管理也很成问题。ide
Kafka大致采用了三个措施一块儿来解决这个问题。性能
Kafka添加了一个很重要概念,叫作LSO,即last stable offset。对于同一个TopicPartition,其offset小于LSO的全部transactional message的状态都已肯定,要不就是committed,要不就是aborted。而broker对于read_committed的consumer,只提供offset小于LSO的消息。这样就避免了consumer收到状态不肯定的消息,而不得不buffer这些消息。fetch
对于每一个LogSegment(对应于一个log文件),broker都维护一个aborted transaction index. 这是一个append only的文件,每当有事务被abort时,就会有一个entry被append进去。这个entry的格式是:优化
TransactionEntry => Version => int16 PID => int64 FirstOffset => int64 LastOffset => int64 LastStableOffset => int64
这涉及到FetchResponse的消息格式的变化,在FetchResponse里包含了其中每一个TopicPartition的记录里的aborted transactions的信息,consumer使用这些信息,能够更高效地从FetchResponse里包含的消息里过滤掉被abort的消息。spa
// FetchResponse v4 FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset LastStableOffset AbortedTransactions MessageSetSize MessageSet]] ThrottleTime => int32 TopicName => string Partition => int32 ErrorCode => int16 HighwaterMarkOffset => int64
LastStableOffset => int64
AbortedTransactions => [PID FirstOffset] PID => int64 FirstOffset => int64 MessageSetSize => int32
(如下对只针对read_committed的consumer)debug
consumer端会根据fetch response里提供的aborted transactions里过滤掉aborted的消息,只返回给用户committed的消息。code
其核心逻辑是这样的:orm
首先,因为broker只返回LSO以前的消息给consumer,因此consumer拉取的消息只有两种可能的状态:committed和aborted。
活跃的aborted transaction的pid集合
而后, 对于每一个在被fetch的消息里包含的TopicPartition, consumer维护一个producerId的集合,这个集合就是当前活跃的aborted transaction所使用的pid。一个aborted transaction是“活跃的”,是说:在过滤过程当中,当前的待处理的消息的offset处于这个这个aborted transaction的initial offset和last offset之间。有了这个活跃的aborted transaction对应的PID的集合(如下简称"pid集合"),在过滤消息时,只要看一下这个消息的PID是否在此集合中,若是是,那么消息就确定是aborted的,若是不是,那就是committed的。
这个pid集合在过滤的过程当中,是不断变化的,为了维护这个集合,consumer端还会对于每一个在被fetch的消息里包含的TopicPartition 维护一个aborted transaction构成的mini heap, 这个heap是以aborted transaction的intial offset排序的。
public static final class AbortedTransaction { public final long producerId; public final long firstOffset; ... } private class PartitionRecords { private final TopicPartition partition; private final CompletedFetch completedFetch; private final Iterator<? extends RecordBatch> batches; private final Set<Long> abortedProducerIds; private final PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions; ... }
//这个heap的初始化过程,能够看出是按offset排序的
private PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions(FetchResponse.PartitionData partition) {
if (partition.abortedTransactions == null || partition.abortedTransactions.isEmpty())
return null;
PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions = new PriorityQueue<>(
partition.abortedTransactions.size(),
new Comparator<FetchResponse.AbortedTransaction>() {
@Override
public int compare(FetchResponse.AbortedTransaction o1, FetchResponse.AbortedTransaction o2) {
return Long.compare(o1.firstOffset, o2.firstOffset);
}
}
);
abortedTransactions.addAll(partition.abortedTransactions);
return abortedTransactions;
}
按照Kafka文档里的说法:
- If the message is a transaction control message, and the status is ABORT, then remove the corresponding PID from the set of PIDs with active aborted transactions. If the status is COMMIT, ignore the message.
If the message is a normal message, compare the offset and PID with the head of the aborted transaction minheap. If the PID matches and the offset is greater than or equal to the corresponding initial offset from the aborted transaction entry, remove the head from the minheap and insert the PID into the set of PIDs with aborted transactions.
Check whether the PID is contained in the aborted transaction set. If so, discard the record set; otherwise, add it to the records to be returned to the user.
可是实际上考虑到batch的问题,状况会比这简单一些。在producer端发送的时候,同一个TopicPartition的不一样transaction的消息是不可能在同一个message batch里的, 并且committed的消息和aborted的消息也不可能在同一batch里。由于在不一样transaction的消息之间,确定会有transaction marker, 而transaction marker是单独的一个batch。这就使得,一个batch要不所有被aborted了,要不所有被committed了。因此过滤aborted transaction时就能够一次过滤一个batch,而非一条消息。
相关代码为PartitionRecords#nextFetchedRecord()中:
if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) { // remove from the aborted transaction queue all aborted transactions which have begun // before the current batch's last offset and add the associated producerIds to the // aborted producer set
//从aborted transaction里移除那些其inital offset在当前的batch的末尾以前的那些。
//由于这些transaction开始于当前batch以前,而在处理这个batch以前没有结束,因此它要不是活跃的aborted transaction,要不当前的batch就是control batch
//这里须要考虑到aborted transaction可能开始于此次fetch到的全部records以前
consumeAbortedTransactionsUpTo(currentBatch.lastOffset()); long producerId = currentBatch.producerId(); if (containsAbortMarker(currentBatch)) { abortedProducerIds.remove(producerId); //若是当前batch是abort marker, 那么它对应的transaction就结束了,因此从pid集合里移除它对应的pid。 } else if (isBatchAborted(currentBatch)) { //若是当前batch被abort了,那就跳过它 log.debug("Skipping aborted record batch from partition {} with producerId {} and " + "offsets {} to {}", partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset()); nextFetchOffset = currentBatch.nextOffset(); continue; } }
经过对aborted transaction index和LSO的使用,Kafka使得consumer端能够高效地过滤掉aborted transaction里的消息,从而减少了事务机制的性能开销。