ps:这篇文章自我感受说的很大白话了!但愿大家看过了以后能有收获。java
不了解 Kafka 的朋友建议先看一看个人下面这几篇文章,第一篇必定要看,其余的能够按需学习。git
生产者(Producer) 调用send
方法发送消息以后,消息可能由于网络问题并无发送过去。github
因此,咱们不能默认在调用send
方法发送消息以后消息消息发送成功了。为了肯定消息是发送成功,咱们要判断消息发送的结果。可是要注意的是 Kafka 生产者(Producer) 使用 send
方法发送消息其实是异步的操做,咱们能够经过 get()
方法获取调用结果,可是这样也让它变为了同步操做,示例代码以下:面试
详细代码见个人这篇文章:Kafka系列第三篇!10 分钟学会如何在 Spring Boot 程序中使用 Kafka 做为消息队列?apache
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
sult.getProducerRecord().value().toString());
}
复制代码
可是通常不推荐这么作!能够采用为其添加回调函数的形式,示例代码以下:安全
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,缘由:{}", ex.getMessage()));
复制代码
若是消息发送失败的话,咱们检查失败的缘由以后从新发送便可!网络
另外这里推荐为 Producer 的retries
(重试次数)设置一个比较合理的值,通常是 3 ,可是为了保证消息不丢失的话通常会设置比较大一点。设置完成以后,当出现网络问题以后可以自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,由于间隔过小的话重试的效果就不明显了,网络波动一次你3次一会儿就重试完了异步
咱们知道消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 经过偏移量(offset)能够保证消息在分区内的顺序性。ide
当消费者拉取到了分区的某个消息以后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,忽然挂掉了,消息实际上并无被消费,可是 offset 却被自动提交了。函数
解决办法也比较粗暴,咱们手动关闭闭自动提交 offset,每次在真正消费完消息以后以后再本身手动提交 offset 。 可是,细心的朋友必定会发现,这样会带来消息被从新消费的问题。好比你刚刚消费完消息以后,还没提交 offset,结果本身挂掉了,那么这个消息理论上就会被消费两次。
咱们知道 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫作 leader 的家伙,其余副本称为 follower。咱们发送的消息会被发送到 leader 副本,而后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。你能够理解为其余副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。
试想一种状况:假如 leader 副本所在的 broker 忽然挂掉,那么就要从 follower 副本从新选出一个 leader ,可是 leader 的数据还有一些没有被 follower 副本的同步的话,就会形成消息丢失。
解决办法就是咱们设置 acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。
acks 的默认值即为1,表明咱们的消息被leader副本接收以后就算被成功发送。当咱们配置 acks = all 表明则全部副本都要接收到该消息以后该消息才算真正成功被发送。
为了保证 leader 副本能有 follower 副本能同步消息,咱们通常会为 topic 设置 replication.factor >= 3。这样就能够保证每一个 分区(partition) 至少有 3 个副本。虽然形成了数据冗余,可是带来了数据的安全性。
通常状况下咱们还须要设置 min.insync.replicas> 1 ,这样配置表明消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽可能避免默认值 1。
可是,为了保证整个 Kafka 服务的高可用性,你须要确保 replication.factor > min.insync.replicas 。为何呢?设想一下加入二者相等的话,只要是有一个副本挂掉,整个分区就没法正常工做了。这明显违反高可用性!通常推荐设置成 replication.factor = min.insync.replicas + 1。
Kafka 0.11.0.0版本开始 unclean.leader.election.enable 参数的默认值由原来的true 改成false
咱们最开始也说了咱们发送的消息会被发送到 leader 副本,而后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步状况不同,当咱们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样下降了消息丢失的可能性。
《70k Star Java开源项目出PDF阅读版本啦!》 。
做者介绍: Github 70k Star 项目 JavaGuide(公众号同名) 做者。每周都会在公众号更新一些本身原创干货。公众hao后台回复“1”领取Java工程师必备学习资料+面试突击pdf。