本文分析的Kafka代码为kafka-0.8.2.1。另外,因为Kafka目前提供了两套Producer代码,一套是Scala版的旧版本;一套是Java版的新版本。虽然Kafka社区极力推荐你们使用Java版本的producer,但目前不少已有的程序仍是调用了Scala版的API。今天咱们就分析一下旧版producer的代码。java
val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests") .withRequiredArg .describedAs("request required acks") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) // 此处默认设置为0
do { message = reader.readMessage() // 从LineMessageReader类中读取消息。该类接收键盘输入的一行文本做为消息 if(message != null) producer.send(message.topic, message.key, message.message) // key默认是空,若是想要指定,需传入参数parse.key=true,默认key和消息文本之间的分隔符是'\t' } while(message != null) // 循环接收消息,除非Ctrl+C或其余其余引起IOException操做跳出循环
下面代码是Producer.scala中的发送方法: api
def send(messages: KeyedMessage[K,V]*) { lock synchronized { if (hasShutdown.get) //若是producer已经关闭了抛出异常退出 throw new ProducerClosedException recordStats(messages //更新producer统计信息 sync match { case true => eventHandler.handle(messages) //若是是同步发送,直接使用DefaultEventHandler的handle方法发送 case false => asyncSend(messages) // 不然,使用ayncSend方法异步发送消息——本文不考虑这种状况 } } }
由上面的分析能够看出,真正的发送逻辑实际上是由DefaultEventHandler类的handle方法来完成的。下面咱们重点分析一下这个类的代码结构。数组
5、DefaultEventHandler与消息发送缓存
这个类的handler方法能够同时支持同步和异步的消息发送。咱们这里只考虑同步的代码路径。下面是消息发送的完整流程图:并发
如下代码是发送消息的核心逻辑:app
while (remainingRetries > 0 && outstandingProduceRequests.size > 0) { // 属性message.send.max.retries指定了消息发送的重试次数,而outstandingProducerRequests就是序列化以后待发送的消息集合 topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic) //将待发送消息所属topic加入到待刷新元数据的topic集合 if (topicMetadataRefreshInterval >= 0 && SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) { //查看是否已过刷新元数据时间间隔 Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)) // 更新topic元数据信息 sendPartitionPerTopicCache.clear() //若是消息key是空,代码随机选择一个分区并记住该分区,之后该topic的消息都会往这个分区里面发送。sendPartitionPerTopicCache就是这个缓存 topicMetadataToRefresh.clear //清空待刷新topic集合 lastTopicMetadataRefreshTime = SystemTime.milliseconds } outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests) // 真正的消息发送方法 if (outstandingProduceRequests.size > 0) { // 若是还有未发送成功的消息 info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1)) // back off and update the topic metadata cache before attempting another send operation Thread.sleep(config.retryBackoffMs) // 等待一段时间并重试 // get topics of the outstanding produce requests and refresh metadata for those Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement)) sendPartitionPerTopicCache.clear() remainingRetries -= 1 // 更新剩余重试次数 producerStats.resendRate.mark() } }
下面具体说说各个子模块的代码逻辑: dom
serializedMessages += new KeyedMessage[K,Message]( topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(bytes = encoder.toBytes(e.message))) // new Message时没有指定key
构建完KeyedMessage以后返回对应的消息集合便可。异步
def updateInfo(topics: Set[String], correlationId: Int) { var topicsMetadata: Seq[TopicMetadata] = Nil // TopicMetadata = topic信息+ 一组PartitionMetadata (partitionId + leader + AR + ISR) val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId) //构造TopicMetadataRequest并随机排列全部broker,而后从第一个broker开始尝试发送请求。一旦成功就终止后面的请求发送尝试。 topicsMetadata = topicMetadataResponse.topicsMetadata //从response中取出zookeeper中保存的对应topic元数据信息 // throw partition specific exception topicsMetadata.foreach(tmd =>{ trace("Metadata for topic %s is %s".format(tmd.topic, tmd)) if(tmd.errorCode == ErrorMapping.NoError) { topicPartitionInfo.put(tmd.topic, tmd) //更新到broker的topic元数据缓存中 } else warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass)) tmd.partitionsMetadata.foreach(pmd =>{ if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) { warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId, ErrorMapping.exceptionFor(pmd.errorCode).getClass)) } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata }) }) producerPool.updateProducer(topicsMetadata) }
关于上面代码中的最后一行, 咱们须要着重说一下。每一个producer应用程序都会保存一个producer池对象来缓存每一个broker上对应的同步producer实例。具体格式为brokerId -> SyncProducer。SyncProducer表示一个同步producer,其主要的方法是send,支持两种请求的发送:ProducerRequest和TopicMetadataRequest。前者是发送消息的请求,后者是更新topic元数据信息的请求。为何须要这份缓存呢?咱们知道,每一个topic分区都应该有一个leader副本在某个broker上,而只有leader副本才能接收客户端发来的读写消息请求。对producer而言,即只有这个leader副本所在的broker才能接收ProducerRequest请求。在发送消息时候,咱们会首先找出这个消息要发给哪一个topic,而后发送更新topic元数据请求给任意broker去获取最新的元数据信息——这部分信息中比较重要的就是要获取topic各个分区的leader副本都在哪些broker上,这样咱们稍后会建立链接那些broker的阻塞通道(blocking channel)去实现真正的消息发送。Kafka目前的作法就是重建全部topic分区的leader副本所属broker上对应的SyncProducer实例——虽然我以为这样实现有线没有必要,只更新消息所属分区的缓存信息应该就够了(固然,这只是个人观点,若是有不一样意见欢迎拍砖)。如下是更新producer缓存的一些关键代码:socket
val newBrokers = new collection.mutable.HashSet[Broker] topicMetadata.foreach(tmd => { tmd.partitionsMetadata.foreach(pmd => { if(pmd.leader.isDefined) //遍历topic元数据信息中的每一个分区元数据实例,若是存在leader副本的,添加到newBrokers中以备后面更新缓存使用 newBrokers+=(pmd.leader.get) }) }) lock synchronized { newBrokers.foreach(b => { //遍历newBrokers中的每一个broker实例,若是在缓存中已经存在,直接关闭掉而后建立一个新的加入到缓存中;不然直接建立一个加入 if(syncProducers.contains(b.id)){ syncProducers(b.id).close() syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b)) } else syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b)) }) }
前面说了,若是只发送一条消息的话,其实真正须要更新的分区leader副本所述broker对应的SyncProducer实例只有一个,但目前的代码中会更新全部分区,不知道Java版本的producer是否也是这样实现,这须要后面继续调研! async
Topic | 分区 | Leader副本所在的broker ID |
test-topic | P0 | 0 |
test-topic | P1 | 1 |
test-topic | P2 | 3 |
若是基于这样的配置,假定咱们使用producer API一次性发送4条消息,分别是M1,M2, M3和M4。如今就能够开始分析代码了,首先从消息分组及整理开始:
消息 | 要被发送到的分区ID | 该分区leader副本所在broker ID |
M1 | P0 | 0 |
M2 | P0 | 0 |
M3 | P1 | 1 |
M4 | P2 | 3 |
val index = Utils.abs(Random.nextInt) % availablePartitions.size // 随机肯定broker id val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) // 加入缓存中以便后续使用
def startup() { ... // 建立一个请求处理的线程池,在构造时就会开启多个线程准备接收请求 requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) ... } class KafkaRequestHandlerPool { ... for(i <- 0 until numThreads) { runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis) threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i)) threads(i).start() // 启动每一个请求处理线程 } ... }
KafkaRequestHandler其实是一个Runnable,它的run核心方法中以while (true)的方式调用api.handle(request)不断地接收请求处理,以下面的代码所示:
class KafkaRequestHandler... extends Runnable { ... def run() { ... while (true) { ... apis.handle(request) // 调用apis.handle等待请求处理 } ... } ... }
在KafkaApis中handle的主要做用就是接收各类类型的请求。本文只关注ProducerRequest请求:
def handle(request: RequestChannel.Request) { ... request.requestId match { case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) // 若是接收到ProducerRequest交由handleProducerOrOffsetCommitRequest处理 case ... } ... }
如此看来,核心的方法就是handleProducerOrOffsetCommitRequest了。这个方法之因此叫这个名字,是由于它同时能够处理ProducerRequest和OffsetCommitRequest两种请求,后者其实也是一种特殊的ProducerRequest。从Kafka 0.8.2以后kafka使用一个特殊的topic来保存提交位移(commit offset)。这个topic名字是__consumer_offsets。本文中咱们关注的是真正的ProducerRequest。下面来看看这个方法的逻辑,以下图所示:
总体逻辑看上去很是简单,以下面的代码所示:
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) { ... val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) // 将消息追加写入本地提交日志 val numPartitionsInError = localProduceResults.count(_.error.isDefined) // 计算是否存在发送失败的分区 if(produceRequest.requiredAcks == 0) { // request.required.acks = 0时的代码路径 if (numPartitionsInError != 0) { info(("Send the close connection response due to error handling produce request " + "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0") .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(","))) requestChannel.closeConnection(request.processor, request) // 关闭底层Socket以告知客户端程序有发送失败的状况 } else { ... } } else if (produceRequest.requiredAcks == 1 || // request.required.acks = 0时的代码路径,固然还有其余两个条件 produceRequest.numPartitions <= 0 || numPartitionsInError == produceRequest.numPartitions) { val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize)) .getOrElse(ProducerResponse(produceRequest.correlationId, statuses)) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) // 发送response给客户端 } else { // request.required.acks = -1时的代码路径 // create a list of (topic, partition) pairs to use as keys for this delayed request val producerRequestKeys = produceRequest.data.keys.toSeq val statuses = localProduceResults.map(r => r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap val delayedRequest = new DelayedProduce(...) // 此时须要构造延时请求进行处理,此段逻辑比较复杂,须要理解Purgatory的概念,本文暂不考虑 ... }
由上面代码可见,不管request.required.acks是何值,都须要首先将待发送的消息集合追加写入本地的提交日志中。此时如何按照默认值是是0的状况,那么这写入日志后须要判断下全部消息是否都已经发送成功了。若是出现了发送错误,那么就将关闭连入broker的Socket Server以通知客户端程序错误的发生。如今的关键是追加写是如何完成的?即方法appendToLocalLog如何实现的?该方法总体逻辑流程图以下图所示:
因为逻辑很直观,不对代码作详细分析,不过值得关注的是这个方法会捕获不少异常:
异常名称 | 具体含义 | 异常处理 |
KafakStorageException | 这多是不可恢复的IO错误 | 既然没法恢复,则终止该broker上JVM进程 |
InvalidTopicException | 显式给__consumer_offsets topic发送消息就会有这个异常抛出,不要这么作,由于这是内部topic | 将InvalidTopicException封装进ProduceResult返回 |
UnknownTopicOrPartitionException | topic或分区不在该broker上时抛出该异常 | 将UnknownTopicOrPartitionException封装进ProduceResult返回 |
NotLeaderForPartitionException | 目标分区的leader副本不在该broker上 | 将NotLeaderForPartitionException封装进ProduceResult返回 |
NotEnoughReplicasException | 只会出如今request.required.acks=-1且ISR中的副本数不知足min.insync.replicas指定的最少副本数时会抛出该异常 | 将NotEnoughReplicasException封装进ProduceResult返回 |
其余 | 处理ProducerRequest时发生的其余异常 | 将对应异常封装进ProduceResult返回 |
okay,貌似如今咱们就剩下最后一个主要的方法没说了。分析完这个方法以后整个producer发送消息的流程应该就算是完整地走完了。最后的这个方法就是Partition的appendMessagesToLeader,其主要代码以下:
def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = { inReadLock(leaderIsrUpdateLock) { val leaderReplicaOpt = leaderReplicaIfLocal() // 判断目标分区的leader副本是否在该broker上 leaderReplicaOpt match { case Some(leaderReplica) => // 若是leader副本在该broker上 val log = leaderReplica.log.get // 获取本地提交日志文件句柄 val minIsr = log.config.minInSyncReplicas val inSyncSize = inSyncReplicas.size // Avoid writing to leader if there are not enough insync replicas to make it safe if (inSyncSize < minIsr && requiredAcks == -1) { //只有request.required.acks等于-1时才会判断ISR数是否不足 throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]" .format(topic,partitionId,minIsr,inSyncSize)) } val info = log.append(messages, assignOffsets = true) // 真正的写日志操做,因为涉及Kafka底层写日志的,之后有机会写篇文章专门探讨这部分功能 // probably unblock some follower fetch requests since log end offset has been updated replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) info case None => // 若是不在,直接抛出异常代表leader不在该broker上 throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d" .format(topic, partitionId, localBrokerId)) } }
至此,一个最简单的scala版同步producer的代码走读就算正式完成了,能够发现Kafka设计的思路就是在每一个broker上启动一个server不断地处理从客户端发来的各类请求,完成对应的功能并按需返回对应的response。但愿本文能对但愿了解Kafka producer机制的人有所帮助。