Kafka科普系列 | Kafka中的事务是什么样子的?

事务,对于你们来讲可能并不陌生,好比数据库事务、分布式事务,那么Kafka中的事务是什么样子的呢?数据库

在说Kafka的事务以前,先要说一下Kafka中幂等的实现。幂等和事务是Kafka 0.11.0.0版本引入的两个特性,以此来实现EOS(exactly once semantics,精确一次处理语义)。缓存

幂等,简单地说就是对接口的屡次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用Kafka的幂等性功能以后就能够避免这种状况。bash

开启幂等性功能的方式很简单,只须要显式地将生产者客户端参数enable.idempotence设置为true便可(这个参数的默认值为false)。微信

Kafka是如何具体实现幂等的呢?Kafka为此引入了producer id(如下简称PID)和序列号(sequence number)这两个概念。每一个新的生产者实例在初始化的时候都会被分配一个PID,这个PID对用户而言是彻底透明的。session

对于每一个PID,消息发送到的每个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将对应的序列号的值加1。分布式

broker端会在内存中为每一对维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比broker端中维护的对应的序列号的值(SN_old)大1(即SN_new = SN_old + 1)时,broker才会接收它。ide

若是SN_new< SN_old + 1,那么说明消息被重复写入,broker能够直接将其丢弃。若是SN_new> SN_old + 1,那么说明中间有数据还没有写入,出现了乱序,暗示可能有消息丢失,这个异常是一个严重的异常。spa

引入序列号来实现幂等也只是针对每一对而言的,也就是说,Kafka的幂等只能保证单个生产者会话(session)中单分区的幂等。幂等性不能跨多个分区运做,而事务能够弥补这个缺陷。.net

事务能够保证对多个分区写入操做的原子性。操做的原子性是指多个操做要么所有成功,要么所有失败,不存在部分红功、部分失败的可能。设计

为了使用事务,应用程序必须提供惟一的transactionalId,这个transactionalId经过客户端参数transactional.id来显式设置。事务要求生产者开启幂等特性,所以经过将transactional.id参数设置为非空从而开启事务特性的同时须要将enable.idempotence设置为true(若是未显式设置,则KafkaProducer默认会将它的值设置为true),若是用户显式地将enable.idempotence设置为false,则会报出ConfigException的异常。

transactionalId与PID一一对应,二者之间所不一样的是transactionalId由用户显式设置,而PID是由Kafka内部分配的。

另外,为了保证新的生产者启动后具备相同transactionalId的旧生产者可以当即失效,每一个生产者经过transactionalId获取PID的同时,还会获取一个单调递增的producer epoch。若是使用同一个transactionalId开启两个生产者,那么前一个开启的生产者会报错。

从生产者的角度分析,经过事务,Kafka能够保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。

前者表示具备相同transactionalId的新生产者实例被建立且工做的时候,旧的且拥有相同transactionalId的生产者实例将再也不工做。

后者指当某个生产者实例宕机后,新的生产者实例能够保证任何未完成的旧事务要么被提交(Commit),要么被停止(Abort),如此可使新的生产者实例从一个正常的状态开始工做。

KafkaProducer提供了5个与事务相关的方法,详细以下:

void initTransactions();
void beginTransaction() throws ProducerFencedException;
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId)
        throws ProducerFencedException;
void commitTransaction() throws ProducerFencedException;
void abortTransaction() throws ProducerFencedException;
复制代码

initTransactions()方法用来初始化事务;beginTransaction()方法用来开启事务;sendOffsetsToTransaction()方法为消费者提供在事务内的位移提交的操做;commitTransaction()方法用来提交事务;abortTransaction()方法用来停止事务,相似于事务回滚。

在消费端有一个参数isolation.level,与事务有着莫大的关联,这个参数的默认值为“read_uncommitted”,意思是说消费端应用能够看到(消费到)未提交的事务,固然对于已提交的事务也是可见的。

这个参数还能够设置为“read_committed”,表示消费端应用不能够看到还没有提交的事务内的消息。

举个例子,若是生产者开启事务并向某个分区值发送3条消息msg一、msg2和msg3,在执行commitTransaction()或abortTransaction()方法前,设置为“read_committed”的消费端应用是消费不到这些消息的,不过在KafkaConsumer内部会缓存这些消息,直到生产者执行commitTransaction()方法以后它才能将这些消息推送给消费端应用。反之,若是生产者执行了abortTransaction()方法,那么KafkaConsumer会将这些缓存的消息丢弃而不推送给消费端应用。

在这里插入图片描述

日志文件中除了普通的消息,还有一种消息专门用来标志一个事务的结束,它就是控制消息(ControlBatch)。控制消息一共有两种类型:COMMIT和ABORT,分别用来表征事务已经成功提交或已经被成功停止。

RecordBatch中attributes字段的第6位用来标识当前消息是不是控制消息。若是是控制消息,那么这一位会置为1,不然会置为0,如上图所示。

attributes字段中的第5位用来标识当前消息是否处于事务中,若是是事务中的消息,那么这一位置为1,不然置为0。因为控制消息也处于事务中,因此attributes字段的第5位和第6位都被置为1。

在这里插入图片描述
KafkaConsumer能够经过这个控制消息来判断对应的事务是被提交了仍是被停止了,而后结合参数isolation.level配置的隔离级别来决定是否将相应的消息返回给消费端应用,如上图所示。注意ControlBatch对消费端应用不可见。

咱们在上一篇Kafka科普系列中还讲过LSO——《Kafka科普系列 | 什么是LSO》,它与Kafka的事务有着密切的联系,看着下图,你回忆起来了嘛。

在这里插入图片描述


欢迎支持笔者小册:《图解Kafka之实战指南》和《图解Kafka之核心原理


欢迎支持笔者新做:《深刻理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

相关文章
相关标签/搜索