kafka-python 1.4.6 版本触发的一个 rebalance 问题

在使用了最新版的 kafka-python 1.4.6 在 broker 对 topic 进行默认配置的状况下报出相似错误css

CommitFailedError
CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.

这里要申明一点,在 1.4.0 以上的 kafka-python 版本使用了独立的心跳线程去上报心跳。html

这里报错大概表达的意思是 没法在默认 300000ms 中完成处理操做。咱们一般会一次性 poll 拉默认 500 条数据下来。咱们须要在 300s 中完成 500 条数据的处理。若是不能完成的话就可能会触发这个问题。python

由于这个报错的提示写得很是清楚,因此咱们先按这个方向去尝试处理这个问题。首先调高了咱们的 max_poll_interval_ms 的时间,可是无效。git

而后 records 的条数减小,依然无效,该报错仍是会报错。这不由让我怀疑触发这个问题的是否并不是这里报错建议的那些地方。github

 

因此我把目光放到了 broker 日志那边去,想看下究竟是由于什么缘由致使爆出相似错误。bootstrap

在日志上发现了一些日志,对应的 consumer 在反复的 rebalance:vim

[2019-08-18 09:19:29,556] INFO [GroupCoordinator 0]: Member kafka-python-1.4.6-05ed83f1-aa90-4950-b097-4cf467598082 in group sync_group_20180321 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-08-18 09:19:29,556] INFO [GroupCoordinator 0]: Stabilized group sync_group_20180321 generation 1090 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator)
[2019-08-18 09:19:39,556] INFO [GroupCoordinator 0]: Member kafka-python-1.4.6-f7826720-fef7-4b02-8104-d1f38065c2fe in group sync_group_20180321 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-08-18 09:19:39,708] INFO [GroupCoordinator 0]: Preparing to rebalance group sync_group_20180321 with old generation 1090 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator)
[2019-08-18 09:19:39,708] INFO [GroupCoordinator 0]: Member kafka-python-1.4.6-ac5f6aff-3600-4e67-a529-31674c72b1e4 in group sync_group_20180321 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-08-18 09:19:39,716] INFO [GroupCoordinator 0]: Stabilized group sync_group_20180321 generation 1091 (__consumer_offsets-3) (kafka.coordinator.group.GroupCoordinator)
[2019-08-18 09:19:39,721] INFO [GroupCoordinator 0]: Assignment received from leader for group sync_group_20180321 for generation 1091 (kafka.coordinator.group.GroupCoordinator)

参考 sentry 打出来的错误,咱们能够认为这和 sentry 爆出来的问题有直接关系。所以咱们要从另一个角度去思考一下为何个人 max_poll_interval_ms 已经调高而且每次拉取处理条数下降却依然会报出此问题,而且频繁触发 rebalance 。session

kafka-python 在 1.4.0 版本分离了心跳线程和 poll 主线程。个人第一反应就是会不会由于 poll 线程阻塞了心跳线程的切换,或者引发了某种死锁从而致使心跳线程没法正确的发起心跳。最后致使 broker 认为 group 已经死亡而后主动触发了 rebalance .socket

而后我去 kafka-python 的 gihub 搜索了一下相似问题,立刻就发现了有很多人都有这个问题。ide

https://github.com/dpkp/kafka-python/issues/1418

从中找到一些有趣的信息,好比来自 

I am seeing consumer rebalances even if there is no messages to consume. Start three consumers in a group and send some messages to topic and after that stop the producer. The consumer will start seeing rebalances after 5-6mins.
Sample code here:
https://stackoverflow.com/questions/54908902/kafka-consumer-rebalance-occurs-even-if-there-is-no-message-to-consume

他说即便在没有消息能够消费的状况下,也能够看到 kafka consumer 在过了 5 - 6 mins 以后开启了 rebalance 。

