涨姿式了解一下Kafka消费位移可好?

摘要:Kafka中的位移是个极其重要的概念,由于数据一致性、准确性是一个很重要的语义,咱们都不但愿消息重复消费或者丢失。而位移就是控制消费进度的大佬。本文就详细聊聊kafka消费位移的那些事,包括:java

概念剖析

kafka的两种位移

关于位移(Offset),其实在kafka的世界里有两种位移:数据库

  • 分区位移:生产者向分区写入消息,每条消息在分区中的位置信息由一个叫offset的数据来表征。假设一个生产者向一个空分区写入了 10 条消息,那么这 10 条消息的位移依次是 0、一、…、9;编程

  • 消费位移:消费者须要记录消费进度,即消费到了哪一个分区的哪一个位置上,这是消费者位移(Consumer Offset)。bootstrap

注意,这和上面所说的消息在分区上的位移彻底不是一个概念。上面的“位移”表征的是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了。而消费者位移则不一样,它多是随时变化的,毕竟它是消费者消费进度的指示器。api

消费位移

消费位移,记录的是 Consumer 要消费的下一条消息的位移,切记,是下一条消息的位移! 而不是目前最新消费消息的位移异步

假设一个分区中有 10 条消息,位移分别是 0 到 9。某个 Consumer 应用已消费了 5 条消息,这就说明该 Consumer 消费了位移为 0 到 4 的 5 条消息,此时 Consumer 的位移是 5,指向了下一条消息的位移。函数

至于为何要有消费位移,很好理解,当 Consumer 发生故障重启以后,就可以从 Kafka 中读取以前提交的位移值,而后从相应的位移处继续消费,从而避免整个消费过程重来一遍。就好像书签同样,须要书签你才能够快速找到你上次读书的位置。学习

那么了解了位移是什么以及它的重要性,咱们天然而然会有一个疑问,kafka是怎么记录、怎么保存、怎么管理位移的呢?fetch

位移的提交

Consumer 须要上报本身的位移数据,这个汇报过程被称为位移提交。由于 Consumer 可以同时消费多个分区的数据,因此位移的提交其实是在分区粒度上进行的,即Consumer 须要为分配给它的每一个分区提交各自的位移数据。优化

鉴于位移提交甚至是位移管理对 Consumer 端的巨大影响,KafkaConsumer API提供了多种提交位移的方法,每一种都有各自的用途,这些都是本文将要谈到的方案。

void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

先粗略的总结一下。位移提交分为自动提交和手动提交;而手动提交又分为同步提交和异步提交。

自动提交

当消费配置enable.auto.commit=true的时候表明自动提交位移。

自动提交位移是发生在何时呢?auto.commit.interval.ms默认值是50000ms。即kafka每隔5s会帮你自动提交一次位移。自动位移提交的动做是在 poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求以前会检查是否能够进行位移提交,若是能够,那么就会提交上一次轮询的位移。假如消费数据量特别大,能够设置的短一点。

越简单的东西功能越不足,自动提交位移省事的同时确定会带来一些问题。自动提交带来重复消费和消息丢失的问题:

  • 重复消费: 在默认状况下,Consumer 每 5 秒自动提交一次位移。如今,咱们假设提交位移以后的 3 秒发生了 Rebalance 操做。在 Rebalance 以后,全部 Consumer 从上一次提交的位移处继续消费,但该位移已是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的全部数据都要从新再消费一次。虽然你可以经过减小 auto.commit.interval.ms 的值来提升提交频率,但这么作只能缩小重复消费的时间窗口,不可能彻底消除它。这是自动提交机制的一个缺陷。

  • 消息丢失: 假设拉取了100条消息,正在处理第50条消息的时候,到达了自动提交窗口期,自动提交线程将拉取到的每一个分区的最大消息位移进行提交,若是此时消费服务挂掉,消息并未处理结束,但却提交了最大位移,下次重启就从100条那消费,即发生了50-100条的消息丢失。

手动提交

当消费配置enable.auto.commit=false的时候表明手动提交位移。用户必须在适当的时机(通常是处理完业务逻辑后),手动的调用相关api方法提交位移。好比在下面的案例中,我须要确认个人业务逻辑返回true以后再手动提交位移

