Kafka 的 Lag 计算误区及正确实现

前言

消息堆积是消息中间件的一大特点,消息中间件的流量削峰、冗余存储等功能正是得益于消息中间件的消息堆积能力。然而消息堆积实际上是一把亦正亦邪的双刃剑,若是应用场合不恰当反而会对上下游的业务形成没必要要的麻烦,好比消息堆积势必会影响上下游整个调用链的时效性,有些中间件如RabbitMQ在发生消息堆积时在某些状况下还会影响自身的性能。对于Kafka而言,虽然消息堆积不会对其自身性能带来多大的困扰,但不免不会影响上下游的业务,堆积过多有可能会形成磁盘爆满,或者触发日志清除策略而形成消息丢失的状况。如何利用好消息堆积这把双刃剑,监控是最为关键的一步。java

正文

消息堆积是消费滞后(Lag)的一种表现形式,消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量,也称之为消费滞后(Lag)量。对于Kafka而言,消息被发送至Topic中,而Topic又分红了多个分区(Partition),每个Partition都有一个预写式的日志文件,虽然Partition能够继续细分为若干个段文件(Segment),可是对于上层应用来讲能够将Partition当作最小的存储单元(一个由多个Segment文件拼接的“巨型文件”)。每一个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中。咱们来看下图,其就是Partition的一个真实写照: node

上图中有四个概念:bootstrap

  1. LogStartOffset:表示一个Partition的起始位移,初始为0,虽然消息的增长以及日志清除策略的影响,这个值会阶段性的增大。
  2. ConsumerOffset:消费位移,表示Partition的某个消费者消费到的位移位置。
  3. HighWatermark:简称HW,表明消费端所能“观察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
  4. LogEndOffset:简称LEO, 表明Partition的最高日志位移,其值对消费者不可见。好比在ISR(In-Sync-Replicas)副本数等于3的状况下(以下图所示),消息发送到Leader A以后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新本身,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。因为B、C拉取A消息之间延时问题,因此HW必然不会一直与Leader的LEO相等,即LEO>=HW。

要计算Kafka中某个消费者的滞后量很简单,首先看看其消费了几个Topic,而后针对每一个Topic来计算其中每一个Partition的Lag,每一个Partition的Lag计算就显得很是的简单了,参考下图:bash

由图可知消费Lag=HW - ConsumerOffset。对于这里你们有可能有个误区,就是认为Lag应该是LEO与ConsumerOffset之间的差值。LEO是对消费者不可见的,既然不可见何来消费滞后一说。app

那么这里就引入了一个新的问题,HW和ConsumerOffset的值如何获取呢? 性能

首先来讲说ConsumerOffset,Kafka中有两处能够存储,一个是Zookeeper,而另外一个是”**consumer_offsets这个内部topic中,前者是0.8.x版本中的使用方式,可是随着版本的迭代更新,如今愈来愈趋向于后者。就拿1.0.0版原本说,虽然默认是存储在”**consumer_offsets”中,可是保不齐用于就将其存储在了Zookeeper中了。这个问题倒也不难解决,针对两种方式都去拉取,而后哪一个有值的取哪一个。不过这里还有一个问题,对于消费位移来讲,其通常不会实时的更新,而更多的是定时更新,这样能够提升总体的性能。那么这个定时的时间间隔就是ConsumerOffset的偏差区间之一。ui

再来讲说HW,其也是Kafka中Partition的一个状态。有可能你会察觉到在Kafka的JMX中能够看到“kafka.log:type=Log,name=LogEndOffset,topic=[topic_name],partition=[partition_num]”这样一个属性,可是这个值不是LEO而是HW。spa

那么怎样正确的计算消费的Lag呢?对Kafka熟悉的同窗可能会想到Kafka中自带的kafka-consumer_groups.sh脚本中就有Lag的信息,示例以下:线程

[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID
TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                   CLIENT-ID
topic-test1          0          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          1          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          2          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          3          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
复制代码

咱们深究一下kafka-consumer_groups.sh脚本,发现只有一句代码:scala

exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"
复制代码

其含义就是执行kafka.admin.ConsumerGroupCommand而已。进一步深究,在ConsumerGroupCommand内部抓住了2句关键代码:

val consumerGroupService = new KafkaConsumerGroupService(opts)
val (state, assignments) = consumerGroupService.describeGroup()
复制代码

代码详解:consumerGroupService的类型是ConsumerGroupServicesealed trait类型),而KafkaConsumerGroupService只是ConsumerGroupService的一种实现,还有一种实现是ZkConsumerGroupService,分别对应新版的消费方式(消费位移存储在__consumer_offsets中)和旧版的消费方式(消费位移存储在zk中),详细计算步骤参考下一段落的内容。opt参数是指“ –describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID”等参数。第2句代码是调用describeGroup()方法来获取具体的信息,即二元组中的assignments,这个assignments中保存了上面打印信息中的全部内容。

