kafka shutdown中止很慢问题 java
在数据量大的时候,consumer一次抓取数据的数据不少,进入到业务处理的数据可能有不少, apache
假设一次poll有1万条数据进入业务程序,并且业务程序是和poll绑定在一块儿线程同步执行的,假设平均每条数据,执行业务程序花费100ms, 服务器
那么poll一次的数据,至少要执行 1w*0.1s = 1000s = 16.67分钟。 网络
因此,在数据量大的时候,中止一个线程(须要先等待业务程序处理完数据),可能要十几分钟。 异步
shutdown问题解决方案 this
一、改为异步处理数据,consumer取出来的数据,放到BlockQueue中,由异步线程去处理,当异步线程处理不过来时,阻塞consumer,调用consumer.pause()方法avoid group management rebalance,代码以下(来源于Spring-Kafka): spa
// avoid group management rebalance due to a slow consumer this.consumer.pause(this.assignedPartitions.toArray(new TopicPartition[this.assignedPartitions.size()])); public void onPartitionsAssigned(Collection<TopicPartition> partitions) { this.assignedPartitions = partitions; }
二、若是是同步执行数据处理,考虑提升业务程序 处理数据的速度。 线程
三、同步处理数据,可是改为手动提交offset,当shutdown的时候,poll的数据不须要所有处理,只须要记录处理的位置便可。代码示例以下: component
list data = consumer.poll(); for(record: data) { if(shutdown) { // 收到shutdown命令后当即中止,未处理的数据将丢弃 break; } deal(record); saveTopicOffset(record); } submitDealtDataOffset();
另外, kafka
Kafka停不掉shutdown关闭不了问题
缘由是卡在了consumer.close()方法里面,它会提交offset信息,若是网络中断或者kafka服务器有问题致使提交不了offset,则consumer.close方法会一直卡住(不停的循环尝试提交offset,永不中断)。
参见:Kafka poll一直等待的bug:
https://issues.apache.org/jira/browse/KAFKA-4189?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20consumer%20ORDER%20BY%20priority%20DESC
https://issues.apache.org/jira/browse/KAFKA-3172?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20consumer%20ORDER%20BY%20priority%20DESC
解决方法:目前尚未好的办法,只能将offset的自动提交改为手动提交offset。可是,我写了一个程序能够在调用consumer.close后将线程强行杀死,做为临时解决方案。