while (true) {
     try {
         ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
         if (!consumerRecords.isEmpty()) {
             for (ConsumerRecord<String, String> record : consumerRecords) {
                 KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                 // 处理业务
                 boolean handleResult = handle(kafkaMessage);
                 if (handleResult) {
                     log.info(" handle success, kafkaMessage={}" ,kafkaMessage);
                 } else {
                     log.info(" handle failed, kafkaMessage={}" ,kafkaMessage);
                 }
             }
             // 手动提交offset
             consumer.commitSync(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
        
         } 
     } catch (Exception e) {
         log.info("kafka consume error." ,e);
     }
 }

手动提交明显能解决消息丢失的问题,由于你是处理完业务逻辑后再提交的,假如此时消费服务挂掉,消息并未处理结束,那么重启的时候还会从新消费。

可是对于业务层面的失败致使消息未消费成功,是没法处理的。由于业务层的逻辑变幻无穷、好比格式不正确,你叫Kafka消费端程序怎么去处理?应该要业务层面本身处理,记录失败日志作好监控等。

可是手动提交不能解决消息重复的问题,也很好理解,假如消费0-100条消息,50条时挂了,重启后因为没有提交这一批消息的offset,是会从0开始从新消费。至于如何避免重复消费的问题,在这篇文章有说。

手动提交又分为异步提交和同步提交。

同步提交

上面案例代码使用的是commitSync() ,顾名思义,是同步提交位移的方法。同步提交位移Consumer 程序会处于阻塞状态,等待 Broker 返回提交结果。同步模式下提交失败的时候一直尝试提交,直到遇到没法重试的状况下才会结束。在任何系统中,由于程序而非资源限制而致使的阻塞均可能是系统的瓶颈,会影响整个应用程序的 TPS。固然,你能够选择拉长提交间隔,但这样作的后果是 Consumer 的提交频率降低,在下次 Consumer 重启回来后,会有更多的消息被从新消费。所以,为了解决这些不足,kafka还提供了异步提交方法。

异步提交

异步提交会当即返回,不会阻塞,所以不会影响 Consumer 应用的 TPS。因为它是异步的,Kafka 提供了回调函数,供你实现提交以后的逻辑,好比记录日志或处理异常等。下面这段代码展现了调用 commitAsync() 的方法

consumer.commitAsync((offsets, exception) -> {
 if (exception != null)
     handleException(exception);
 });

可是异步提交会有一个问题,那就是它没有重试机制,不过通常状况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,由于若是提交失败是由于临时问题致使的,那么后续的提交总会有成功的。因此消息也是不会丢失和重复消费的。
但若是这是发生在关闭消费者或再均衡前的最后一次提交,就要确保可以提交成功。所以,组合使用commitAsync()commitSync()是最佳的方式。

try {
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
        if (!consumerRecords.isEmpty()) {
             for (ConsumerRecord<String, String> record : consumerRecords) {
                KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                boolean handleResult = handle(kafkaMessage);             
             }
             //异步提交位移               
             consumer.commitAsync((offsets, exception) -> {
             if (exception != null)
                 handleException(exception);
             });
           
        }
    }
} catch (Exception e) {
    System.out.println("kafka consumer error:" + e.toString());
} finally {
    try {
        //最后同步提交位移
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

让位移提交更加灵活和可控

若是细心的阅读了上面全部demo的代码,那么你会发现这样几个问题:

一、全部的提交,都是提交 poll 方法返回的全部消息的位移,poll 方法一次返回1000 条消息,则一次性地将这 1000 条消息的位移一并提交。可这样一旦中间出现问题,位移没有提交,下次会从新消费已经处理成功的数据。因此我想作到细粒度控制,好比每次提交100条,该怎么办?

答:能够经过commitSync(Map<TopicPartition, OffsetAndMetadata>)commitAsync(Map<TopicPartition, OffsetAndMetadata>)对位移进行精确控制。

二、poll和commit方法对于普通的开发人员而言是一个黑盒,没法精确地掌控其消费的具体位置。我都不知道此次的提交,是针对哪一个partition,提交上去的offset是多少。

答:能够经过record.topic()获取topic信息, record.partition()获取分区信息,record.offset() + 1获取消费位移,记住消费位移是指示下一条消费的位移,因此要加一。

三、我想本身管理offset怎么办?一方面更加保险,一方面下次重启以后能够精准的从数据库读取最后的offset就不存在丢失和重复消费了。
答:能够将消费位移保存在数据库中。消费端程序使用comsumer.seek方法指定从某个位移开始消费。

综合以上几个可优化点,并结合全文,能够给出一个比较完美且完整的demo:联合异步提交和同步提交,对处理过程当中全部的异常都进行了处理。细粒度的控制了消费位移的提交,而且保守的将消费位移记录到了数据库中,从新启动消费端程序的时候会从数据库读取位移。这也是咱们消费端程序位移提交的最佳实践方案。你只要继承这个抽象类,实现你具体的业务逻辑就能够了。

public abstract class PrefectCosumer {
    private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    int count = 0;
    public final void consume() {
        Properties properties = PropertiesConfig.getConsumerProperties();
        properties.put("group.id", getGroupId());
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(getTopics());
        consumer.poll(0);
        // 把offset记录到数据库中 从指定的offset处消费 
        consumer.partitionsFor(getTopics()).stream().map(info ->
        new TopicPartition(getTopics(), info.partition()))
        .forEach(tp -> {
               consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));   
         });
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
                if (!consumerRecords.isEmpty()) {
                    for (ConsumerRecord<String, String> record : consumerRecords) {

                        KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                        boolean handleResult = handle(kafkaMessage);
                        if (handleResult) {
                            //注意:提交的是下一条消息的位移。因此OffsetAndMetadata 对象时,必须使用当前消息位移加 1。
                            offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));

                            // 细粒度控制提交 每10条提交一次offset
                            if (count % 10 == 0) {
                                // 异步提交offset
                                consumer.commitAsync(offsets, (offsets, exception) -> {
                                    if (exception != null) {
                                        handleException(exception);
                                    }
                                    // 将消费位移再记录一份到数据库中
                                    offsets.forEach((k, v) -> {
                                        String s = "insert into kafka_offset(`topic`,`group_id`,`partition_id`,`offset`) values" +
                                                " ('" + k.topic() + "','" + getGroupId() + "'," + k.partition() + "," + v.offset() + ")" +
                                                " on duplicate key update offset=values(offset);";
                                        JdbcUtils.insertTable(s);
                                    });


                                });
                            }
                            count++;
                        } else {         
                            System.out.println("消费消息失败 kafkaMessage={}" + getTopics() + getGroupId() + kafkaMessage.toString());                         
                        }
                    }


                }
            }
        } catch (Exception e) {
            System.out.println("kafka consumer error:" + e.toString());
        } finally {
            try {
                // 最后一次提交 使用同步提交offset
                consumer.commitSync();
            } finally {
                consumer.close();
            }


        }
    }


    /**
     * 具体的业务逻辑
     *
     * @param kafkaMessage
     * @return
     */
    public abstract boolean handle(KafkaMessage kafkaMessage);

    public abstract List<String> getTopics();

    public abstract String getGroupId();

    void handleException(Exception e) {
        //异常处理
    }
}

