注意:配置基于Kafka 0.8.2.1html
broker配置java
#非负整数,用于惟一标识broker
broker.id 0算法
#kafka持久化数据存储的路径,能够指定多个,以逗号分隔
log.dirs /tmp/kafka-logssql
#broker接收链接请求的端口
port 9092apache
#指定zk链接字符串,[hostname:port]以逗号分隔
zookeeper.connectbootstrap
#单条消息最大大小控制,消费端的最大拉取大小须要略大于该值
message.max.bytes 1000000数组
#接收网络请求的线程数
num.network.threads 3缓存
#用于执行请求的I/O线程数
num.io.threads 8服务器
#用于各类后台处理任务(如文件删除)的线程数
background.threads 10网络
#待处理请求最大可缓冲的队列大小
queued.max.requests 500
#配置该机器的IP地址
host.name
#默认分区个数
num.partitions 1
#分段文件大小,超事后会轮转
log.segment.bytes 1024 * 1024 * 1024
#日志没达到大小,若是达到这个时间也会轮转
log.roll.{ms,hours} 168
#日志保留时间
log.retention.{ms,minutes,hours}
#不存在topic的时候是否自动建立
auto.create.topics.enable true
#partition默认的备份因子
default.replication.factor 1
#若是这个时间内follower没有发起fetch请求,被认为dead,从ISR移除
replica.lag.time.max.ms 10000
#若是follower相比leader落后这么多以上消息条数,会被从ISR移除
replica.lag.max.messages 4000
#从leader能够拉取的消息最大大小
replica.fetch.max.bytes 1024 * 1024
#从leader拉取消息的fetch线程数
num.replica.fetchers 1
#zk会话超时时间
zookeeper.session.timeout.ms 6000
#zk链接所用时间
zookeeper.connection.timeout.ms
#zk follower落后leader的时间
zookeeper.sync.time.ms 2000
#是否开启topic能够被删除的方式
delete.topic.enable false
producer配置
#参与消息确认的broker数量控制,0表明不须要任何确认 1表明须要leader replica确认 -1表明须要ISR中全部进行确认
request.required.acks 0
#从发送请求到收到ACK确认等待的最长时间(超时时间)
request.timeout.ms 10000
#设置消息发送模式,默认是同步方式, async异步模式下容许消息累计到必定量或一段时间又另外线程批量发送,吞吐量好但丢失数据风险增大
producer.type sync
#消息序列化类实现方式,默认是byte[]数组形式
serializer.class kafka.serializer.DefaultEncoder
#kafka消息分区策略实现方式,默认是对key进行hash
partitioner.class kafka.producer.DefaultPartitioner
#对发送的消息采起的压缩编码方式,有none|gzip|snappy
compression.codec none
#指定哪些topic的message须要压缩
compressed.topics null
#消息发送失败的状况下,重试发送的次数 存在消息发送是成功的,只是因为网络致使ACK没收到的重试,会出现消息被重复发送的状况
message.send.max.retries 3
#在开始从新发起metadata更新操做须要等待的时间
retry.backoff.ms 100
#metadata刷新间隔时间,若是负值则失败的时候才会刷新,若是0则每次发送后都刷新,正值则是一种周期行为
topic.metadata.refresh.interval.ms 600 * 1000
#异步发送模式下,缓存数据的最长时间,以后便会被发送到broker
queue.buffering.max.ms 5000
#producer端异步模式下最多缓存的消息条数
queue.buffering.max.messages 10000
#0表明队列没满的时候直接入队,满了当即扔弃,-1表明无条件阻塞且不丢弃
queue.enqueue.timeout.ms -1
#一次批量发送须要达到的消息条数,固然若是queue.buffering.max.ms达到的时候也会被发送
batch.num.messages 200
consumer配置
#指明当前消费进程所属的消费组,一个partition只能被同一个消费组的一个消费者消费
group.id
#针对一个partition的fetch request所能拉取的最大消息字节数,必须大于等于Kafka运行的最大消息
fetch.message.max.bytes 1024 * 1024
#是否自动周期性提交已经拉取到消费端的消息offset
auto.commit.enable true
#自动提交offset到zookeeper的时间间隔
auto.commit.interval.ms 60 * 1000
#消费均衡的重试次数
rebalance.max.retries 4
#消费均衡两次重试之间的时间间隔
rebalance.backoff.ms 2000
#当从新去获取partition的leader前须要等待的时间
refresh.leader.backoff.ms 200
#若是zookeeper上没有offset合理的初始值状况下获取第一条消息开始的策略smallest|largeset
auto.offset.reset largest
#若是其超时,将会可能触发rebalance并认为已经死去
zookeeper.session.timeout.ms 6000
#确认zookeeper链接创建操做客户端能等待的最长时间
zookeeper.connection.timeout.ms 6000
1.maven:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
2.kafka生产者代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
*
* @author FromX
*
*/
public class KProducer {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "slave1.com:6667,slave2.com:6667,slave3.com:6667");
//ack是判断请求是否为完整的条件(即判断是否成功发送)。all将会阻塞消息,这种设置性能最低,可是最可靠。
props.put("acks", "1");
//retries,若是请求失败,生产者会自动重试,咱们指定是0次,若是启用重试,则会有重复消息的可能性。
props.put("retries", 0);
//producer缓存每一个分区未发送消息,缓存的大小是经过batch.size()配置设定的。值较大的话将会产生更大的批。并须要更多的内存(由于每一个“活跃”的分区都有一个缓冲区)
props.put("batch.size", 16384);
//默认缓冲区可当即发送,即使缓冲区空间没有满;可是,若是你想减小请求的数量,能够设置linger.ms大于0.这将指示生产者发送请求以前等待一段时间
//但愿更多的消息补填到未满的批中。这相似于tcp的算法,例如上面的代码段,可能100条消息在一个请求发送,由于咱们设置了linger时间为1ms,而后,若是咱们
//没有填满缓冲区,这个设置将增长1ms的延迟请求以等待更多的消息。须要注意的是,在高负载下,相近的时间通常也会组成批,即便是linger.ms=0。
//不处于高负载的状况下,若是设置比0大,以少许的延迟代价换取更少的,更有效的请求。
props.put("linger.ms", 1);
//buffer.memory控制生产者可用的缓存总量,若是消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其余发送调用将被阻塞,阻塞时间的阈值
//经过max.block.ms设定,以后他将抛出一个TimeoutExecption。
props.put("buffer.memory", 33554432);
//key.serializer和value.serializer示例:将用户提供的key和value对象ProducerRecord转换成字节,你可使用附带的ByteArraySerizlizaer或StringSerializer处理简单的byte和String类型.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//设置kafka的分区数量
props.put("kafka.partitions", 12);
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++){
System.out.println("key-->key"+i+" value-->vvv"+i);
producer.send(new ProducerRecord<String, String>("aaa", "key"+i, "vvv"+i));
Thread.sleep(1000);
}
producer.close();
}
}
3.kafka消费者代码:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
*
* @author FromX
*
*/
public class KConsumer {
public KafkaConsumer<String, String> getConsmer() {
Properties props = new Properties();
//设置kafka服务器
props.put("bootstrap.servers", "c1.wb3.com:6667,n1.wb1.com:6667");
//消费者群组ID,发布-订阅模式,即若是一个生产者,多个消费者都要消费,那么须要定义本身的群组,同一个群组内的消费者只有一个能消费到消息
props.put("group.id", "test");
//true,消费者的偏移量将在后台按期提交;false关闭自动提交位移,在消息被完整处理以后再手动提交位移
props.put("enable.auto.commit", "true");
//如何设置为自动提交(enable.auto.commit=true),这里设置自动提交周期
props.put("auto.commit.interval.ms", "1000");
//session.timeout.ms:在使用kafka的组管理时,用于检测消费者故障的超时
props.put("session.timeout.ms", "30000");
//key.serializer和value.serializer示例:将用户提供的key和value对象ProducerRecord转换成字节,你可使用附带的ByteArraySerizlizaer或StringSerializer处理简单的byte和String类型.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
return consumer;
}
public static void main(String[] args) {
KConsumer kconsumer = new KConsumer();
KafkaConsumer<String, String> consumer = kconsumer.getConsmer();
consumer.subscribe(Arrays.asList("aaa"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println("offset = "+record.offset()+", key = "+record.key()+", value = "+ record.value());
}
}
}
4.官网文档地址:http://kafka.apache.org/documentation.html#configuration
5.极限状况的数据丢失现象
a:即便将ack设置为"all"也会在必定状况下丢失消息,由于kafka的高性能特性,消息在写入kafka时并无落盘而是写入了OS buffer中,使用Os的脏页刷新策略周期性落盘,就算落盘 仍然会有raid buffer。前者机器宕机数据丢失,后者机器跳电数据丢失。
b:对数据可靠性比较高的场景建议offset手动提交,自动提交当遇到业务系统上线关闭时,消息读取而且offset已经提交,可是数据没有储存或者仍没来得及消费时,消息状态在内存中没法保留,重启应用会跳过消息,导致消息丢失。
---------------------
未分类
description: 本文介绍了Kafka实现事务性的几个阶段——正好一次语义与原子操做。以后详细分析了Kafka事务机制的实现原理,并介绍了Kafka如何处理事务相关的异常状况,如Transaction Coordinator宕机。最后介绍了Kafka的事务机制与PostgreSQL的MVCC以及Zookeeper的原子广播实现事务的异同
本文全部Kafka原理性的描述除特殊说明外均基于Kafka 1.0.0版本。
Kafka事务机制的实现主要是为了支持
Exactly Once
即正好一次语义Exactly Once
《Kafka背景及架构介绍》一文中有说明Kafka在0.11.0.0以前的版本中只支持At Least Once
和At Most Once
语义,尚不支持Exactly Once
语义。
可是在不少要求严格的场景下,如使用Kafka处理交易数据,Exactly Once
语义是必须的。咱们能够经过让下游系统具备幂等性来配合Kafka的At Least Once
语义来间接实现Exactly Once
。可是:
所以,Kafka自己对Exactly Once
语义的支持就很是必要。
操做的原子性是指,多个操做要么所有成功要么所有失败,不存在部分红功部分失败的可能。
实现原子性操做的意义在于:
上文提到,实现Exactly Once
的一种方法是让下游系统具备幂等处理特性,而在Kafka Stream中,Kafka Producer自己就是“下游”系统,所以若是能让Producer具备幂等处理特性,那就可让Kafka Stream在必定程度上支持Exactly once
语义。
为了实现Producer的幂等语义,Kafka引入了Producer ID
(即PID
)和Sequence Number
。每一个新的Producer在初始化的时候会被分配一个惟一的PID,该PID对用户彻底透明而不会暴露给用户。
对于每一个PID,该Producer发送数据的每一个<Topic, Partition>
都对应一个从0开始单调递增的Sequence Number
。
相似地,Broker端也会为每一个<PID, Topic, Partition>
维护一个序号,而且每次Commit一条消息时将其对应序号递增。对于接收的每条消息,若是其序号比Broker维护的序号(即最后一次Commit的消息的序号)大一,则Broker会接受它,不然将其丢弃:
InvalidSequenceNumber
DuplicateSequenceNumber
上述设计解决了0.11.0.0以前版本中的两个问题:
上述幂等设计只能保证单个Producer对于同一个<Topic, Partition>
的Exactly Once
语义。
另外,它并不能保证写操做的原子性——即多个写操做,要么所有被Commit要么所有不被Commit。
更不能保证多个读写操做的的原子性。尤为对于Kafka Stream应用而言,典型的操做便是从某个Topic消费数据,通过一系列转换后写回另外一个Topic,保证从源Topic的读取与向目标Topic的写入的原子性有助于从故障中恢复。
事务保证可以使得应用程序将生产数据和消费数据看成一个原子单元来处理,要么所有成功,要么所有失败,即便该生产或消费跨多个<Topic, Partition>
。
另外,有状态的应用也能够保证重启后从断点处继续处理,也即事务恢复。
为了实现这种效果,应用程序必须提供一个稳定的(重启后不变)惟一的ID,也即Transaction ID
。Transactin ID
与PID
可能一一对应。区别在于Transaction ID
由用户提供,而PID
是内部的实现对用户透明。
另外,为了保证新的Producer启动后,旧的具备相同Transaction ID
的Producer即失效,每次Producer经过Transaction ID
拿到PID的同时,还会获取一个单调递增的epoch。因为旧的Producer的epoch比新Producer的epoch小,Kafka能够很容易识别出该Producer是老的Producer并拒绝其请求。
有了Transaction ID
后,Kafka可保证:
Transaction ID
的新的Producer实例被建立且工做时,旧的且拥有相同Transaction ID
的Producer将再也不工做。须要注意的是,上述的事务保证是从Producer的角度去考虑的。从Consumer的角度来看,该保证会相对弱一些。尤为是不能保证全部被某事务Commit过的全部消息都被一块儿消费,由于:
这一节所说的事务主要指原子性,也即Producer将多条消息做为一个事务批量发送,要么所有成功要么所有失败。
为了实现这一点,Kafka 0.11.0.0引入了一个服务器端的模块,名为Transaction Coordinator
,用于管理Producer发送的消息的事务性。
该Transaction Coordinator
维护Transaction Log
,该log存于一个内部的Topic内。因为Topic数据具备持久性,所以事务的状态也具备持久性。
Producer并不直接读写Transaction Log
,它与Transaction Coordinator
通讯,而后由Transaction Coordinator
将该事务的状态插入相应的Transaction Log
。
Transaction Log
的设计与Offset Log
用于保存Consumer的Offset相似。
许多基于Kafka的应用,尤为是Kafka Stream应用中同时包含Consumer和Producer,前者负责从Kafka中获取消息,后者负责将处理完的数据写回Kafka的其它Topic中。
为了实现该场景下的事务的原子性,Kafka须要保证对Consumer Offset的Commit与Producer对发送消息的Commit包含在同一个事务中。不然,若是在两者Commit中间发生异常,根据两者Commit的顺序可能会形成数据丢失和数据重复:
At Least Once
语义,可能形成数据重复。At Most Once
语义,可能形成数据丢失。为了区分写入Partition的消息被Commit仍是Abort,Kafka引入了一种特殊类型的消息,即Control Message
。该类消息的Value内不包含任何应用相关的数据,而且不会暴露给应用程序。它只用于Broker与Client间的内部通讯。
对于Producer端事务,Kafka以Control Message的形式引入一系列的Transaction Marker
。Consumer便可经过该标记断定对应的消息被Commit了仍是Abort了,而后结合该Consumer配置的隔离级别决定是否应该将该消息返回给应用程序。
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 初始化事务,包括结束该Transaction ID对应的未完成的事务(若是有)
// 保证新的事务在一个正确的状态下启动
producer.initTransactions();
// 开始事务
producer.beginTransaction();
// 消费数据
ConsumerRecords<String, String> records = consumer.poll(100);
try{
// 发送数据
producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
// 发送消费数据的Offset,将上述数据消费与数据发送归入同一个Transaction内
producer.sendOffsetsToTransaction(offsets, "group1");
// 数据发送及Offset发送均成功的状况下,提交事务
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// 数据发送或者Offset发送出现异常时,终止事务
producer.abortTransaction();
} finally {
// 关闭Producer和Consumer
producer.close();
consumer.close();
}
Transaction Coordinator
因为Transaction Coordinator
是分配PID和管理事务的核心,所以Producer要作的第一件事情就是经过向任意一个Broker发送FindCoordinator
请求找到Transaction Coordinator
的位置。
注意:只有应用程序为Producer配置了Transaction ID
时才可以使用事务特性,也才须要这一步。另外,因为事务性要求Producer开启幂等特性,所以经过将transactional.id
设置为非空从而开启事务特性的同时也须要经过将enable.idempotence
设置为true来开启幂等特性。
找到Transaction Coordinator
后,具备幂等特性的Producer必须发起InitPidRequest
请求以获取PID。
注意:只要开启了幂等特性即必须执行该操做,而无须考虑该Producer是否开启了事务特性。
* 若是事务特性被开启 *
InitPidRequest
会发送给Transaction Coordinator
。若是Transaction Coordinator
是第一次收到包含有该Transaction ID
的InitPidRequest请求,它将会把该<TransactionID, PID>
存入Transaction Log
,如上图中步骤2.1所示。这样可保证该对应关系被持久化,从而保证即便Transaction Coordinator
宕机该对应关系也不会丢失。
除了返回PID外,InitPidRequest
还会执行以下任务:
注意:InitPidRequest
的处理过程是同步阻塞的。一旦该调用正确返回,Producer便可开始新的事务。
另外,若是事务特性未开启,InitPidRequest
可发送至任意Broker,而且会获得一个全新的惟一的PID。该Producer将只能使用幂等特性以及单一Session内的事务特性,而不能使用跨Session的事务特性。
Kafka从0.11.0.0版本开始,提供beginTransaction()
方法用于开启一个事务。调用该方法后,Producer本地会记录已经开启了事务,但Transaction Coordinator
只有在Producer发送第一条消息后才认为事务已经开启。
这一阶段,包含了整个事务的数据处理过程,而且包含了多种请求。
AddPartitionsToTxnRequest
一个Producer可能会给多个<Topic, Partition>
发送数据,给一个新的<Topic, Partition>
发送数据前,它须要先向Transaction Coordinator
发送AddPartitionsToTxnRequest
。
Transaction Coordinator
会将该<Transaction, Topic, Partition>
存于Transaction Log
内,并将其状态置为BEGIN
,如上图中步骤4.1所示。有了该信息后,咱们才能够在后续步骤中为每一个Topic, Partition>
设置COMMIT或者ABORT标记(如上图中步骤5.2所示)。
另外,若是该<Topic, Partition>
为该事务中第一个<Topic, Partition>
,Transaction Coordinator
还会启动对该事务的计时(每一个事务都有本身的超时时间)。
ProduceRequest
Producer经过一个或多个ProduceRequest
发送一系列消息。除了应用数据外,该请求还包含了PID,epoch,和Sequence Number
。该过程如上图中步骤4.2所示。
AddOffsetsToTxnRequest
为了提供事务性,Producer新增了sendOffsetsToTransaction
方法,该方法将多组消息的发送和消费放入同一批处理内。
该方法先判断在当前事务中该方法是否已经被调用并传入了相同的Group ID。如果,直接跳到下一步;若不是,则向Transaction Coordinator
发送AddOffsetsToTxnRequests
请求,Transaction Coordinator
将对应的全部<Topic, Partition>
存于Transaction Log
中,并将其状态记为BEGIN
,如上图中步骤4.3所示。该方法会阻塞直到收到响应。
TxnOffsetCommitRequest
做为sendOffsetsToTransaction
方法的一部分,在处理完AddOffsetsToTxnRequest
后,Producer也会发送TxnOffsetCommit
请求给Consumer Coordinator
从而将本事务包含的与读操做相关的各<Topic, Partition>
的Offset持久化到内部的__consumer_offsets
中,如上图步骤4.4所示。
在此过程当中,Consumer Coordinator
会经过PID和对应的epoch来验证是否应该容许该Producer的该请求。
这里须要注意:
__consumer_offsets
的Offset信息在当前事务Commit前对外是不可见的。也即在当前事务被Commit前,可认为该Offset还没有Commit,也即对应的消息还没有被完成处理。Consumer Coordinator
并不会当即更新缓存中相应<Topic, Partition>
的Offset,由于此时这些更新操做还没有被COMMIT或ABORT。一旦上述数据写入操做完成,应用程序必须调用KafkaProducer
的commitTransaction
方法或者abortTransaction
方法以结束当前事务。
EndTxnRequest
commitTransaction
方法使得Producer写入的数据对下游Consumer可见。abortTransaction
方法经过Transaction Marker
将Producer写入的数据标记为Aborted
状态。下游的Consumer若是将isolation.level
设置为READ_COMMITTED
,则它读到被Abort的消息后直接将其丢弃而不会返回给客户程序,也即被Abort的消息对应用程序不可见。
不管是Commit仍是Abort,Producer都会发送EndTxnRequest
请求给Transaction Coordinator
,并经过标志位标识是应该Commit仍是Abort。
收到该请求后,Transaction Coordinator
会进行以下操做
PREPARE_COMMIT
或PREPARE_ABORT
消息写入Transaction Log
,如上图中步骤5.1所示WriteTxnMarker
请求以Transaction Marker
的形式将COMMIT
或ABORT
信息写入用户数据日志以及Offset Log
中,如上图中步骤5.2所示COMPLETE_COMMIT
或COMPLETE_ABORT
信息写入Transaction Log
中,如上图中步骤5.3所示补充说明:对于commitTransaction
方法,它会在发送EndTxnRequest
以前先调用flush方法以确保全部发送出去的数据都获得相应的ACK。对于abortTransaction
方法,在发送EndTxnRequest
以前直接将当前Buffer中的事务性消息(若是有)所有丢弃,但必须等待全部被发送但还没有收到ACK的消息发送完成。
上述第二步是实现将一组读操做与写操做做为一个事务处理的关键。由于Producer写入的数据Topic以及记录Comsumer Offset的Topic会被写入相同的Transactin Marker
,因此这一组读操做与写操做要么所有COMMIT要么所有ABORT。
WriteTxnMarkerRequest
上面提到的WriteTxnMarkerRequest
由Transaction Coordinator
发送给当前事务涉及到的每一个<Topic, Partition>
的Leader。收到该请求后,对应的Leader会将对应的COMMIT(PID)
或者ABORT(PID)
控制信息写入日志,如上图中步骤5.2所示。
该控制消息向Broker以及Consumer代表对应PID的消息被Commit了仍是被Abort了。
这里要注意,若是事务也涉及到__consumer_offsets
,即该事务中有消费数据的操做且将该消费的Offset存于__consumer_offsets
中,Transaction Coordinator
也须要向该内部Topic的各Partition的Leader发送WriteTxnMarkerRequest
从而写入COMMIT(PID)
或COMMIT(PID)
控制信息。
写入最终的COMPLETE_COMMIT
或COMPLETE_ABORT
消息
写完全部的Transaction Marker
后,Transaction Coordinator
会将最终的COMPLETE_COMMIT
或COMPLETE_ABORT
消息写入Transaction Log
中以标明该事务结束,如上图中步骤5.3所示。
此时,Transaction Log
中全部关于该事务的消息所有能够移除。固然,因为Kafka内数据是Append Only的,不可直接更新和删除,这里说的移除只是将其标记为null从而在Log Compact时再也不保留。
另外,COMPLETE_COMMIT
或COMPLETE_ABORT
的写入并不须要获得全部Rreplica的ACK,由于若是该消息丢失,能够根据事务协议重发。
补充说明,若是参与该事务的某些<Topic, Partition>
在被写入Transaction Marker
前不可用,它对READ_COMMITTED
的Consumer不可见,但不影响其它可用<Topic, Partition>
的COMMIT或ABORT。在该<Topic, Partition>
恢复可用后,Transaction Coordinator
会从新根据PREPARE_COMMIT
或PREPARE_ABORT
向该<Topic, Partition>
发送Transaction Marker
。
PID
与Sequence Number
的引入实现了写操做的幂等性At Least Once
语义实现了单一Session内的Exactly Once
语义Transaction Marker
与PID
提供了识别消息是否应该被读取的能力,从而实现了事务的隔离性Transaction Marker
)来实现事务中涉及的全部读写操做同时对外可见或同时对外不可见InvalidProducerEpoch
这是一种Fatal Error,它说明当前Producer是一个过时的实例,有Transaction ID
相同但epoch更新的Producer实例被建立并使用。此时Producer会中止并抛出Exception。
InvalidPidMapping
Transaction Coordinator
没有与该Transaction ID
对应的PID。此时Producer会经过包含有Transaction ID
的InitPidRequest
请求建立一个新的PID。
NotCorrdinatorForGTransactionalId
该Transaction Coordinator
不负责该当前事务。Producer会经过FindCoordinatorRequest
请求从新寻找对应的Transaction Coordinator
。
InvalidTxnRequest
违反了事务协议。正确的Client实现不该该出现这种Exception。若是该异常发生了,用户须要检查本身的客户端实现是否有问题。
CoordinatorNotAvailable
Transaction Coordinator
仍在初始化中。Producer只须要重试便可。
DuplicateSequenceNumber
发送的消息的序号低于Broker预期。该异常说明该消息已经被成功处理过,Producer能够直接忽略该异常并处理下一条消息
InvalidSequenceNumber
这是一个Fatal Error,它说明发送的消息中的序号大于Broker预期。此时有两种可能
max.inflight.requests.per.connection
被强制设置为1,而acks
被强制设置为all。故前面消息重试期间,后续消息不会被发送,也即不会发生乱序。而且只有ISR中全部Replica都ACK,Producer才会认为消息已经被发送,也即不存在Broker端数据丢失问题。InvalidTransactionTimeout
InitPidRequest
调用出现的Fatal Error。它代表Producer传入的timeout时间不在可接受范围内,应该中止Producer并报告给用户。
Transaction Coordinator
失败PREPARE_COMMIT/PREPARE_ABORT
前失败Producer经过FindCoordinatorRequest
找到新的Transaction Coordinator
,并经过EndTxnRequest
请求发起COMMIT
或ABORT
流程,新的Transaction Coordinator
继续处理EndTxnRequest
请求——写PREPARE_COMMIT
或PREPARE_ABORT
,写Transaction Marker
,写COMPLETE_COMMIT
或COMPLETE_ABORT
。
PREPARE_COMMIT/PREPARE_ABORT
后失败此时旧的Transaction Coordinator
可能已经成功写入部分Transaction Marker
。新的Transaction Coordinator
会重复这些操做,因此部分Partition中可能会存在重复的COMMIT
或ABORT
,但只要该Producer在此期间没有发起新的事务,这些重复的Transaction Marker
就不是问题。
COMPLETE_COMMIT/ABORT
后失败旧的Transaction Coordinator
可能已经写完了COMPLETE_COMMIT
或COMPLETE_ABORT
但在返回EndTxnRequest
以前失败。该场景下,新的Transaction Coordinator
会直接给Producer返回成功。
transaction.timeout.ms
当Producer失败时,Transaction Coordinator
必须可以主动的让某些进行中的事务过时。不然没有Producer的参与,Transaction Coordinator
没法判断这些事务应该如何处理,这会形成:
Transaction Coordinator
须要维护大量的事务状态,大量占用内存Transaction Log
内也会存在大量数据,形成新的Transaction Coordinator
启动缓慢READ_COMMITTED
的Consumer须要缓存大量的消息,形成没必要要的内存浪费甚至是OOMTransaction ID
不一样的Producer交叉写同一个Partition,当一个Producer的事务状态不更新时,READ_COMMITTED
的Consumer为了保证顺序消费而被阻塞为了不上述问题,Transaction Coordinator
会周期性遍历内存中的事务状态Map,并执行以下操做
BEGIN
而且其最后更新时间与当前时间差大于transaction.remove.expired.transaction.cleanup.interval.ms
(默认值为1小时),则主动将其终止:1)未避免原Producer临时恢复与当前终止流程冲突,增长该Producer对应的PID的epoch,并确保将该更新的信息写入Transaction Log
;2)以更新后的epoch回滚事务,从而使得该事务相关的全部Broker都更新其缓存的该PID的epoch从而拒绝旧Producer的写操做PREPARE_COMMIT
,完成后续的COMMIT流程————向各<Topic, Partition>
写入Transaction Marker
,在Transaction Log
内写入COMPLETE_COMMIT
PREPARE_ABORT
,完成后续ABORT流程Transaction ID
某Transaction ID
的Producer可能很长时间再也不发送数据,Transaction Coordinator
不必再保存该Transaction ID
与PID
等的映射,不然可能会形成大量的资源浪费。所以须要有一个机制探测再也不活跃的Transaction ID
并将其信息删除。
Transaction Coordinator
会周期性遍历内存中的Transaction ID
与PID
映射,若是某Transaction ID
没有对应的正在进行中的事务而且它对应的最后一个事务的结束时间与当前时间差大于transactional.id.expiration.ms
(默认值是7天),则将其从内存中删除并在Transaction Log
中将其对应的日志的值设置为null从而使得Log Compact可将其记录删除。
Kafka的事务机制与《MVCC PostgreSQL实现事务和多版本并发控制的精华》一文中介绍的PostgreSQL经过MVCC实现事务的机制很是相似,对于事务的回滚,并不须要删除已写入的数据,都是将写入数据的事务标记为Rollback/Abort从而在读数据时过滤该数据。
Kafka的事务机制与《分布式事务(一)两阶段提交及JTA》一文中所介绍的两阶段提交机制看似类似,都分PREPARE阶段和最终COMMIT阶段,但又有很大不一样。
PREPARE_COMMIT
仍是PREPARE_ABORT
,而且只须在Transaction Log
中标记便可,无须其它组件参与。而两阶段提交的PREPARE须要发送给全部的分布式事务参与方,而且事务参与方须要尽量准备好,并根据准备状况返回Prepared
或Non-Prepared
状态给事务管理器。PREPARE_COMMIT
或PREPARE_ABORT
,则肯定该事务最终的结果应该是被COMMIT
或ABORT
。而分布式事务中,PREPARE后由各事务参与方返回状态,只有全部参与方均返回Prepared
状态才会真正执行COMMIT,不然执行ROLLBACKTransaction Coordinator
实例,而分布式事务中只有一个事务管理器Zookeeper的原子广播协议与两阶段提交以及Kafka事务机制有类似之处,但又有各自的特色
Transaction Coordinator
实例,扩展性较好。而Zookeeper写操做只能在Leader节点进行,因此其写性能远低于读性能。