咱们讨论可靠性时,通常使用保证这个词,它是确保系统在各类不一样的环境下可以发生一致的行为。Kafka能够在哪些方面做出保证呢?数据库
1.Kafka能够保证分区消息的顺序apache
2.只有消息被写入分区的全部同步副本时,它才会被认为是已提交的。生产者能够选择接收不一样类型的确认。服务器
3.只要还有一个副本是活跃的,那么提交的消息就不会丢失。网络
4.消费者只能读取已经提交的消息。架构
Kafka的管理员和开发者能够在配置参数上做出权衡,从而获得它们想要的可靠性,这种权衡通常是指消息存储的可靠性和一致性的重要程度与可用性,高吞吐量,低延迟和硬件成本的重要程度之间的权衡。框架
6.2复制异步
Kafka的复制机制和分区的多副本架构师Kafka可靠性保证的核心。把消息写入多个副本可以使Kafka在发生崩溃时仍能保证消息的持久性。工具
Kafka的主题被分为多个分区,分区是基本的数据块,分区存储在单个磁盘上,Kafka能够保证分区里的事件是有序的,分区能够在线(可用),也能够离线(不可用)。每一个分区能够有多个副本,其中一个副本是首领,全部的事件都直接发送给首领副本,或者从首领副本读取事件。其它副本只须要与首领保持同步,并及时复制最新的事件,当首领副本不可用时,其中一个副本会成为新的首领。性能
分区首领时同步副本,对于跟踪者副原本说,它须要知足如下条件才能被认为是同步的。测试
1.与zookeeper之间有一个活跃的会话,(过去6秒(可配置)向zookeeper发送过心跳)。
2.在过去10秒(可配置)从首领获取过消息。
3.过去10秒获取过最新的消息,光从首领获取消息是不够的,还必须是几乎零延迟的。
若是不能知足以上任何一点,那么就被认为是不一样步的。
一个滞后的同步副本会致使生产者和消费者变慢,由于在消息被认为已提交以前,客户端会等待全部同步副本接收消息。
6.3broker配置
broker有三个配置参数会影响Kafka的消息存储的可靠性。与其它配置参数同样,它们能够应用在broker级别,用于控制全部主题的行为,也能够应用在主题级别,用于控制特定主题的行为。
在主题级别控制可靠性,意味着Kafka集群能够同时拥有可靠的主题和非可靠的主题。
6.3.1复制系数
主题级别的配置参数是replication.factor,而在broker级别能够经过default.repllication.factor来配置自动建立的主题。
咱们假定主题的复制系数是3,也就是说每一个分区总共会被3个不一样的broker复制三次。即便在主题建立以后,也能够经过新增或移除副原本改变复制系数。
更高的复制系数会带来更好的可用性,可靠性和更少的故障。
副本的分布也很重要,默认状况下,Kafka会确保分区的每一个副本都被放在不一样的broker上,若是这些broker处在同一个机架上,一旦机架的交换机发生故障,分区就会不可用,因此咱们建议把broker分布在多个不一样的机架上。并使用broker.reck参数来为每一个broker配置所在机架的名字。若是配置了机架名字,Kafka会保证分区的副本被分布在多个机架上,从而得到更高的可用性。
若是首领不可用时,其它副本都是不一样步的,若是把unclean.leader.election.enable设为true,就是容许不一样步的副本称为首领,那么咱们将面临丢失消息的风险。若是把这个参数设为false,就要等待原先的首领从新上线,从而下降了可用性。银行系统通常会禁用这种不彻底的首领选举(把这个参数设为false)。
6.3.3最少同步副本
min.insync.replicas
根据Kafka对可靠性保证的定义,消息只有在被写入到全部同步副本以后才会被认为是已提交的。若是两个副本都变为不可用,那么broker都会中止接受生产者的请求。尝试发送数据的生产者就会收到NoEnoughReplicasException异常。但消费者仍然能够继续读取已有的数据。
6.4在可靠的系统里使用生产者
每一个使用Kafka的开发人员必须注意两件事:
1.根据可靠性需求配置恰当的acks值
2.在参数配置和代码里正确处理错误。
6.4.1发送确认
生产者能够选择如下三种确认模式。
acks=0意味着若是生产者经过网络把消息发送出去,就认为消息已经成功写入Kafka,这种状况下有可能发生错误。可是速度是最快的。
acks=1意味着首领在收到消息并把它写入到分区数据文件就会返回确认信息,这个状况下,若是发生正常的首领选举,生产者会收到异常,若是生产者能恰当处理异常,选择重发,最终消息仍然能够发送成功,但这个模式仍然可能发生丢失数据,好比消息成功写入首领,在复制到跟随者副本以前,首领发生崩溃。
acks=all,意味着首领在返回确认以前,会等待全部同步副本收到消息。若是和min.insync.replicas参数结合起来,就能够决定在返回确认前至少有多少副本可以收到消息。这是最保险的作法——生产者会一直重试直到消息被成功提交。不过生产者在继续发送其它消息以前须要等待全部的副本都收到当前消息。虽然能够经过使用异步模式和更大的批次加快速度,但这样一般会下降吞吐量。
6.4.2配置生产者的重试参数
生产者须要处理的错误包含两部分:一部分是生产者能够自动处理的错误,还有一部分是须要开发者手动处理的错误。
若是broker返回的错误能够经过重试来解决,那么生产者会自动处理这些错误。生产者向broker发送消息,broker能够返回一个成功响应码或者一个错误响应码。错误响应码能够分为两种:
一种是重试以后能够解决的,还有一种是没法经过重试解决的。
通常状况下,若是目标是不丢失任何消息,那么最好在生产者遇到可重试错误时可以保持重试。Kafka的跨数据中心复制工具默认会进行无限制的重试。做为一个高可用性的复制工具,它毫不会丢失消息。
要注意重试可能会致使消息重复,例如因为网络问题消息写入后生产者没有收到确认消息,生产者重试。这种状况下broker会收到两个相同的消息。
6.4.3额外的错误处理
使用生产者内置的重试机制能够在不形成消息丢失的状况下轻松处理大部分错误,不过对于开发人员来讲,仍然须要处理其它类型的错误,包括:
不可重试的broker错误,例如消息大小错误,认证错误;
在消息发送以前的错误,如序列化错误;
在生产者达到重试次数上限时或在消息占用的内存达到上限时发生的错误。
若是错误处理只是为了重发,那么最好使用生产者内置的重试机制。
6.5在可靠的系统里使用消费者
只有那些被提交到Kafka的数据,也就是已经被写入到全部同步副本的数据,对消费者时可用的,这意味着消费者获得的消息已经具有了一致性。消费者惟一要作的事跟踪哪些消息是已经读取过的,哪些是尚未读取过的。这是在读取消息时不丢失消息的关键。
从分区读取数据时,消费者会获取一批事件,检查其中最大的偏移量,而后从这个偏移量开始读取另一批事件,这样能够保证消费者总能以正确的顺序获取新数据,不会错过任何事件。
若是消费者提交了偏移量却未能处理完消息,那么就可能形成消息丢失,这也是消费者丢失消息的主要缘由,所以咱们会很是重视偏移量的提交时间点和方法。
已提交偏移量是指消费者发送给Kafka的偏移量,用于确认它已经收到并处理好的消息位置。
6.5.1消费者的可靠性配置
为了保证消费者的可靠性,有4个很是重要的配置参数。
若是两个消费者具备相同的group.id,而且订阅了同一个主题,那么每一个消费者会分到主题分区的一个子集,若是但愿消费者能够看到主题的全部消息,那么须要为它们设置惟一的group.id
2.auto.offset.reset
这个参数指定了没有偏移量可提交时,消费者作什么?有两种配置:earliest消费者从分区的开始位置读取数据,latest消费者从分区的末尾读取数据,这样能够减小重复处理消息,但可能错过一些消息。
3.enable.auto.commit
可让消费者自动提交偏移量,也能够在代码里手动提交偏移量。自动提交的好处是实现消费者逻辑时能够少考虑一些问题。缺点是没法控制重复处理消息,若是把消息交给另一个后台线程去处理,自动提交机制可能在消息尚未处理完毕就提交偏移量。
4.auto.commit.interval.ms
与第三个参数有直接联系,若是选择了自动提交,能够经过该参数配置自动提交的频度,默认是5秒提交一次。
6.5.2显式提交偏移量
1.老是在处理完事件后再提交偏移量
若是全部处理在轮询里完成,而且不须要在轮询之间维护状态,那么可使用自动提交,或者轮询结束后进行自动提交。
2.提交频率是性能和重复消息数量之间的权衡
能够在一个循环屡次提交偏移量,或者多个循环只提交一次,这取决于你在性能和重复之间的权衡
3.确保对提交的偏移量内心有数
处理完消息后再提交偏移量是很是关键的——不然会致使消费者错过消息
4.再均衡
通常要在分区被撤销以前提交偏移量,并在分配到新分区时清理以前的状态。
5.消费者可能须要重试
假设要把Kafka数据写到数据库,不过那时数据库不可用,想稍后重试,要注意,你提交的是偏移量,不是确认,能够采用两种办法解决:
一是提交最后一个处理成功的偏移量,而后把尚未处理的消息保存到缓冲区,调用消费者的pause方法来确保其它轮询不会返回数据,在保持轮询的同时尝试从新处理,若是重试成功,或者重试次数到达上限并决定放弃,那么把错误消息记录下来并丢弃消息,而后调用resume方法让消费者从轮询继续获取新数据。
二是遇到可重试错误时,把错误写入一个独立主题,而后继续。一个独立的消费者群组复制从该主题读取错误消息,并进行重试,或者使用其中一个消费者同时从该主题读取错误并进行重试,不太重试时须要暂停该主题。这个模式有点像dead-letter-queue.
6.消费者可能须要维护状态
若是须要在多个轮询之间维护状态,有一个办法是提交偏移量同时把最近计算的结果写到一个主题上,消费者线程重启以后,它就能够拿到最近的平均数接着计算,这个问题很复杂,建议使用KafkaStream。
7.长时间处理
若是处理数据须要很长时间,暂停轮询不能超过几秒,即便不想获取更多数据,也要保持轮询,这样客户端才能向broker发送心跳。这种状况下,一种常见的作法是使用一个线程池来处理数据,由于使用多个线程能够并行处理,从而加快速度。把数据交给线程池处理后,就能够暂停消费者而后保持轮询,但不获取新数据,直到工做线程处理完成。继续获取新数据,由于消费者保持轮询,心跳就会正常,就不会发生在均衡。
8.仅一次传递
实现仅一次处理最简单最经常使用的办法是把结果写到一个支持惟一键的系统里,好比键值存储引擎,关系数据库,elasticSearch或者其余数据存储引擎。这种状况下,要么消息自己包含一个惟一键(一般是这样),要么使用主题,分区和偏移量的组合建立一个惟一键——它们的组合能够惟一标识一个Kafka记录。若是出现重复记录,只须要覆盖原来的消息便可,就像没有出现重复数据同样。这个模式叫作幂等姓写入,它是一种很常见也颇有用的模式。
若是写入消息的系统支持事务,最简单的使用关系数据库,hdfs有一些被从新定义过的原子操做也常常用来达到相同目的。咱们把消息和偏移量放在同一个事物里,这样它们就能保持同步。消费者重启时,就会获取最近被处理过的消息的偏移量。而后调用seek方法从该偏移量继续读取数据。
6.6验证系统的可靠性
建议作三个层面的验证——配置验证,应用程序验证和生产环境的应用程序监控。
6.6.1验证配置
从应用程序能够很容易对broker和客户端配置进行验证,有如下两方面缘由:
1.验证配置是否知足你的需求
2.帮助理解系统的行为,了解对Kafka基本准则的理解是否存在误差,而后改进,理解这些准则是如何被应用到各类场景中的。
Kafka提供了两个重要工具验证配置:org.apache.kafka.tools包里的VerifiableProducer和VerifiableConsumer这两个类。咱们能够从命令行容许这两个类,或者把它们嵌入到自动化测试框架。
VerifiableProducer生成一系列消息,这些消息包含从1到你指定的某个数字,你可使用与生产者相同的方式配置VerifiableProducer,在运行VerifiableProducer时,它会把每一个消息是否成功发送到broker的结果打印出来。
VerifiableConsumer执行另外一种检查,它读取事件,并按照顺序打印这些事件。他也会打印已提交的偏移量和再均衡的相关信息。
能够考虑如下测试:
1.首领选举,生产者和消费者恢复正常须要多久?
2.控制器选举,重启控制器系统须要多久恢复?
3.依次重启,能够重启broker而不丢失数据吗?
4.不彻底首领选举测试,若是依次中止全部副本,而后启动一个不一样步的broker会发生什么?要怎么恢复正常?这样作能够接受吗?
Kafka代码库包含了大量测试用例,它们使用VerifiableProducer和VerifiableConsumer来确保迭代的版本可以正常工做。
6.6.2应用程序验证
应用程序的验证包含检查自定义的错误处理代码,偏移量提交方式,再均衡监听器和其它使用了Kafka客户端的地方。
建议作以下测试:
1.客户端从服务器断开链接
2.首领选举
3.依次重启broker
4.依次重启消费者
5.依次重启生产者
看测试结果是否符合预期。
6.6.3在生产环境监控可靠性
首先Kafka的Java客户端包含了JMX度量指标,这些指标能够用于监控客户端的状态和事件。对于生产者来讲最重要的指标时消息的error-rate和retry-rate,若是两个指标上升,说明系统出现了问题。
除此以外,还要监控生产者日志——发送消息的错误日志被设为WARN级别。
对于消费者来讲最重要的指标时consumer-lag,这个指标代表了消费者的处理速度与最近提交到分区里的偏移量之间还有多少差距。理想状况下,该指标老是0,消费者总能读到最新的消息。不过实际中,poll方法会返回不少消息,所以该指标会有波动,关键是确保消费者最终会遇上去,而不是越落越远。
监控数据流是为了确保全部生成的数据会被及时读取,为了确保数据可以及时读取,须要知道数据是何时生成的。0.10.0版本的Kafka在消息里增长了时间戳,代表了消息的生成时间。若是使用的是更早的客户端,建议在消息里加入时间戳,应用程序名字和机器名,这样有助于诊断问题。
为了确保全部消息可以在合理时间被读取,应用程序须要记录生成消息的数量,而消费者须要记录已经读取消息的数量以及消息生成时间到当前时间的时间差
而后须要工具来比较生产者和消费者记录的消息数量(为了确保没有丢失消息),确保这二者之间的时间差不会超出咱们容许的范围。