控制位移提交的N种方式

刚刚咱们说本身控制位移,使用seek方法能够指定offset消费。那到底怎么控制位移?怎么重设消费组位移?seek是什么?如今就来仔细说说。

并非全部的消息队列均可以重设消费者组位移达到从新消费的目的。好比传统的RabbitMq,它们处理消息是一次性的,即一旦消息被成功消费,就会被删除。而Kafka消费消息是能够重演的,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,因此消费者不会删除消息数据。同时,因为位移数据是由消费者控制的,所以它可以很容易地修改位移的值,实现重复消费历史数据的功能。

了解如何重设位移是很重要的。假设这么一个场景,我已经消费了1000条消息后,我发现处理逻辑错了,因此我须要从新消费一下,但是位移已经提交了,我到底该怎么从新消费这1000条呢??假设我想从某个时间点开始消费,我又该如何处理呢?

首先说个误区:auto.offset.reset=earliest/latest这个参数你们都很熟悉,可是初学者很容易误会它。大部分朋友都以为在任何状况下把这两个值设置为earliest或者latest ,消费者就能够从最先或者最新的offset开始消费,但实际上并非那么回事,他们生效都有一个前提条件,那就是对于同一个groupid的消费者,若是这个topic某个分区有已经提交的offset,那么不管是把auto.offset.reset=earliest仍是latest,都将失效,消费者会从已经提交的offset开始消费。所以这个参数并不能解决用户想重设消费位移的需求。

