Attempted to decrease connection count for address with no connections
```
[2017-09-20 19:37:05,265] ERROR Found invalid messages during fetch for partition [xxxx,87] offset 1503297 error Message is corrupt (stored crc = 286782282, computed crc = 400317671) (kafka.server.ReplicaFetcherThread)
[2017-09-20 19:37:05,458] ERROR Found invalid messages during fetch for partition [xxxx,75] offset 1501373 error Message found with corrupt size (0) in shallow iterator (kafka.server.ReplicaFetcherThread)
[2017-09-20 19:37:07,455] ERROR [ReplicaFetcherThread-0-5], Error due to (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: error processing data for partition [xxxx,87] offset 1503346
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:147)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:122)
    at scala.Option.foreach(Option.scala:257)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:122)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:120)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:120)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:93)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.lang.RuntimeException: Offset mismatch: fetched offset = 1503346, log end offset = 1503297.
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:110)
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:138)
```
```
ERROR Found invalid messages during fetch for partition [qssnews_download,87] offset 1503297 error Message is corrupt (stored crc = 286782282, computed crc = 400317671) (kafka.server.ReplicaFetcherThread)
```
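Kafka flags a message as corrupt when the CRC32 checksum stored with the message no longer matches one recomputed over the message bytes, which is exactly what the "stored crc / computed crc" pair in the log shows. A minimal sketch of that check, using `java.util.zip.CRC32` over a simplified payload (this is not Kafka's actual message layout, just the idea):

```java
import java.util.zip.CRC32;

public class CrcCheck {
    // Compute a CRC32 checksum over the message body.
    static long computeCrc(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return crc.getValue();
    }

    // A message is considered corrupt when the stored CRC no longer
    // matches the CRC recomputed from the bytes read back from disk.
    static boolean isCorrupt(long storedCrc, byte[] body) {
        return storedCrc != computeCrc(body);
    }

    public static void main(String[] args) {
        byte[] body = "hello kafka".getBytes();
        long stored = computeCrc(body);              // checksum written with the message
        body[0] ^= 0x01;                             // flip one bit, as disk corruption might
        System.out.println(isCorrupt(stored, body)); // prints "true"
    }
}
```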
```
[2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because log truncation is not allowed for topic test, Current leader 1's latest offset 0 is less than replica 2's latest offset 151 (kafka.server.ReplicaFetcherThread)
```
The topic is configured with `unclean.leader.election.enable=false`, and execution then reaches the following logic in the code:

```scala
if (leaderEndOffset < replica.logEndOffset.messageOffset) {
  // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration
  // for unclean leader election.
  // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
  // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
  if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
      ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
    // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
    fatal("...")
    Runtime.getRuntime.halt(1)
  }
}
```
It calls `Runtime.getRuntime.halt(1)` and kills the broker process outright.
For background, see the Kafka issue "Unclean leader election and 'Halting because log truncation is not allowed'".
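The follower-side reaction above can be condensed into a small decision function. This is a hypothetical simplification with made-up names (`onOutOfRange`, `Action`), not Kafka's API; it only illustrates why the broker halts rather than truncates:

```java
public class TruncationCheck {
    enum Action { TRUNCATE, HALT, FETCH }

    // Sketch of the follower's decision when its offset is out of range
    // relative to the new leader (hypothetical simplification).
    static Action onOutOfRange(long leaderEndOffset, long replicaEndOffset,
                               boolean uncleanLeaderElectionEnable) {
        if (leaderEndOffset < replicaEndOffset) {
            // The follower has more data than the newly elected leader.
            if (!uncleanLeaderElectionEnable) {
                // Truncating would silently drop committed data, so the broker
                // halts instead (Runtime.getRuntime.halt(1) in the real code).
                return Action.HALT;
            }
            return Action.TRUNCATE; // unclean election allowed: drop the extra data
        }
        return Action.FETCH; // leader is ahead or equal: keep fetching
    }

    public static void main(String[] args) {
        // Leader's latest offset 0 < replica's latest offset 151, unclean election disabled:
        System.out.println(onOutOfRange(0, 151, false)); // prints "HALT"
    }
}
```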
```
WARN [Replica Manager on Broker 3]: While recording the replica LEO, the partition [orderservice.production,0] hasn't been created. (kafka.server.ReplicaManager)
```

and

```
ERROR [ReplicaFetcherThread-0-58], Error for partition [reptest,0] to broker 58:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
```
The replica went to fetch data from the wrong partition leader, which in theory should never happen;
this partition's information had been removed directly from the content of the `/brokers/[topic]` znode, but the Kafka controller does not handle the case of a shrinking partition count; see the KafkaController analysis of the `/controller` ephemeral node.

```
[2017-09-30 10:49:36,126] ERROR [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: 138296566648 messages in segment __consumer_offsets-5/00000000000000000000.log but offset map can fit only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
    at scala.Predef$.require(Predef.scala:219)
    at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
    at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
    at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
    at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
    at kafka.log.Cleaner.clean(LogCleaner.scala:322)
    at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
    at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
```
From the LogCleaner source we can see that for the log segment 00000000000000000000.log, `segment.nextOffset() - segment.baseOffset` is greater than `maxDesiredMapSize`, which terminates the LogCleaner thread so no cleaning can happen at all. This should not happen?!

```scala
val segmentSize = segment.nextOffset() - segment.baseOffset
require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads"
  .format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
if (map.size + segmentSize <= maxDesiredMapSize)
  offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
else
  full = true
```
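The "offset map can fit only 5033164" figure in the error matches what default cleaner settings would produce: assuming `log.cleaner.dedupe.buffer.size` = 128 MB, 24 bytes per map entry (a 16-byte MD5 hash plus an 8-byte offset), and a 0.9 load factor, the map holds about (128 MB / 24) * 0.9 ≈ 5033164 entries. A back-of-the-envelope check (the constants are assumed defaults, not values read from this broker):

```java
public class OffsetMapCapacity {
    // Estimate how many unique keys the cleaner's offset map can hold.
    // entryBytes = hash size (16 bytes for MD5) + 8 bytes for the offset.
    static int maxDesiredMapSize(long bufferBytes, int entryBytes, double loadFactor) {
        long slots = bufferBytes / entryBytes; // how many entries fit in the buffer
        return (int) (slots * loadFactor);     // keep some headroom via the load factor
    }

    public static void main(String[] args) {
        long bufferBytes = 128L * 1024 * 1024; // assumed default log.cleaner.dedupe.buffer.size
        System.out.println(maxDesiredMapSize(bufferBytes, 24, 0.9)); // prints "5033164"
    }
}
```

Any segment whose offset span exceeds this entry count trips the `require` above.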
We deleted 00000000000000000000.log and 00000000000000000000.index, then removed the related entry from cleaner-offset-checkpoint and restarted the broker; log compaction started working again. As for why this logSegment's `segment.nextOffset() - segment.baseOffset` was greater than `maxDesiredMapSize`: our guess is that some application was manually committing offsets to this partition without proper control, pushing 8-9 MB of commits per second.

During a stop-the-world GC pause all broker threads stop working, so the broker naturally cannot keep its heartbeat with ZooKeeper. GC is a big headache; I searched around online and did not find an effective solution. My own skill is limited, so if anyone has a good approach, please leave me a comment. Thanks!

Setting `zookeeper.forceSync=no` reduces disk write IO, but this option has side effects, so be cautious about using it in production.

We use Grafana for dashboards and alerting.

Attempted to decrease connection count for address with no connections
```
[2016-10-13 00:00:00,495] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.IllegalArgumentException: Attempted to decrease connection count for address with no connections, address: /xxx.xxx.xxx.xxx
    at kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
    at kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:59)
    at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
    at kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
    at kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.network.Processor.run(SocketServer.scala:445)
    at java.lang.Thread.run(Thread.java:745)
```
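The stack trace points at `ConnectionQuotas.dec`, which throws when asked to decrement the count for an address that has no recorded connections. The bookkeeping pattern can be reproduced with a minimal sketch (a hypothetical class mirroring the shape of the code, not Kafka's implementation); a plausible trigger is the same connection being counted down twice:

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectionQuotasSketch {
    // Per-address open-connection counts, as ConnectionQuotas keeps them.
    private final Map<String, Integer> counts = new HashMap<>();

    void inc(String address) {
        counts.merge(address, 1, Integer::sum);
    }

    // Mirrors the dec path: an address with no entry produces exactly
    // the IllegalArgumentException seen in the broker log.
    void dec(String address) {
        Integer count = counts.get(address);
        if (count == null) {
            throw new IllegalArgumentException(
                "Attempted to decrease connection count for address with no connections, address: " + address);
        }
        if (count == 1) counts.remove(address);
        else counts.put(address, count - 1);
    }

    public static void main(String[] args) {
        ConnectionQuotasSketch quotas = new ConnectionQuotasSketch();
        quotas.inc("/10.0.0.1");
        quotas.dec("/10.0.0.1"); // balanced: fine
        try {
            quotas.dec("/10.0.0.1"); // second dec for the same connection: throws
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```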
```
[2017-10-12 16:52:38,141] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
    at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
    at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
    at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
    at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.network.Processor.run(SocketServer.scala:421)
    at java.lang.Thread.run(Thread.java:745)
```
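The `ArrayIndexOutOfBoundsException: 18` in `ApiKeys.forId` suggests a newer client sent a request with an api key id (18, likely ApiVersions) that this older broker's fixed lookup table does not contain. The failure mode can be sketched with a hypothetical table (`KNOWN_APIS` and the method names here are made up for illustration, not Kafka's enum):

```java
public class ApiKeyLookup {
    // Suppose this broker only knows api keys 0..17.
    static final String[] KNOWN_APIS = new String[18];

    static {
        for (int i = 0; i < KNOWN_APIS.length; i++) KNOWN_APIS[i] = "api-" + i;
    }

    // Unchecked lookup, like the old ApiKeys.forId: a newer client's
    // api key id simply falls off the end of the array.
    static String forId(int id) {
        return KNOWN_APIS[id]; // throws ArrayIndexOutOfBoundsException for id >= 18
    }

    // Defensive variant: validate the id before indexing.
    static String forIdChecked(int id) {
        if (id < 0 || id >= KNOWN_APIS.length) {
            throw new IllegalArgumentException("Unsupported api key id: " + id);
        }
        return KNOWN_APIS[id];
    }

    public static void main(String[] args) {
        System.out.println(forIdChecked(3)); // prints "api-3"
        // forId(18) would throw ArrayIndexOutOfBoundsException, as in the broker log
    }
}
```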
In `SocketServer`:

```scala
try {
  val channel = selector.channel(receive.source)
  val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
  val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
    buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
  requestChannel.sendRequest(req)
} catch {
  case e @ (_: InvalidRequestException | _: SchemaException) =>
    // note that even though we got an exception, we can assume that receive.source is valid.
    // Issues with constructing a valid receive object were handled earlier
    error("Closing socket for " + receive.source + " because of error", e)
    isClose = true
    close(selector, receive.source)
}
```
This exception is not handled when the Request is constructed here, so it is caught by the outer `try...catch` and the loop moves straight on to the next `selector.poll(300)`. That `selector.poll(300)` clears all previously received Requests, which means that in this situation some Requests may go unprocessed. That looks like a fairly serious problem.
```scala
selector.completedReceives.asScala.foreach { receive =>
  var isClose = false
  try {
    val channel = selector.channel(receive.source)
    val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
    val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
      buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
    requestChannel.sendRequest(req)
  } catch {
    case e @ (_: InvalidRequestException | _: SchemaException) =>
      // note that even though we got an exception, we can assume that receive.source is valid.
      // Issues with constructing a valid receive object were handled earlier
      error("Closing socket for " + receive.source + " because of error", e)
      isClose = true
      close(selector, receive.source)
    case e: ArrayIndexOutOfBoundsException =>
      error("NotSupport Request | Closing socket for " + receive.source + " because of error", e)
      isClose = true
      close(selector, receive.source)
  }
  if (!isClose) {
    selector.mute(receive.source)
  }
}
```
`inflightResponses` caches responses that need to be sent but have not finished sending; each such response also holds a reference to its corresponding request, so under heavy request traffic its memory footprint is considerable. In the 0.9.0.1 code, entries are removed from `inflightResponses` only in `completedSends`; nothing is done in `disconnected` and `close`. There are two ways to fix this: remove the `inflightResponses` variable entirely, though that has a side effect of impacting the Metrics statistics; or add the removal operations to `disconnected` and `close` as well.
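The second fix, removing entries on disconnect/close as well as on completed sends, can be sketched with a minimal model of the processor's bookkeeping (hypothetical class and method names, not Kafka's code):

```java
import java.util.HashMap;
import java.util.Map;

public class InflightResponses {
    // connectionId -> response still being written to the socket.
    // Each cached response also pins its request (and buffers) in memory.
    private final Map<String, Object> inflight = new HashMap<>();

    void send(String connectionId, Object response) {
        inflight.put(connectionId, response);
    }

    // Original 0.9.0.1 behavior: only a completed send removes the entry.
    void onCompletedSend(String connectionId) {
        inflight.remove(connectionId);
    }

    // The fix: a disconnect or close must also release the cached response,
    // otherwise entries for dead connections accumulate and hold memory.
    void onDisconnected(String connectionId) {
        inflight.remove(connectionId);
    }

    void onClose(String connectionId) {
        inflight.remove(connectionId);
    }

    int size() {
        return inflight.size();
    }

    public static void main(String[] args) {
        InflightResponses p = new InflightResponses();
        p.send("conn-1", "response holding its request");
        p.onDisconnected("conn-1"); // without this removal the entry would live on
        System.out.println(p.size()); // prints "0"
    }
}
```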