In Kafka 0.10.0.0, consumers and consumer groups no longer register nodes in ZooKeeper. So in what form does a consumer group exist?
Start with kafka-consumer-groups.sh, the script that ships with Kafka. It simply invokes kafka.admin.ConsumerGroupCommand:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"
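Now look at ConsumerGroupCommand. The code shows that the new consumer in this version of Kafka does not support deleting a consumer group. In practice, a group is removed once it has no consumers left.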
def main(args: Array[String]) {
  // ...
  val consumerGroupService = {
    if (opts.options.has(opts.newConsumerOpt)) new KafkaConsumerGroupService(opts) // the new consumer uses KafkaConsumerGroupService
    else new ZkConsumerGroupService(opts)
  }
  try {
    if (opts.options.has(opts.listOpt)) consumerGroupService.list() // used below to see how groups exist
    else if (opts.options.has(opts.describeOpt)) consumerGroupService.describe()
    else if (opts.options.has(opts.deleteOpt)) {
      consumerGroupService match {
        case service: ZkConsumerGroupService => service.delete()
        case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService")
      }
    }
  }
  // ...
}
We use KafkaConsumerGroupService#list, which returns all consumer groups, to see how groups exist. Tracing the code leads to AdminClient#listAllGroups. It shows that listing every group requires visiting each broker, and that fetching the groups held by one broker requires sending it an ApiKeys.LIST_GROUPS request.
def listAllGroups(): Map[Node, List[GroupOverview]] = {
  findAllBrokers.map {
    case broker =>
      broker -> { // every broker has to be visited
        try {
          listGroups(broker)
        } catch {
          case e: Exception =>
            debug(s"Failed to find groups from broker ${broker}", e)
            List[GroupOverview]()
        }
      }
  }.toMap
}

def listGroups(node: Node): List[GroupOverview] = {
  // send a request to the broker to fetch the consumer groups it holds
  val responseBody = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest())
  val response = new ListGroupsResponse(responseBody)
  Errors.forCode(response.errorCode()).maybeThrow()
  response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList
}
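The per-broker fan-out can be sketched without a cluster. This is a minimal illustration, not Kafka's AdminClient: BrokerNode, GroupSummary, fakeFetch and the host names are hypothetical stand-ins, and the fetch function plays the role of a LIST_GROUPS round trip. It shows the same tolerance as the code above, where a broker that fails contributes an empty list instead of aborting the whole listing.

```scala
import scala.util.Try

// Hypothetical stand-ins for Kafka's Node and GroupOverview types.
final case class BrokerNode(id: Int, host: String)
final case class GroupSummary(groupId: String, protocolType: String)

object ListAllGroupsSketch {
  // `fetch` stands in for one LIST_GROUPS request to a single broker.
  // A broker whose request fails yields an empty list.
  def listAllGroups(
      brokers: Seq[BrokerNode],
      fetch: BrokerNode => List[GroupSummary]
  ): Map[BrokerNode, List[GroupSummary]] =
    brokers.map { broker =>
      broker -> Try(fetch(broker)).getOrElse(List.empty[GroupSummary])
    }.toMap

  // Example data (made up): broker 2 is unreachable.
  val brokers: Seq[BrokerNode] = Seq(BrokerNode(1, "kafka-1"), BrokerNode(2, "kafka-2"))

  def fakeFetch(broker: BrokerNode): List[GroupSummary] =
    if (broker.id == 1) List(GroupSummary("orders", "consumer"))
    else throw new RuntimeException("broker unreachable")

  val result: Map[BrokerNode, List[GroupSummary]] = listAllGroups(brokers, fakeFetch)
}
```

The result still has one entry per broker, so a caller can tell "no groups" apart from "broker not asked" only by checking the logs, which matches the debug-and-continue behaviour in the original.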
The corresponding request handler in KafkaApis.scala is handleListGroupsRequest:
def handleListGroupsRequest(request: RequestChannel.Request) {
  // ...
    val (error, groups) = coordinator.handleListGroups() // the key step: fetch the list of consumer groups
    val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
    new ListGroupsResponse(error.code, allGroups.asJava)
  }
  requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
Following coordinator.handleListGroups down, the broker's consumer groups are ultimately obtained from GroupMetadataManager#currentGroups. So consumer groups are tied to GroupMetadataManager.
def currentGroups(): Iterable[GroupMetadata] = groupsCache.values
GroupMetadata represents a consumer group, and MemberMetadata represents a consumer. A summary diagram comes first:
[Figure: summary diagram]
GroupMetadataManager has a groupsCache field that holds the consumer groups this broker is responsible for:
private val groupsCache = new Pool[String, GroupMetadata]
Here are the internal fields of GroupMetadata:
private[coordinator] class GroupMetadata(val groupId: String, val protocolType: String) {
  private val members = new mutable.HashMap[String, MemberMetadata] // the consumers in this group
  private var state: GroupState = Stable
  var generationId = 0 // generationId is used for rebalancing
  var leaderId: String = null
  var protocol: String = null
  // ...
}

// MemberMetadata represents a single consumer
private[coordinator] class MemberMetadata(val memberId: String,
                                          val groupId: String,
                                          val clientId: String,
                                          val clientHost: String,
                                          val sessionTimeoutMs: Int,
                                          var supportedProtocols: List[(String, Array[Byte])]) {
  var assignment: Array[Byte] = Array.empty[Byte] // the partitions assigned to this consumer
  var awaitingJoinCallback: JoinGroupResult => Unit = null
  var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
  var latestHeartbeat: Long = -1
  var isLeaving: Boolean = false
  // ...
}
That is how a consumer group and its consumers exist: as objects in the coordinator's in-memory cache, not as nodes registered in ZooKeeper.
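The cache-to-group-to-member relationship can be modelled in a few lines. This is a simplified sketch, not the broker's classes: Member, Group and GroupRegistry are hypothetical names, and a plain mutable.HashMap stands in for Kafka's Pool.

```scala
import scala.collection.mutable

// Simplified, hypothetical counterparts of MemberMetadata and GroupMetadata.
final case class Member(memberId: String, clientId: String)

final class Group(val groupId: String, val protocolType: String) {
  private val members = mutable.HashMap.empty[String, Member]
  def add(member: Member): Unit = members.put(member.memberId, member)
  def remove(memberId: String): Unit = members.remove(memberId)
  def isEmpty: Boolean = members.isEmpty
  def memberIds: Set[String] = members.keySet.toSet
}

// The coordinator's view of its groups: a map held in process memory.
final class GroupRegistry {
  private val groupsCache = mutable.HashMap.empty[String, Group]
  def put(group: Group): Unit = groupsCache.put(group.groupId, group)
  // Mirrors the shape of currentGroups(): listing groups is just reading the map.
  def currentGroups: Iterable[Group] = groupsCache.values
}
```

Listing groups is nothing more than reading the values of that map, which is why the list request has to be answered by the broker that holds the group.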
A consumer group is never created on its own. It is created when a consumer sends its first JoinGroup request. Creation is simple: a GroupMetadata representing the group is added to GroupMetadataManager#groupsCache.
GroupCoordinator#handleJoinGroup
def handleJoinGroup(groupId: String,
                    memberId: String,
                    clientId: String,
                    clientHost: String,
                    sessionTimeoutMs: Int,
                    protocolType: String,
                    protocols: List[(String, Array[Byte])],
                    responseCallback: JoinCallback) {
  // ...
  } else {
    var group = groupManager.getGroup(groupId)
    if (group == null) {
      if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
        responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
      } else {
        // the key step: if the group does not exist yet, add it
        group = groupManager.addGroup(new GroupMetadata(groupId, protocolType))
        doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
      }
    } else {
      doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
    }
  }
}
GroupMetadataManager#addGroup
def addGroup(group: GroupMetadata): GroupMetadata = {
  val currentGroup = groupsCache.putIfNotExists(group.groupId, group) // add the GroupMetadata that represents the group
  if (currentGroup != null) {
    currentGroup
  } else {
    group
  }
}
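The put-if-absent behaviour of addGroup is what keeps two simultaneous first joins from creating two different group objects. A minimal sketch, assuming a java.util.concurrent.ConcurrentHashMap in place of Kafka's Pool; GroupEntry and GroupCache are hypothetical names.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for GroupMetadata.
final class GroupEntry(val groupId: String, val protocolType: String)

final class GroupCache {
  private val groups = new ConcurrentHashMap[String, GroupEntry]()

  def getGroup(groupId: String): Option[GroupEntry] = Option(groups.get(groupId))

  // Returns the group that ends up in the cache: the existing one if another
  // caller registered it first, otherwise the one passed in.
  def addGroup(group: GroupEntry): GroupEntry = {
    val existing = groups.putIfAbsent(group.groupId, group)
    if (existing != null) existing else group
  }
}
```

Whoever loses the race gets back the winner's object, so every member of the group ends up attached to the same instance.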
The ConsumerGroupCommand section above showed that a consumer group cannot be deleted manually. So how is a group deleted? When a group has no consumers left, it is deleted.
Look at GroupMetadataManager#removeGroup first to see what deleting a consumer group involves:
def removeGroup(group: GroupMetadata) {
  if (groupsCache.remove(group.groupId, group)) { // remove the group from the cache
    // Then write a tombstone message to the group's partition of the __consumer_offsets topic.
    // The tombstone exists for compaction: __consumer_offsets is compacted, not deleted.
    val groupPartition = partitionFor(group.groupId) // the group's partition, by default abs(hashCode) % 50
    val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition)
    // write the tombstone to that partition so compaction can drop the record
    val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
      timestamp = timestamp, magicValue = magicValue)
    val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
    partitionOpt.foreach { partition =>
      val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
      trace("Marking group %s as deleted.".format(group.groupId))
      try {
        partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
      } catch {
        case t: Throwable =>
          error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
          // ignore and continue
      }
    }
  }
}
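So deleting a consumer group involves two actions: removing the group from groupsCache, and writing a tombstone message to the group's partition of __consumer_offsets.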
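Both the partition choice and the effect of the tombstone can be illustrated in isolation. This is a sketch under stated assumptions, not Kafka's code: the partition count of 50 is the default mentioned in the comment above, safeAbs is a hypothetical helper that guards the Int.MinValue case, and compact is a toy model of log compaction in which a tombstone is represented by None.

```scala
object OffsetsTopicSketch {
  // Assumed default partition count of __consumer_offsets.
  val OffsetsTopicPartitions = 50

  // math.abs(Int.MinValue) is still negative, so that case is mapped to 0.
  private def safeAbs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Maps a group id to its partition of the offsets topic.
  def partitionFor(groupId: String, partitions: Int = OffsetsTopicPartitions): Int =
    safeAbs(groupId.hashCode) % partitions

  // Toy log compaction: keep the latest value per key, then drop every key
  // whose latest value is a tombstone (None).
  def compact(log: Seq[(String, Option[String])]): Map[String, String] =
    log
      .foldLeft(Map.empty[String, Option[String]]) { case (acc, (k, v)) => acc.updated(k, v) }
      .collect { case (k, Some(v)) => k -> v }
}
```

Because the mapping depends only on the group id, the tombstone lands in the same partition as the group's earlier records, which is what allows compaction to discard them.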
The only caller of GroupMetadataManager#removeGroup is GroupCoordinator#onCompleteJoin, and the only caller of GroupCoordinator#onCompleteJoin is DelayedJoin.
GroupCoordinator#onCompleteJoin
def onCompleteJoin(group: GroupMetadata) {
  // ...
      if (group.isEmpty) {
        group.transitionTo(Dead) // mark the group as Dead first, then remove it
        groupManager.removeGroup(group)
        info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
      }
    }
  // ...
}
DelayedJoin
private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
                                       group: GroupMetadata,
                                       sessionTimeout: Long)
  extends DelayedOperation(sessionTimeout) {

  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
  override def onExpiration() = coordinator.onExpireJoin()
  override def onComplete() = coordinator.onCompleteJoin(group)
}
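Does that mean the group is deleted during a joinGroup operation? Not really. It is deleted after a heartbeat times out: when the last consumer's heartbeat expires, that is, when the group has no consumers left, the group is deleted. Start from DelayedHeartbeat: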
private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                            group: GroupMetadata,
                                            member: MemberMetadata,
                                            heartbeatDeadline: Long,
                                            sessionTimeout: Long)
  extends DelayedOperation(sessionTimeout) {
  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) // note this call
  override def onComplete() = coordinator.onCompleteHeartbeat()
}

def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
  group synchronized {
    if (!shouldKeepMemberAlive(member, heartbeatDeadline))
      onMemberFailure(group, member) // note this call
  }
}

private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
  trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
  group.remove(member.memberId)
  group.currentState match {
    case Dead =>
    // Suppose the group is Stable with a single consumer: when that consumer
    // times out, maybePrepareRebalance is called.
    case Stable | AwaitingSync => maybePrepareRebalance(group)
    case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
  }
}

private def maybePrepareRebalance(group: GroupMetadata) {
  group synchronized {
    if (group.canRebalance)
      prepareRebalance(group) // note this call
  }
}

private def prepareRebalance(group: GroupMetadata) {
  if (group.is(AwaitingSync))
    resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)

  group.transitionTo(PreparingRebalance)
  info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))

  val rebalanceTimeout = group.rebalanceTimeout
  val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout) // the DelayedJoin is finally created here
  val groupKey = GroupKey(group.groupId)
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
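To summarize: after a heartbeat times out, a rebalance is triggered and the call chain ends in GroupCoordinator#prepareRebalance, which creates a DelayedJoin. When that DelayedJoin completes, onCompleteJoin runs, and if the group's members are empty at that point the group is deleted.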
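The whole lifecycle can be condensed into a small simulation. This is a deliberately simplified sketch, not the coordinator: SimGroup and SimCoordinator are hypothetical names, there is no purgatory or timer, and the join phase is completed immediately instead of being deferred through a DelayedJoin.

```scala
import scala.collection.mutable

sealed trait State
case object Stable extends State
case object PreparingRebalance extends State
case object Dead extends State

// Hypothetical, stripped-down group: member ids plus a state.
final class SimGroup(val groupId: String) {
  val members = mutable.Set.empty[String]
  var state: State = Stable
}

final class SimCoordinator {
  val groupsCache = mutable.HashMap.empty[String, SimGroup]

  // The first join creates the group, later joins reuse it.
  def join(groupId: String, memberId: String): Unit = {
    val group = groupsCache.getOrElseUpdate(groupId, new SimGroup(groupId))
    group.members += memberId
  }

  // Heartbeat deadline missed: drop the member, then start a rebalance.
  def onHeartbeatExpired(groupId: String, memberId: String): Unit =
    groupsCache.get(groupId).foreach { group =>
      group.members -= memberId
      if (group.state == Stable) {
        group.state = PreparingRebalance
        onCompleteJoin(group) // the real code defers this through DelayedJoin
      }
    }

  // Join phase finished: an empty group is marked Dead and removed.
  private def onCompleteJoin(group: SimGroup): Unit =
    if (group.members.isEmpty) {
      group.state = Dead
      groupsCache.remove(group.groupId)
    } else {
      group.state = Stable
    }
}
```

With two members, the first expiry only shrinks the group and returns it to Stable. The second expiry empties it, so the group is marked Dead and disappears from the cache.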