Scala小知识: 在Scala中trait(特征)至关于Java的接口,实际上它比接口更大强大。与Java中的接口不一样的是,它还能够定义属性和方法的实现(JDK8起的接口默认方法)。通常状况下Scala中的类只能继承单一父类,可是若是是trait的话就能够继承多个,从结果来看是实现了多重继承。被sealed声明的trait仅能被同一文件的类继承。

ZkConsumerGroupService中计算消费lag的步骤以下:

  1. 经过zk获取一些基本信息,对应上面打印信息中的:TOPIC、PARTITION、CONSUMER-ID等,不过不会有HOST和CLIENT-ID。
  2. 经过OffsetFetchRequest请求获取消费位移(offset),若是获取失败则在经过zk获取。
  3. 经过OffsetReuqest请求获取分区的LogEndOffset(简称为LEO,可见的LEO)。
  4. 计算LogEndOffset与消费位移的差值来获取lag。

KafkaConsumerGroupService中计算消费lag的步骤以下:

  1. 经过DescibeGroupsRequest请求获取一些基本信息,不只包括TOPIC、PARTITION、CONSUMER-ID,还有HOST和CLIENT-ID。其实还有经过 FindCoordinatorRequest请求来获取coordinator信息,若是不了解coordinator在这里也没影响。
  2. 经过OffsetFetchRequest请求获取消费位移。
  3. 经过OffsetReuqest请求获取分区的LogEndOffset(简称为LEO)。
  4. 计算LogEndOffset与消费位移的差值来获取lag。

能够看到KafkaConsumerGroupService与ZkConsumerGroupService的计算Lag的方式都差很少,可是KafkaConsumerGroupService能获取更多消费详情,而且ZkConsumerGroupService也被标注为@Deprecated的了,后面内容都针对KafkaConsumerGroupService来作说明。既然Kafka已经为咱们提供了线程的方法来获取Lag,那么咱们有何须再重复造轮子,这里笔者写了一个调用的KafkaConsumerGroupService的示例(KafkaConsumerGroupService是使用Scala语言编写的,在Java的程序里使用相似scala.collection.Seq这样的全名称以防止混淆):

String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};
ConsumerGroupCommand.ConsumerGroupCommandOptions opts =
        new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);
ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =
        new ConsumerGroupCommand.KafkaConsumerGroupService(opts);
scala.Tuple2<scala.Option<String>, scala.Option<scala.collection.Seq<ConsumerGroupCommand
        .PartitionAssignmentState>>> res = kafkaConsumerGroupService.describeGroup();
scala.collection.Seq<ConsumerGroupCommand.PartitionAssignmentState> pasSeq = res._2.get();
scala.collection.Iterator<ConsumerGroupCommand.PartitionAssignmentState> iterable = pasSeq.iterator();
while (iterable.hasNext()) {
    ConsumerGroupCommand.PartitionAssignmentState pas = iterable.next();
    System.out.println(String.format("\n%-30s %-10s %-15s %-15s %-10s %-50s%-30s %s",
            pas.topic().get(), pas.partition().get(), pas.offset().get(),
            pas.logEndOffset().get(), pas.lag().get(), pas.consumerId().get(),
            pas.host().get(), pas.clientId().get()));
}
复制代码

在使用时,你能够封装一下这段代码而后返回一个相似List<ConsumerGroupCommand.PartitionAssignmentState>的东西给上层业务代码作进一步的使用。ConsumerGroupCommand.PartitionAssignmentState的代码以下:

case class PartitionAssignmentState(
  group: String, coordinator: Option[Node], topic: Option[String],
  partition: Option[Int], offset: Option[Long], lag: Option[Long],
  consumerId: Option[String], host: Option[String],
  clientId: Option[String], logEndOffset: Option[Long])
复制代码

Scala小知识: 对于case class, 在这里你能够简单的把它当作是一个JavaBean,可是它远比JavaBean强大,好比它会自动生成equals、hashCode、toString、copy、伴生对象、apply、unapply等等东西。在 scala 中,对保护(Protected)成员的访问比 java 更严格一些。由于它只容许保护成员在定义了该成员的的类的子类中被访问。而在java中,用protected关键字修饰的成员,除了定义了该成员的类的子类能够访问,同一个包里的其余类也能够进行访问。Scala中,若是没有指定任何的修饰符,则默认为 public。这样的成员在任何地方均可以被访问。

若是你正在试着运行上面一段程序,你会发现编译失败,报错:cannot access ‘kafka.admin.ConsumerGroupCommand.PartitionAssignmentState’ in ‘kafka.admin.ConsumerGroupCommand‘。这时候须要将所引入的kafka.core包中的kafka.admin.ConsumerGroupCommand中的PartitionAssignmentState类前面的protected修饰符去掉才能编译经过。

相关文章
相关标签/搜索