最近和一些同窗交流的时候反馈说,在面试Kafka时,被问到Kafka组件组成部分、API使用、Consumer和Producer原理及做用等问题都能详细做答。可是,问到一个平时不注意的问题,就是Kafka的幂等性,被卡主了。那么,今天笔者就为你们来剖析一下Kafka的幂等性原理及实现。html
Producer在生产发送消息时,不免会重复发送消息。Producer进行retry时会产生重试机制,发生消息重复发送。而引入幂等性后,重复发送只会生成一条有效的消息。Kafka做为分布式消息系统,它的使用场景常见与分布式系统中,好比消息推送系统、业务平台系统(如物流平台、银行结算平台等)。以银行结算平台来讲,业务方做为上游把数据上报到银行结算平台,若是一份数据被计算、处理屡次,那么产生的影响会很严重。node
在使用Kafka时,须要确保Exactly-Once语义。分布式系统中,一些不可控因素有不少,好比网络、OOM、FullGC等。在Kafka Broker确认Ack时,出现网络异常、FullGC、OOM等问题时致使Ack超时,Producer会进行重复发送。可能出现的状况以下:面试
Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。那这两个概念的用途是什么呢?数据库
Kafka在引入幂等性以前,Producer向Broker发送消息,而后Broker将消息追加到消息流中后给Producer返回Ack信号值。实现流程以下:apache
上图的实现流程是一种理想状态下的消息发送状况,可是实际状况中,会出现各类不肯定的因素,好比在Producer在发送给Broker的时候出现网络异常。好比如下这种异常状况的出现:缓存
上图这种状况,当Producer第一次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,可是在返回Ack信号给Producer时失败了(好比网络异常) 。此时,Producer端触发重试机制,将消息(x2,y2)从新发送给Broker,Broker接收到消息后,再次将该消息追加到消息流中,而后成功返回Ack信号给Producer。这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。网络
面对这样的问题,Kafka引入了幂等性。那么幂等性是如何解决这类重复发送消息的问题的呢?下面咱们能够先来看看流程图:架构
一样,这是一种理想状态下的发送流程。实际状况下,会有不少不肯定的因素,好比Broker在发送Ack信号给Producer时出现网络异常,致使发送失败。异常状况以下图所示:分布式
当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常致使Producer接收Ack信号失败。对于Producer来讲,会触发重试机制,将消息(x2,y2)再次发送,可是,因为引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而以前Broker缓存过以前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的状况。oop
客户端在生成Producer时,会实例化以下代码:
// 实例化一个Producer对象 Producer<String, String> producer = new KafkaProducer<>(props);
在org.apache.kafka.clients.producer.internals.Sender类中,在run()中有一个maybeWaitForPid()方法,用来生成一个ProducerID,实现代码以下:
private void maybeWaitForPid() { if (transactionState == null) return; while (!transactionState.hasPid()) { try { Node node = awaitLeastLoadedNodeReady(requestTimeout); if (node != null) { ClientResponse response = sendAndAwaitInitPidRequest(node); if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) { InitPidResponse initPidResponse = (InitPidResponse) response.responseBody(); transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch()); } else { log.error("Received an unexpected response type for an InitPidRequest from {}. " + "We will back off and try again.", node); } } else { log.debug("Could not find an available broker to send InitPidRequest to. " + "We will back off and try again."); } } catch (Exception e) { log.warn("Received an exception while trying to get a pid. Will back off and retry.", e); } log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs); time.sleep(retryBackoffMs); metadata.requestUpdate(); } }
与幂等性有关的另一个特性就是事务。Kafka中的事务与数据库的事务相似,Kafka中的事务属性是指一系列的Producer生产消息和消费消息提交Offsets的操做在一个事务中,即原子性操做。对应的结果是同时成功或者同时失败。
这里须要与数据库中事务进行区别,操做数据库中的事务指一系列的增删查改,对Kafka来讲,操做事务是指一系列的生产和消费等原子性操做。
在事务属性引入以前,先引入Producer的幂等性,它的做用为:
产生的场景有:
好比,在Consumer中Commit Offsets时,当Consumer在消费完成时Commit的Offsets为100(假设最近一次Commit的Offsets为50),那么执行触发Balance时,其余Consumer就会重复消费消息(消费的Offsets介于50~100之间的消息)。
Producer提供了五种事务方法,它们分别是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代码定义在org.apache.kafka.clients.producer.Producer<K,V>接口中,具体定义接口以下:
// 初始化事务,须要注意确保transation.id属性被分配 void initTransactions(); // 开启事务 void beginTransaction() throws ProducerFencedException; // 为Consumer提供的在事务内Commit Offsets的操做 void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException; // 提交事务 void commitTransaction() throws ProducerFencedException; // 放弃事务,相似于回滚事务的操做 void abortTransaction() throws ProducerFencedException;
在Kafka事务中,一个原子性操做,根据操做类型能够分为3种状况。状况以下:
Kafka的幂等性和事务是比较重要的特性,特别是在数据丢失和数据重复的问题上很是重要。Kafka引入幂等性,设计的原理也比较好理解。而事务与数据库的事务特性相似,有数据库使用的经验对理解Kafka的事务也比较容易接受。
这篇博客就和你们分享到这里,若是你们在研究学习的过程中有什么问题,能够加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同窗, 能够在公告栏那里点击购买连接购买博主的书进行学习,在此感谢你们的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。