【Kafka】Exactly Once语义与事务

Kafka在0.11.0.0以前的版本中只支持At Least OnceAt Most Once语义,尚不支持Exactly Once语义。服务器

可是在不少要求严格的场景下,如使用Kafka处理交易数据,Exactly Once语义是必须的。咱们能够经过让下游系统具备幂等性来配合Kafka的At Least Once语义来间接实现Exactly Once。可是:设计

  • 该方案要求下游系统支持幂等操做,限制了Kafka的适用场景
  • 实现门槛相对较高,须要用户对Kafka的工做机制很是了解
  • 对于Kafka Stream而言,Kafka自己便是本身的下游系统,但Kafka在0.11.0.0版本以前不具备幂等发送能力

所以,Kafka自己对Exactly Once语义的支持就很是必要。code

操做原子性

操做的原子性是指,多个操做要么所有成功要么所有失败,不存在部分红功部分失败的可能。事务

实现原子性操做的意义在于:get

  • 操做结果更可控,有助于提高数据一致性
  • 便于故障恢复。由于操做是原子的,从故障中恢复时只须要重试该操做(若是原操做失败)或者直接跳过该操做(若是原操做成功),而不须要记录中间状态,更不须要针对中间状态做特殊处理

实现事务机制的几个阶段

幂等性发送

上文提到,实现Exactly Once的一种方法是让下游系统具备幂等处理特性,而在Kafka Stream中,Kafka Producer自己就是“下游”系统,所以若是能让Producer具备幂等处理特性,那就可让Kafka Stream在必定程度上支持Exactly once语义。kafka

为了实现Producer的幂等语义,Kafka引入了Producer ID(即PID)和Sequence Number。每一个新的Producer在初始化的时候会被分配一个惟一的PID,该PID对用户彻底透明而不会暴露给用户。it

对于每一个PID,该Producer发送数据的每一个<Topic, Partition>都对应一个从0开始单调递增的Sequence Numberio

相似地,Broker端也会为每一个<PID, Topic, Partition>维护一个序号,而且每次Commit一条消息时将其对应序号递增。对于接收的每条消息,若是其序号比Broker维护的序号(即最后一次Commit的消息的序号)大一,则Broker会接受它,不然将其丢弃:ast

  • 若是消息序号比Broker维护的序号大一以上,说明中间有数据还没有写入,也即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
  • 若是消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber

上述设计解决了0.11.0.0以前版本中的两个问题:原理

  • Broker保存消息后,发送ACK前宕机,Producer认为消息未发送成功并重试,形成数据重复
  • 前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,形成数据乱序

事务性保证

上述幂等设计只能保证单个Producer对于同一个<Topic, Partition>Exactly Once语义。

另外,它并不能保证写操做的原子性——即多个写操做,要么所有被Commit要么所有不被Commit。

更不能保证多个读写操做的的原子性。尤为对于Kafka Stream应用而言,典型的操做便是从某个Topic消费数据,通过一系列转换后写回另外一个Topic,保证从源Topic的读取与向目标Topic的写入的原子性有助于从故障中恢复。

事务保证可以使得应用程序将生产数据和消费数据看成一个原子单元来处理,要么所有成功,要么所有失败,即便该生产或消费跨多个<Topic, Partition>

另外,有状态的应用也能够保证重启后从断点处继续处理,也即事务恢复。

为了实现这种效果,应用程序必须提供一个稳定的(重启后不变)惟一的ID,也即Transaction IDTransactin IDPID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明。

另外,为了保证新的Producer启动后,旧的具备相同Transaction ID的Producer即失效,每次Producer经过Transaction ID拿到PID的同时,还会获取一个单调递增的epoch。因为旧的Producer的epoch比新Producer的epoch小,Kafka能够很容易识别出该Producer是老的Producer并拒绝其请求。

有了Transaction ID后,Kafka可保证:

  • 跨Session的数据幂等发送。当具备相同Transaction ID的新的Producer实例被建立且工做时,旧的且拥有相同Transaction ID的Producer将再也不工做。
  • 跨Session的事务恢复。若是某个应用实例宕机,新的实例能够保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工做。

事务机制原理

事务性消息传递

这一节所说的事务主要指原子性,也即Producer将多条消息做为一个事务批量发送,要么所有成功要么所有失败。

为了实现这一点,Kafka 0.11.0.0引入了一个服务器端的模块,名为Transaction Coordinator,用于管理Producer发送的消息的事务性。

Transaction Coordinator维护Transaction Log,该log存于一个内部的Topic内。因为Topic数据具备持久性,所以事务的状态也具备持久性。

Producer并不直接读写Transaction Log,它与Transaction Coordinator通讯,而后由Transaction Coordinator将该事务的状态插入相应的Transaction Log

Transaction Log的设计与Offset Log用于保存Consumer的Offset相似。

事务中Offset的提交

许多基于Kafka的应用,尤为是Kafka Stream应用中同时包含Consumer和Producer,前者负责从Kafka中获取消息,后者负责将处理完的数据写回Kafka的其它Topic中。

为了实现该场景下的事务的原子性,Kafka须要保证对Consumer Offset的Commit与Producer对发送消息的Commit包含在同一个事务中。不然,若是在两者Commit中间发生异常,根据两者Commit的顺序可能会形成数据丢失和数据重复:

  • 若是先Commit Producer发送数据的事务再Commit Consumer的Offset,即At Least Once语义,可能形成数据重复。
  • 若是先Commit Consumer的Offset,再Commit Producer数据发送事务,即At Most Once语义,可能形成数据丢失。

总结

  • PIDSequence Number的引入实现了写操做的幂等性
  • 写操做的幂等性结合At Least Once语义实现了单一Session内的Exactly Once语义
  • Transaction MarkerPID提供了识别消息是否应该被读取的能力,从而实现了事务的隔离性
  • Offset的更新标记了消息是否被读取,从而将对读操做的事务处理转换成了对写(Offset)操做的事务处理
  • Kafka事务的本质是,将一组写操做(若是有)对应的消息与一组读操做(若是有)对应的Offset的更新进行一样的标记(即Transaction Marker)来实现事务中涉及的全部读写操做同时对外可见或同时对外不可见
  • Kafka只提供对Kafka自己的读写操做的事务性,不提供包含外部系统的事务性

出处:http://www.jasongj.com/kafka/transaction/

相关文章
相关标签/搜索