kafka有七种控制消费组消费offset的策略,主要分为位移维度和时间维度,包括:

  • 位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成咱们给定的位移值。包括Earliest/Latest/Current/Specified-Offset/Shift-By-N策略

  • 时间维度。咱们能够给定一个时间,让消费者把位移调整成大于该时间的最小位移;也能够给出一段时间间隔,好比 30 分钟前,而后让消费者直接将位移调回 30 分钟以前的位移值。包括DateTime和Duration策略

说完了重设策略,咱们就来看一下具体应该如何实现,能够从两个角度,API方式和命令行方式。

重设位移的方法之API方式

API方式只要记住用seek方法就能够了,包括seek,seekToBeginning 和 seekToEnd。

void seek(TopicPartition partition, long offset);    
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);    
void seekToBeginning(Collection<TopicPartition> partitions);    
void seekToEnd(Collection<TopicPartition> partitions);

从方法签名咱们能够看出seekToBeginningseekToEnd是能够一次性重设n个分区的位移,而seek 只容许重设指定分区的位移,即为每一个分区都单独设置位移,由于不可贵出,若是要自定义每一个分区的位移值则用seek,若是但愿kafka帮你批量重设全部分区位移,好比从最新数据消费或者从最先数据消费,那么用seekToEnd和seekToBeginning。

Earliest 策略:从最先的数据开始消费

从主题当前最先位移处开始消费,这个最先位移不必定就是 0 ,由于好久远的消息会被 Kafka 自动删除,主要取决于你的删除配置。

代码以下:

Properties properties = PropertiesConfig.getConsumerProperties();
properties.put("group.id", getGroupId());
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(getTopics());
consumer.poll(0);
consumer.seekToBeginning(
consumer.partitionsFor(getTopics()).stream().map(partitionInfo ->
   new TopicPartition(getTopics(), partitionInfo.partition()))
   .collect(Collectors.toList()));

首先是构造consumer对象,这样咱们能够经过partitionsFor获取到分区的信息,而后咱们就能够构造出TopicPartition集合,传给seekToBegining方法。须要注意的一个地方是:须要用consumer.poll(0),而不能用consumer.poll(Duration.ofMillis(0))

在poll(0)中consumer会一直阻塞直到它成功获取了所需的元数据信息,以后它才会发起fetch请求去获取数据。而poll(Duration)会把元数据获取也计入整个超时时间。因为本例中使用的是0,即瞬时超时,所以consumer根本没法在这么短的时间内链接上coordinator,因此只能赶在超时前返回一个空集合。

Latest策略:从最新的数据开始消费

consumer.seekToEnd(
        consumer.partitionsFor(getTopics().get(0)).stream().map(partitionInfo ->
            new TopicPartition(getTopics().get(0), partitionInfo.partition()))
              .collect(Collectors.toList()));

Current策略:从当前已经提交的offset处消费

consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
        new TopicPartition(getTopics().get(0), info.partition()))
        .forEach(tp -> {
            long committedOffset = consumer.committed(tp).offset();
            consumer.seek(tp, committedOffset);
        });

**Special-offset策略:从指定的offset处消费 **

该策略使用的方法和current策略同样,区别在于,current策略是直接从kafka元信息中读取中已经提交的offset值,而special策略须要用户本身为每个分区指定offset值,咱们通常是把offset记录到数据库中而后能够从数据库去读取这个值

consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
                new TopicPartition(getTopics().get(0), info.partition()))
                .forEach(tp -> {
                    try {
                        consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                });

以上演示了用API方式重设位移,演示了四种常见策略的代码,另外三种没有演示,一方面是大同小异,另外一方面在实际生产中,用API的方式不太可能去作时间维度的重设,而基本都是用命令行方式。

重设位移的方法之命令行方式

命令行方式重设位移是经过 kafka-consumer-groups 脚本。比起 API 的方式,用命令行重设位移要简单得多。

Earliest 策略指定–to-earliest。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute

Latest 策略指定–to-latest。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute

Current 策略指定–to-current。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute

Specified-Offset 策略指定–to-offset。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute

Shift-By-N 策略指定–shift-by N。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute

DateTime 策略指定–to-datetime。

DateTime 容许你指定一个时间,而后将位移重置到该时间以后的最先位移处。常见的使用场景是,你想从新消费昨天的数据,那么你可使用该策略重设位移到昨天 0 点。

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute

Duration 策略指定–by-duration。
Duration 策略则是指给定相对的时间间隔,而后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。若是你熟悉 Java 8 引入的 Duration 类的话,你应该不会对这个格式感到陌生。它就是一个符合 ISO-8601 规范的 Duration 格式,以字母 P 开头,后面由 4 部分组成,即 D、H、M 和 S,分别表示天、小时、分钟和秒。举个例子,若是你想将位移调回到 15 分钟前,那么你就能够指定 PT0H15M0S

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute

提交的位移都去哪了?

经过上面那几部分的内容,咱们已经搞懂了位移提交的方方面面,那么提交的位移它保存在哪里呢?这就要去位移主题的的世界里一探究竟了。kafka把位移保存在一个叫作__consumer_offsets的内部主题中,叫作位移主题。

注意:老版本的kafka实际上是把位移保存在zookeeper中的,可是zookeeper并不适合这种高频写的场景。因此新版本已是改进了这个方案,直接保存到kafka。毕竟kafka自己就适合高频写的场景,而且kafka也能够保证高可用性和高持久性。

既然它也是主题,那么离不开分区和副本这两个机制。咱们并无手动建立这个主题而且指定,因此是kafka自动建立的, 分区的数量取决于Broker 端参数 offsets.topic.num.partitions,默认是50个分区,而副本参数取决于offsets.topic.replication.factor,默认是3。

既然也是主题,确定会有消息,那么消息格式是什么呢?参考前面咱们手动设计将位移写入数据库的方案,咱们保存了topic,group_id,partition,offset四个字段。topic,group_id,partition无疑是数据表中的联合主键,而offset是不断更新的。无疑kafka的位移主题消息也是相似这种设计。key也是那三个字段,而消息体其实很复杂,你能够先简单理解为就是offset。

既然也是主题,确定也会有删除策略,不然消息会无限膨胀。可是位移主题的删除策略和其余主题删除策略又不太同样。咱们知道普通主题的删除是能够经过配置删除时间或者大小的。而位移主题的删除,叫作 Compaction。Kafka 使用Compact 策略来删除位移主题中的过时消息,对于同一个 Key 的两条消息 M1 和 M2,若是 M1 的发送时间早于 M2,那么 M1 就是过时消息。Compact 的过程就是扫描日志的全部消息,剔除那些过时的消息,而后把剩下的消息整理在一块儿。

Kafka 提供了专门的后台线程按期地巡检待 Compact 的主题,看看是否存在知足条件的可删除数据。这个后台线程叫 Log Cleaner。不少实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,若是你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,一般都是这个线程挂掉了致使的。

总结

kafka的位移是个极其重要的概念,控制着消费进度,也即控制着消费的准确性,完整性,为了保证消息不重复和不丢失。咱们最好作到如下几点:

  • 手动提交位移。

  • 手动提交有异步提交和同步提交两种方式,既然二者有利也有弊,那么咱们能够结合起来使用。

  • 细粒度的控制消费位移的提交,这样能够避免重复消费的问题。

  • 保守的将消费位移再记录到了数据库中,从新启动消费端程序的时候从数据库读取位移。

获取Kafka全套原创学习资料及思惟导图,关注【胖滚猪学编程】公众号,回复"kafka"。

本文来源于公众号:【胖滚猪学编程】。一枚集颜值与才华于一身,不算聪明却足够努力的女程序媛。用漫画形式让编程so easy and interesting!求关注!

相关文章
相关标签/搜索