This article looks at how the MaxLag metric of ConsumerFetcherManager is computed in Kafka 0.8.2.2.
The MaxLag reported over JMX never matches what ConsumerOffsetChecker shows: JMX almost always returns 0 even though real lag exists. Let's dig into how this MaxLag is actually calculated.
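For context, here is a minimal sketch of how such a gauge can be read over JMX. The host, port and clientId are placeholders, and the ObjectName follows the usual 0.8.x naming pattern for this gauge (kafka.consumer:type=ConsumerFetcherManager,name=MaxLag,clientId=...); verify the exact name with jconsole before relying on it.

import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

// Minimal sketch: read the MaxLag gauge over JMX.
// localhost:9999 and "my-consumer" are placeholders; the ObjectName pattern is
// assumed from the usual 0.8.x metric naming -- check it in jconsole first.
object MaxLagProbe {
  def main(args: Array[String]): Unit = {
    val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi")
    val connector = JMXConnectorFactory.connect(url)
    try {
      val mbsc = connector.getMBeanServerConnection
      val mbean = new ObjectName(
        "kafka.consumer:type=ConsumerFetcherManager,name=MaxLag,clientId=my-consumer")
      // yammer-metrics gauges expose their current value as the "Value" attribute
      println("MaxLag = " + mbsc.getAttribute(mbean, "Value"))
    } finally {
      connector.close()
    }
  }
}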
kafka_2.10-0.8.2.2-sources.jar!/kafka/server/AbstractFetcherManager.scala
abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
  extends Logging with KafkaMetricsGroup {
  // map of (source broker_id, fetcher_id per source broker) => fetcher
  private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
  private val mapLock = new Object
  this.logIdent = "[" + name + "] "

  newGauge(
    "MaxLag",
    new Gauge[Long] {
      // current max lag across all fetchers/topics/partitions
      def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
        fetcherThreadMapEntry._2.fetcherLagStats.stats.foldLeft(0L)((curMaxThread, fetcherLagStatsEntry) => {
          curMaxThread.max(fetcherLagStatsEntry._2.lag)
        }).max(curMaxAll)
      })
    },
    Map("clientId" -> clientId)
  )
}
newGauge here is the method from KafkaMetricsGroup; the interesting part is the computation itself. All of the data lives in fetcherThreadMap, whose keys are BrokerAndFetcherId and whose values are AbstractFetcherThread instances; the concrete class is ConsumerFetcherThread, which extends AbstractFetcherThread.
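In other words, the gauge is just the maximum lag over every fetcher thread and every partition it owns, with 0 as the floor when nothing is tracked. A simplified sketch of the same computation over plain collections (names here are illustrative, not the Kafka classes):

// Simplified sketch of the MaxLag computation over plain collections.
// fetcherLags: for each fetcher thread, the current lag of every (topic, partition) it fetches.
def maxLag(fetcherLags: Map[String, Map[(String, Int), Long]]): Long =
  fetcherLags.values        // each fetcher thread's lag map
    .flatMap(_.values)      // all per-partition lag values
    .foldLeft(0L)(_ max _)  // maximum, or 0 when nothing is tracked yet

Note that the fold starts at 0, so the reported MaxLag is never negative even though FetcherLagMetrics (shown below) initializes each partition's lag to -1.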
AbstractFetcherThread has an important field: fetcherLagStats.
class FetcherLagStats(metricId: ClientIdAndBroker) {
  private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
  val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))

  def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
    stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
  }
}
class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGroup {
  private[this] val lagVal = new AtomicLong(-1L)

  newGauge("ConsumerLag",
    new Gauge[Long] {
      def value = lagVal.get
    },
    Map("clientId" -> metricId.clientId,
        "topic" -> metricId.topic,
        "partition" -> metricId.partitionId.toString)
  )

  def lag_=(newLag: Long) {
    lagVal.set(newLag)
  }

  def lag = lagVal.get
}
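The Pool with a valueFactory behaves like a concurrent map whose entries are created on first access, so every (clientId, topic, partition) lazily gets its own AtomicLong-backed FetcherLagMetrics. A rough stand-in using standard JDK collections (class and method names here are illustrative, not Kafka's):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Rough stand-in for Pool[ClientIdTopicPartition, FetcherLagMetrics] with a valueFactory:
// a concurrent map that creates an AtomicLong lag holder the first time a key is seen.
class LagPool {
  private val lags = new ConcurrentHashMap[(String, String, Int), AtomicLong]()

  def getAndMaybePut(clientId: String, topic: String, partition: Int): AtomicLong = {
    val key = (clientId, topic, partition)
    val existing = lags.get(key)
    if (existing != null) existing
    else {
      lags.putIfAbsent(key, new AtomicLong(-1L)) // -1 means "no fetch observed yet"
      lags.get(key)
    }
  }
}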
The lag value is updated in AbstractFetcherThread#processFetchRequest:
private def processFetchRequest(fetchRequest: FetchRequest) {
  val partitionsWithError = new mutable.HashSet[TopicAndPartition]
  var response: FetchResponse = null
  try {
    trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
    response = simpleConsumer.fetch(fetchRequest)
  } catch {
    case t: Throwable =>
      if (isRunning.get) {
        warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
        partitionMapLock synchronized {
          partitionsWithError ++= partitionMap.keys
        }
      }
  }
  fetcherStats.requestRate.mark()

  if (response != null) {
    // process fetched data
    inLock(partitionMapLock) {
      response.data.foreach { case(topicAndPartition, partitionData) =>
        val (topic, partitionId) = topicAndPartition.asTuple
        val currentOffset = partitionMap.get(topicAndPartition)
        // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
        if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
          partitionData.error match {
            case ErrorMapping.NoError =>
              try {
                val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                val validBytes = messages.validBytes
                val newOffset = messages.shallowIterator.toSeq.lastOption match {
                  case Some(m: MessageAndOffset) => m.nextOffset
                  case None => currentOffset.get
                }
                partitionMap.put(topicAndPartition, newOffset)
                fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                fetcherStats.byteRate.mark(validBytes)
                // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                processPartitionData(topicAndPartition, currentOffset.get, partitionData)
              } catch {
                case ime: InvalidMessageException =>
                  // we log the error and continue. This ensures two things
                  // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                  // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                  //    should get fixed in the subsequent fetches
                  logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
                case e: Throwable =>
                  throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                    .format(topic, partitionId, currentOffset.get), e)
              }
            case ErrorMapping.OffsetOutOfRangeCode =>
              try {
                val newOffset = handleOffsetOutOfRange(topicAndPartition)
                partitionMap.put(topicAndPartition, newOffset)
                error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                  .format(currentOffset.get, topic, partitionId, newOffset))
              } catch {
                case e: Throwable =>
                  error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                  partitionsWithError += topicAndPartition
              }
            case _ =>
              if (isRunning.get) {
                error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                  ErrorMapping.exceptionFor(partitionData.error).getClass))
                partitionsWithError += topicAndPartition
              }
          }
        }
      }
    }
  }

  if (partitionsWithError.size > 0) {
    debug("handling partitions with error for %s".format(partitionsWithError))
    handlePartitionsWithErrors(partitionsWithError)
  }
}
fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
This update happens inside processFetchRequest, which is driven by AbstractFetcherThread#doWork:
abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1,
                                     isInterruptible: Boolean = true)
  extends ShutdownableThread(name, isInterruptible) {
  //...
  override def doWork() {
    inLock(partitionMapLock) {
      if (partitionMap.isEmpty)
        partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
      partitionMap.foreach { case((topicAndPartition, offset)) =>
        fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, offset, fetchSize)
      }
    }

    val fetchRequest = fetchRequestBuilder.build()
    if (!fetchRequest.requestInfo.isEmpty)
      processFetchRequest(fetchRequest)
  }
}
abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
  extends Thread(name) with Logging {
  //...
  def doWork(): Unit

  override def run(): Unit = {
    info("Starting ")
    try {
      while (isRunning.get()) {
        doWork()
      }
    } catch {
      case e: Throwable =>
        if (isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }
}
kafka_2.10-0.8.2.2-sources.jar!/kafka/tools/ConsumerOffsetChecker.scala
object ConsumerOffsetChecker extends Logging {

  private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
  private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
  private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map()

  //...

  private def processPartition(zkClient: ZkClient,
                               group: String, topic: String, pid: Int) {
    val topicPartition = TopicAndPartition(topic, pid)
    val offsetOpt = offsetMap.get(topicPartition)
    val groupDirs = new ZKGroupTopicDirs(group, topic)
    val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/%s".format(pid))._1
    ZkUtils.getLeaderForPartition(zkClient, topic, pid) match {
      case Some(bid) =>
        val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid))
        consumerOpt match {
          case Some(consumer) =>
            val topicAndPartition = TopicAndPartition(topic, pid)
            val request =
              OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
            val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head

            val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
            println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
                                                                   owner match {case Some(ownerStr) => ownerStr case None => "none"}))
          case None => // ignore
        }
      case None =>
        println("No broker for partition %s - %s".format(topic, pid))
    }
  }
}
The lag shown by the tool is computed mainly in this processPartition method.
The offsetMap it relies on is populated as follows:
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)

val topicList = topics match {
  case Some(x) => x.split(",").view.toList
  case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir + "/owners").toList
}

topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*)
val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)

debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
channel.send(OffsetFetchRequest(group, topicPartitions))
val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
debug("Received offset fetch response %s.".format(offsetFetchResponse))

offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
  if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
    val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
    // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
    // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
    try {
      val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
      offsetMap.put(topicAndPartition, offset)
    } catch {
      case z: ZkNoNodeException =>
        if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
          offsetMap.put(topicAndPartition, -1)
        else
          throw z
    }
  }
  else if (offsetAndMetadata.error == ErrorMapping.NoError)
    offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
  else {
    println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
  }
}
The overall logic is roughly this: fetch the group's committed offsets from the offset manager (falling back to ZooKeeper for groups that still store offsets there), ask each partition's leader for its latest offset (logSize) via an OffsetRequest, and print lag = logSize - committed offset.
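With assumed numbers, the tool's arithmetic looks like this (the figures are made up for illustration):

// ConsumerOffsetChecker's lag, with made-up numbers:
val logSize         = 12000L                    // leader's latest offset (OffsetRequest.LatestTime)
val committedOffset = 11000L                    // the group's committed offset (offset manager or ZK)
val checkerLag      = logSize - committedOffset // 1000, the value printed in the lag column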
The difference, then, might come from this HighWaterMark:
The HighWaterMark is the smallest LEO among the partition's ISR replicas, and a consumer can read at most up to the HW.
Using the leader's latest offset clearly yields a number no smaller than one based on the HighWaterMark, and the gap widens when replicas fall far behind.
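A quick numeric comparison of the two reference points, again with made-up figures (a slow replica holds the HW back):

// Made-up figures: the leader's latest offset vs. the high watermark as a lag baseline.
val leaderLogSize   = 12000L  // what ConsumerOffsetChecker uses
val slowReplicaLeo  = 11800L  // the slowest in-sync replica's LEO
val hw              = slowReplicaLeo             // HW = min LEO across the ISR
val committedOffset = 11000L

val checkerLag = leaderLogSize - committedOffset // 1000
val hwBasedLag = hw - committedOffset            // 800, always <= checkerLag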
In practice, though, even when the consumer deliberately simulates slow processing, the metric does not grow; it stays at 0 almost all the time. So the HighWaterMark is not the real culprit either.
A final debugging session, stepping into AbstractFetcherThread and looking at the actual values of this snippet, made everything click:
val newOffset = messages.shallowIterator.toSeq.lastOption match {
  case Some(m: MessageAndOffset) => m.nextOffset
  case None => currentOffset.get
}
So the metric actually measures the difference between the partition's HighWaterMark and the offset of the latest message the fetcher has pulled, and the fetched data merely goes into an in-memory queue for the application's consumer threads to drain. In other words it gauges how fast the fetcher pulls, not how fast the consumer consumes; to see the real consumer-versus-producer lag you still have to run ConsumerOffsetChecker. A good reminder not to judge a metric by its name; this one cost me a whole day.
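To summarize the trap with one more set of made-up numbers: as long as the fetcher keeps up, MaxLag stays near 0 no matter how far behind the application's processing is, because unprocessed messages just sit in the in-memory queue.

// Made-up numbers illustrating why the JMX gauge and the checker disagree.
val hw                = 10000L  // partition high watermark
val lastFetchedOffset =  9998L  // newOffset: where the ConsumerFetcherThread has fetched up to
val committedOffset   =  5000L  // what the consumer group has actually committed

val maxLagGauge = hw - lastFetchedOffset  // 2    -> what the JMX MaxLag gauge reports
val realLag     = hw - committedOffset    // 5000 -> roughly what ConsumerOffsetChecker shows
                                          //         (it actually uses the leader's latest offset, which is >= hw)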