For Kafka versions 0.10.1 and above, how can you reset the offsets of a running consumer group from the outside? Imagine a console that can reset any consumer group's committed offsets to, say, 12 hours ago, while the user's application keeps running, with no need to shut it down or restart it.
This takes the following steps:

1. Join the group
2. Kick out all other group members
3. Try to assign all TopicPartitions to this client
4. Commit offsets
5. Leave the group
Step 2 exists so that our own client becomes the group leader. Of course, it may not be necessary to evict every other member to win leadership, because which member becomes leader effectively follows HashMap iteration order.
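The iteration-order point can be illustrated with a plain-JDK sketch (the member ids below are hypothetical; this only models the claim that the choice depends on hash-bucket iteration order rather than join order):

```java
import java.util.HashMap;
import java.util.Map;

public class LeaderOrderDemo {
    // Models a coordinator that picks the "leader" as the first member id
    // encountered when iterating the member map.
    static String pickLeader(Map<String, String> members) {
        return members.keySet().iterator().next();
    }

    public static void main(String[] args) {
        Map<String, String> joinOrderA = new HashMap<>();
        joinOrderA.put("m1", "clientA");
        joinOrderA.put("m2", "clientB");
        joinOrderA.put("m3", "clientC");

        Map<String, String> joinOrderB = new HashMap<>();
        joinOrderB.put("m3", "clientC");
        joinOrderB.put("m1", "clientA");
        joinOrderB.put("m2", "clientB");

        // HashMap iterates in hash-bucket order, not insertion order, so the
        // same member set yields the same "leader" no matter who joined first.
        System.out.println(pickLeader(joinOrderA)); // m1
        System.out.println(pickLeader(joinOrderB)); // m1
    }
}
```

This is also why simply joining the group does not make you leader, while shrinking the member set (evicting members) changes which id comes first in iteration.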
Once our client is the group's leader, it needs to assign all partitions to itself, which requires a special PartitionAssignor. Because this assignor's protocol differs from the one the existing group members use (although you could also build an assignor whose protocol looks identical on the surface but whose logic differs), and the coordinator rejects members whose protocol does not match the current leader's, the other members still have to be evicted.
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExclusiveAssignor extends AbstractPartitionAssignor {

    public interface Callback {
        void onSuccess();
    }

    private static Logger LOGGER = LoggerFactory.getLogger(ExclusiveAssignor.class);

    public static String NAME = "exclusive";

    private String leaderId = null;
    private Callback callback = null;

    public void setLeaderId(String leaderId) {
        this.leaderId = leaderId;
    }

    public void setCallBack(Callback callBack) {
        this.callback = callBack;
    }

    @Override
    public String name() {
        return NAME;
    }

    private Map<String, List<String>> consumersPerTopic(Map<String, List<String>> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, List<String>> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue())
                put(res, topic, consumerId);
        }
        return res;
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        LOGGER.info("perform exclusive assign");
        if (leaderId == null)
            throw new IllegalArgumentException("leaderId should already been set before assign is called");
        if (callback == null)
            throw new IllegalArgumentException("callback should already been set before assign is called");

        List<TopicPartition> allPartitions = new ArrayList<TopicPartition>();
        partitionsPerTopic.forEach((topic, partitionNumber) -> {
            for (int i = 0; i < partitionNumber; i++)
                allPartitions.add(new TopicPartition(topic, i));
        });

        // Every member gets an empty assignment, except the leader, which gets all partitions.
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet()) {
            assignment.put(memberId, new ArrayList<TopicPartition>());
            if (memberId.equals(leaderId)) {
                assignment.get(memberId).addAll(allPartitions);
            }
        }
        callback.onSuccess();
        return assignment;
    }
}
```
The assignor needs to know which member is the leader. The leaderId can be obtained in KafkaConsumer's

```java
protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                    String assignmentStrategy,
                                                    Map<String, ByteBuffer> allSubscriptions)
```

so KafkaConsumer's code also needs a small modification, to make sure this KafkaConsumer's poll() does not actually fetch messages but only performs the commit.
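As an aside, if patching the client is undesirable, a similar "join the group but never fetch" effect can be approximated with the public consumer API alone by pausing every partition as soon as it is assigned; a paused consumer still drives joins, heartbeats, and rebalances from poll() but fetches nothing. This is only a sketch, not the approach used in the code above; `consumer`, `topics`, and `toCommit` are placeholders for the surrounding setup:

```java
// Sketch: join the group without consuming anything.
consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Paused partitions are never fetched, but poll() still drives
        // the group protocol (join, heartbeat, rebalance).
        consumer.pause(partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }
});
consumer.poll(5000);           // participate in the group protocol, fetch nothing
consumer.commitSync(toCommit); // commit-only usage
```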
Evicting the other members can be done with an AdminClient:
```scala
def forceLeave(coordinator: Node, memberId: String, groupId: String) = {
  logger.info(s"forcing group member: $memberId to leave group: $groupId ")
  send(coordinator, ApiKeys.LEAVE_GROUP, new LeaveGroupRequest(groupId, memberId))
}
```
The final logic is:
```scala
private def forceCommit(consumer: SimpleKafkaConsumer[_, _],
                        groupId: String,
                        topics: Seq[String],
                        maxRetries: Int,
                        toCommit: Map[TopicPartition, OffsetAndMetadata],
                        coordinatorOpt: Option[Node] = None) = {
  consumer.subscribe(JavaConversions.seqAsJavaList(topics))
  val assignedAll = new AtomicBoolean(false)
  consumer.setExclusiveAssignorCallback(new Callback {
    override def onSuccess(): Unit = assignedAll.set(true)
  })
  var currentRetries = 0
  val coordinatorNode = coordinatorOpt.getOrElse(adminClient.findCoordinator(groupId))
  while (!assignedAll.get() && currentRetries < maxRetries) {
    logger.info(s"trying to reset offset for $groupId, retry count $currentRetries ....")
    clearCurrentMembers(coordinatorNode, groupId, Some(ConsumerGroupManager.magicConsumerId))
    consumer.poll(5000)
    printCurrentAssignment(consumer)
    currentRetries = currentRetries + 1
  }
  if (currentRetries >= maxRetries)
    throw new RuntimeException(s"retry exhausted when getting leadership of $groupId")
  val javaOffsetToCommit = JavaConversions.mapAsJavaMap(toCommit)
  consumer.commitSync(javaOffsetToCommit)
  logger.info(s"successfully committed offset for $groupId: $toCommit")
  consumer.unsubscribe()
}
```
```scala
def forceReset(offsetLookupActor: ActorRef, groupId: String, ts: Long, maxRetries: Int)
              (implicit executionContext: ExecutionContext): Boolean = {
  logger.info(s"resetting offset for $groupId to $ts")
  val groupSummary = adminClient.describeConsumerGroup(groupId)
  val topics = groupSummary.subscribedTopics
  if (topics.isEmpty)
    throw new IllegalStateException(s"group $groupId currently subscribed no topic")
  val offsetToCommit = getOffsetsBehindTs(offsetLookupActor, topics, ts, 10000)
  val consumer = createConsumer(groupId)
  try {
    forceCommit(consumer, groupId, topics, maxRetries, offsetToCommit)
    true
  } finally {
    consumer.close()
  }
}
```
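`getOffsetsBehindTs` is not shown here; from 0.10.1 onward the natural building block for it is `KafkaConsumer.offsetsForTimes` (KIP-79), which takes a per-partition map of target timestamps and returns, per partition, the earliest offset whose record timestamp is at or after the target. The shape of that request for a "12 hours ago" reset can be sketched with plain JDK types (partitions are modeled as `"topic-partition"` strings so the sketch needs no kafka-clients dependency; whether the real `getOffsetsBehindTs` works this way is an assumption):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class ResetTargets {
    // Builds the partition -> timestamp map that would be handed to
    // offsetsForTimes(): one entry per partition, all pointing at the
    // same target timestamp (now - back).
    static Map<String, Long> timestampTargets(Map<String, Integer> partitionsPerTopic,
                                              long nowMs, long backMs) {
        long target = nowMs - backMs;
        Map<String, Long> request = new HashMap<>();
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet())
            for (int p = 0; p < e.getValue(); p++)
                request.put(e.getKey() + "-" + p, target);
        return request;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("orders", 3);
        long now = System.currentTimeMillis();
        // One lookup target per partition, 12 hours before now.
        System.out.println(timestampTargets(partitionsPerTopic, now, TimeUnit.HOURS.toMillis(12)));
    }
}
```

The offsets returned by such a lookup are what `forceCommit` then commits on behalf of the group.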
The full code is at https://github.com/iBuddha/kafka-simple-ui/blob/master/app/kafka/authorization/manager/utils/ConsumerGroupManager.scala
Note that sending a LeaveGroupRequest may break some members' connections to the broker. The reason is that if, after a consumer has sent a JoinGroupRequest, an external client sends a LeaveGroupRequest that kicks that consumer out of the group, the consumer never receives its JoinGroupResponse, so its NetworkClient concludes the connection is dead. The client will reconnect afterwards, though. Moreover, while the external client is evicting the other members and re-committing offsets, those consumers do not necessarily get a chance to rejoin the group, so they may not be affected by this issue at all.