这就跟咱们的问题很是类似,咱们并非 process 的过程消耗的时间过长而触发了 rebalance 而是有多是由于消费得太快,致使有些消费者处于 空 poll 的状态从而阻塞了心跳线程。客观来讲,我目前还会报出这个问题的 topic 有多达 50 个partitions,我开启了5个消费者对其进行消费,平均一个消费者须要消费 10 个parititons 。若是有某个 partitions 长期没有消费过来咱们可能会被阻塞在那里最终致使 heartbeat 超时。 1.4.6 的客户端默认 10s 没心跳就超时,而发送间隔仅为 3s 。也就是连续三个周期没有发送就超时了。

下面看到 dpkp 的一个回复,表达了有可能就是被 poll 主线程阻塞,的问题,而且有 workaround 能够用来避免这种状况:

vimal: thanks for posting. I believe you may be hitting lock contention between an idle client.poll -- which can block and hold the client lock for the entire request_timeout_ms -- and the attempt by the heartbeat thread to send a new request. It seems to me that we may need to use KafkaClient.wakeup() to make sure that the polling thread drops the lock if/when we need to send a request from a different thread.

This shouldn't be an issue when messages are flowing through your topics at a steady rate. If this is just a test environment, and you expect your production environment to have more steady live data, then you could just ignore the error in testing. But if you are managing a topic w/ very low traffic -- delays of minutes between consecutive messages, for example -- you might try to reduce the request_timeout_ms to something closer to the heartbeat_interval_ms, which should prevent the read side from blocking for much longer than the heartbeat timeout. But note that other timeouts may also need to change (max_poll_interval_ms and session_timeout_ms perhaps). Another workaround might be to reduce metadata_max_age_ms to something close / equal to your heartbeat_timeout_ms. This will cause more frequent metadata requests, but should unblock the send side when there is no socket data available for reads.

 dpkp 的观点在于,若是咱们数据发送过来的频率是稳定的,消费者是正好能够消费完队列里面的信息的状况的时候,不该该出现这样的问题。出现这样的问题与咱们预期和看到报错的状况可能偏偏相反,不是咱们消费得太慢,而是咱们消费得太快,而且生产者发送消息的频率太低致使的。在 poll 不到消息的时候,主线程可能会面临阻塞,而没法及时切换到心跳线程进行心跳的发送,最终致使了这个问题。

他给到一个 trick 的方法来解决这个问题,当面临这种状况的时候咱们能够把 metadata_max_age_ms 调整到和心跳发送频率差很少 close / equal to our heartbeat_timeout_ms.

发送 metadata_request 会解除咱们发送端的阻塞,从而达到抑制死锁的效果。

self.kafka = kafka.KafkaConsumer(
    auto_offset_reset=auto_offset_reset,
    bootstrap_servers=['10.171.97.1:9092', '10.163.13.219:9092', '10.170.249.122:9092'],
    group_id=group_id,
    metadata_max_age_ms=metadata_max_age_ms
)
self.kafka.subscribe(topics)

尝试补充了 metadata_max_age_ms 大约 3000 ms ,这个问题获得了很大程度的解决和缓解。

既然肯定了多是由于消费太快,而后生产慢致使的主线程锁住的问题,剩下能够验证一下是否真的是这样。尝试打日志看一下切换线程发送心跳的状况能够来确认该问题是否如此。

另外看代码发现 poll 主线程在 poll 以前会默认会进行 wakeup() 可是在 1.4.6里面也由于以前的 某个 bug 而默认关闭了,不知道是否有影响,等后续测试以后补上。

 

 

Reference:

https://github.com/dpkp/kafka-python/issues/1418  Heartbeat failed for group xxxWorker because it is rebalancing

https://github.com/dpkp/kafka-python/issues/1760  [1.4.5] KafkaProducer raises KafkaTimeoutError when attempting wakeup()

https://www.cnblogs.com/huxi2b/p/6815797.html  Kafka 0.11版本新功能介绍 —— 空消费组延时rebalance

相关文章
相关标签/搜索