Kafka consumer poll(long)与poll(Duration)的区别

最近在StackOverflow碰到的一个问题,即在consumer.poll以后assignment()返回为空的问题,以下面这段代码所示:分布式

consumer.subscribe(Arrays.asList("test")); consumer.poll(Duration.ofMillis(0)); // consumer.poll(0);
Set<TopicPartition> assignment = consumer.assignment(); // empty!

有意思的是,若是是consumer.poll(0);则assignment不为空。以前我觉得poll(long)被标记为“Deprecated”以后使用poll(Duration)是相同的效果,如今看来二者仍是要有差异的。为何poll(0)就能获取到consumer分配方案,而使用poll(Duration)就不能呢?fetch

 

调研了一番以后发现缘由以下:在poll(0)中consumer会一直阻塞直到它成功获取了所需的元数据信息,以后它才会发起fetch请求去获取数据。虽然poll能够指定超时时间,但这个超时时间只适用于后面的消息获取,前面更新元数据信息不计入这个超时时间。poll(Duration)这个版本修改了这样的设计,会把元数据获取也计入整个超时时间。因为本例中使用的是0,即瞬时超时,所以consumer根本没法在这么短的时间内链接上coordinator,因此只能赶在超时前返回一个空集合。这就是为何使用不一样版本的poll命令assignment不一样的缘由。spa

 

仔细想一想为何社区要作这样的变动?poll(0)这种设计的一个问题在于若是远端的broker不可用了, 那么consumer程序会被无限阻塞下去。用户指定了超时时间但却被无限阻塞,显然这样的设计时有欠缺的。特别是对于Kafka Streams而言,这个设计可能致使的问题在于Stream Thread没法正常关闭。目前源代码中依然有一些无限阻塞的场景,好比以前处理的initTransaction,commitTransaction和abortTransaction也是无限等待。看来后面社区仍是须要慢慢地将它们都替换掉,毕竟在分布式系统中没有什么场景是须要绝对地等待的。设计

相关文章
相关标签/搜索