可是测试发现enable.auto.commit参数设置成了false,kafka的offset依然提交了(也没有进行人工提交offset)。spring
查看源码springboot
若是咱们enable.auto.commit设置为false,那么就会走标红的if语句。并且下面有个stopInvokerAndCommitManualAcks()方法,看名字就知道是人工提交的意思。那么咱们进去stopInvokerAndCommitManualAcks()方法瞅瞅。 测试
如上图所示有个processCommits()方法,那么继续追进去: spa
单单看标红的方法是否是就知道这方法里面是更新offset和提交offset的方法。那么咱们继续追进去:code
结论:若是咱们把enable.auto.commit参数设置成true。那么offset交给kafka来管理,offset进行默认的提交模式。
enable.auto.commit参数设置成false。那么就是Spring来替为咱们作人工提交,从而简化了人工提交的方式。
因此kafka和springboot结合中的enable.auto.commit为false为spring的人工提交模式。enable.auto.commit为true是采用kafka的默认提交模式。 blog
spring.kafka.consumer.enable-auto-commit设置为false,设置AckMode的值get
/** * The offset commit behavior enumeration. */ public enum AckMode { /** * Commit after each record is processed by the listener. */ RECORD, /** * Commit whatever has already been processed before the next poll. */ BATCH, /** * Commit pending updates after * {@link ContainerProperties#setAckTime(long) ackTime} has elapsed. */ TIME, /** * Commit pending updates after * {@link ContainerProperties#setAckCount(int) ackCount} has been * exceeded. */ COUNT, /** * Commit pending updates after * {@link ContainerProperties#setAckCount(int) ackCount} has been * exceeded or after {@link ContainerProperties#setAckTime(long) * ackTime} has elapsed. */ COUNT_TIME, /** * User takes responsibility for acks using an * {@link AcknowledgingMessageListener}. */ MANUAL, /** * User takes responsibility for acks using an * {@link AcknowledgingMessageListener}. The consumer is woken to
默认
)@KafkaListener(topics = "k010") public void listen(ConsumerRecord<?, ?> cr,Acknowledgment ack) throws Exception { LOGGER.info(cr.toString()); ack.acknowledge(); }
方法参数里头传递Acknowledgment,而后手工ackkafka
若是只添加上面语句会报错:源码
the listener container must have a MANUAL Ackmode to populate the Acknowledgment
咱们要配置AckMode为MANUAL Ackmodeit
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);