在不少的流处理框架的介绍中,都会说kafka是一个可靠的数据源,而且推荐使用Kafka看成数据源来进行使用。这是由于与其余消息引擎系统相比,kafka提供了可靠的数据保存及备份机制。而且经过消费者位移这一律念,可让消费者在因某些缘由宕机而重启后,能够轻易得回到宕机前的位置。html
但其实kafka的可靠性也只能说是相对的,在整条数据链条中,总有可让数据出现丢失的状况,今天就来讨论如何避免kafka数据丢失,以及实现精确一致处理的语义。git
在讨论如何实现kafka无消息丢失的时候,首先要先清楚大部分状况下消息丢失是在什么状况下发生的。为何是大部分,由于总有一些很是特殊的状况会被人忽略,而咱们只须要关注广泛的状况就足够了。接下来咱们来讨论如何较为广泛的数据丢失状况。github
前面介绍Kafka分区和副本的时候,有提到过一个producer客户端有一个acks的配置,这个配置为0的时候,producer是发送以后无论的,这个时候就颇有可能由于网络等缘由形成数据丢失,因此应该尽可能避免。可是将ack设置为1就没问题了吗,那也不必定,由于有可能在leader副本接收到数据,但还没同步给其余副本的时候就挂掉了,这时候数据也是丢失了。而且这种时候是客户端觉得消息发送成功,但kafka丢失了数据。算法
要达到最严格的无消息丢失配置,应该是要将acks的参数设置为-1(也就是all),而且将min.insync.replicas配置项调高到大于1,这部份内容在上一篇副本机制有介绍详细解析kafka之kafka分区和副本。编程
同时还须要使用带有回调的producer api,来发送数据。注意这里讨论的都是异步发送消息,同步发送不在讨论范围。api
public class send{ ...... public static void main(){ ... /* * 第一个参数是 ProducerRecord 类型的对象,封装了目标 Topic,消息的 kv * 第二个参数是一个 CallBack 对象,当生产者接收到 Kafka 发来的 ACK 确认消息的时候, * 会调用此 CallBack 对象的 onCompletion() 方法,实现回调功能 */ producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); ... } ...... } class DemoCallBack implements Callback { /* 开始发送消息的时间戳 */ private final long startTime; private final int key; private final String message; public DemoCallBack(long startTime, int key, String message) { this.startTime = startTime; this.key = key; this.message = message; } /** * 生产者成功发送消息,收到 Kafka 服务端发来的 ACK 确认消息后,会调用此回调函数 * @param metadata 生产者发送的消息的元数据,若是发送过程当中出现异常,此参数为 null * @param exception 发送过程当中出现的异常,若是发送成功为 null */ @Override public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %d\n", key, message, metadata.partition(), metadata.offset(), elapsedTime); } else { exception.printStackTrace(); } } }
更详细的代码能够参考这里:Kafka生产者分析——KafkaProducer。网络
咱们以前提到过,producer发送到kafka broker的时候,是有多种可能会失败的,而回调函数能准确告诉你是否确认发送成功,固然这依托于acks和min.insync.replicas的配置。而当数据发送丢失的时候,就能够进行手动重发或其余操做,从而确保生产者发送成功。并发
有些时候,kafka内部由于一些不大好的配置,可能会出现一些极为隐蔽的数据丢失状况,那么咱们分别讨论下大体都有哪几种状况。框架
首先是replication.factor配置参数,这个配置决定了副本的数量,默认是1。注意这个参数不能超过broker的数量。说这个参数实际上是由于若是使用默认的1,或者不在建立topic的时候指定副本数量(也就是副本数为1),那么当一台机器出现磁盘损坏等状况,那么数据也就从kafka里面丢失了。因此replication.factor这个参数最好是配置大于1,好比说3。异步
接下来要说的仍是和副本相关的,也是上一篇副本中提到的unclean.leader.election.enable 参数,这个参数是在主副本挂掉,而后在ISR集合中没有副本能够成为leader的时候,要不要让进度比较慢的副本成为leader的。不用多说,让进度比较慢的副本成为leader,确定是要丢数据的。虽然可能会提升一些可用性,但若是你的业务场景丢失数据更加不能忍受,那仍是将unclean.leader.election.enable设置为false吧。
消费者丢失的状况,其实跟消费者位移处理不当有关。消费者位移提交有一个参数,enable.auto.commit,默认是true,决定是否要让消费者自动提交位移。若是开启,那么consumer每次都是先提交位移,再进行消费,好比先跟broker说这5个数据我消费好了,而后才开始慢慢消费这5个数据。
这样处理的话,好处是简单,坏处就是漏消费数据,好比你说要消费5个数据,消费了2个本身就挂了。那下次该consumer重启后,在broker的记录中这个consumer是已经消费了5个的。
因此最好的作法就是将enable.auto.commit设置为false,改成手动提交位移,在每次消费完以后再手动提交位移信息。固然这样又有可能会重复消费数据,毕竟exactly once处理一直是一个问题呀(/摊手)。遗憾的是kafka目前没有保证consumer幂等消费的措施,若是确实须要保证consumer的幂等,能够对每条消息维持一个全局的id,每次消费进行去重,固然耗费这么多的资源来实现exactly once的消费到底值不值,那就得看具体业务了。
那么到这里先来总结下无消息丢失的主要配置吧:
那么接下来就来讲说kafka实现精确一次(exactly once)处理的方法吧。
在分布式环境下,要实现消息一致与精确一次(exactly once)语义处理是很难的。精确一次处理意味着一个消息只处理一次,形成一次的效果,不能多也不能少。
那么kafka如何可以实现这样的效果呢?在介绍以前,咱们先来介绍其余两个语义,至多一次(at most once)和至少一次(at least once)。
最多一次就是保证一条消息只发送一次,这个其实最简单,异步发送一次而后无论就能够,缺点是容易丢数据,因此通常不采用。
至少一次语义是kafka默认提供的语义,它保证每条消息都能至少接收并处理一次,缺点是可能有重复数据。
前面有介绍过acks机制,当设置producer客户端的acks是1的时候,broker接收到消息就会跟producer确认。但producer发送一条消息后,可能由于网络缘由消息超时未达,这时候producer客户端会选择重发,broker回应接收到消息,但极可能最开始发送的消息延迟到达,就会形成消息重复接收。
那么针对这些状况,要如何实现精确一次处理的语义呢?
要介绍幂等的producer以前,得先了解一下幂等这个词是什么意思。幂等这个词最先起源于函数式编程,意思是一个函数不管执行多少次都会返回同样的结果。好比说让一个数加1就不是幂等的,而让一个数取整就是幂等的。由于这个特性因此幂等的函数适用于并发的场景下。
但幂等在分布式系统中含义又作了进一步的延申,好比在kafka中,幂等性意味着一个消息不管重复多少次,都会被看成一个消息来持久化处理。
kafka的producer默认是支持最少一次语义,也就是说不是幂等的,这样在一些好比支付等要求精确数据的场景会出现问题,在0.11.0后,kafka提供了让producer支持幂等的配置操做。即:
props.put("enable.idempotence", ture)
在建立producer客户端的时候,添加这一行配置,producer就变成幂等的了。注意开启幂等性的时候,acks就自动是“all”了,若是这时候手动将ackss设置为0,那么会报错。
而底层实现其实也很简单,就是对每条消息生成一个id值,broker会根据这个id值进行去重,从而实现幂等,这样一来就可以实现精确一次的语义了。
可是!幂等的producery也并不是万能。有两个主要是缺陷:
当遇到上述幂等性的缺陷没法解决的时候,能够考虑使用事务了。事务能够支持多分区的数据完整性,原子性。而且支持跨会话的exactly once处理语义,也就是说若是producer宕机重启,依旧能保证数据只处理一次。
开启事务也很简单,首先须要开启幂等性,即设置enable.idempotence为true。而后对producer发送代码作一些小小的修改。
//初始化事务 producer.initTransactions(); try { //开启一个事务 producer.beginTransaction(); producer.send(record1); producer.send(record2); //提交 producer.commitTransaction(); } catch (KafkaException e) { //出现异常的时候,终止事务 producer.abortTransaction(); }
但不管开启幂等仍是事务的特性,都会对性能有必定影响,这是必然的。因此kafka默认也并无开启这两个特性,而是交由开发者根据自身业务特色进行处理。
以上~
推荐阅读:
分布式系统一致性问题与Raft算法(上)
Scala函数式编程(五) 函数式的错误处理
大数据存储的进化史 --从 RAID 到 Hadoop Hdfs