Kafka(七)消费者偏移量

在Kafka0.9版本以前消费者保存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID。新版消费者不在保存偏移量到zookeeper中,而是保存在Kafka的一个内部主题中“__consumer_offsets”,该主题默认有50个分区,每一个分区3个副本,分区数量有参数offset.topic.num.partition设置。经过消费者组ID的哈希值和该参数取模的方式来肯定某个消费者组已消费的偏移量保存到__consumer_offsets主题的哪一个分区中。
异步


Kafka消费者API提供两种方法用来查询偏移量。一个是committed(TopicPartition partition)方法,这个方法返回一个OffsetAndMetadata对象,经过这个对象能够获取指定分区已提交的偏移量;另一个方法position(TopicPartition partition)返回的是下一次拉取位置。ide

同时Kafka消费者还提供了重置消费偏移量的方法,seek(TopicPartition partition, long offset),该方法用于指定消费起始位置,另外还有seekToBeginning()和seekToEnd(),从名字就能看出来是干吗的。spa


偏移量提交有自动和手动,默认是自动(enable.auto.commit = true)。自动提交的话每隔多久自动提交一次呢?这个由消费者协调器参数auto.commit.interval.ms 毫秒执行一次提交。有些场景咱们须要手动提交偏移量,尤为是在一个长事务中而且保证消息不被重复消费以及消息不丢失,好比生产者一个订单提交消息,消费者拿到后要扣减库存,扣减成功后该消息才能提交,因此在这种场景下须要手动提交,由于库存扣减失败这个消息就不能消费,同时客户这个订单状态也不能是成功。手动提交也有两种一个是同步提交一个是异步提交,其区别就是消费者线程是否阻塞。若是使用手动提交就要关闭自动提交,由于自动提交默认是开启的。线程

相关文章
相关标签/搜索