Kafka 服务端经过Kafka.scala
的主函数main
方法启动。KafkaServerStartable
类提供读取配置文件、启动/中止服务的方法。而启动/中止服务最终调用的是KafkaServer
的startup/shutdown
方法。api
Acceptor
,即启动 NIO Socket。num.network.threads
个接收器到请求通道RequestChannel
的处理器缓存ConcurrentHashMap
,key 为递增编号,value 为处理器Processor
。Acceptor
执行CountDownLatch.await
等待通知启动。Acceptor
到ConcurrentHashMap
,key 为EndPoint
,value 为Acceptor
。KafkaApis
。num.io.threads
个请求处理器线程KafkaRequestHandler
。ArrayBlockingQueue
获取请求,调用KafkaApis.handle
方法,进行集中处理请求。CountDownLatch.countDown
通知唤醒Acceptor
线程。
NIO.select
轮询。SocketChannel
加入缓存队列ConcurrentLinkedQueue
SocketChannel
,绑定到KafkaChannel
。ArrayBlockingQueue
def run() { serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 注册接收事件 startupComplete() // 通知 Acceptor 线程 var currentProcessor = 0 while (isRunning) { val ready = nioSelector.select(500) // 轮询事件 if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() while (iter.hasNext && isRunning) { val key = iter.next iter.remove() if (key.isAcceptable) { // 有可接受事件 val processor = synchronized { currentProcessor = currentProcessor % processors.size processors(currentProcessor) // 缓存 Processor } accept(key, processor) // 将 SocketChannel 缓存到队列 } } } } }
override def run() { startupComplete() // CountDownLatch.countDown 唤醒 Acceptor 线程。 while (isRunning) { configureNewConnections() // 从缓存队列取出 SocketChannel,绑定到 KafkaChannel processNewResponses() // 处理返回客户端的响应 poll() // Kafka.Selector 轮询读取/写入事件 processCompletedReceives() // 处理客户端的请求,放到阻塞队列 processCompletedSends() // 处理返回客户端响应后的回调 processDisconnected() // 断开链接后的处理 } }
def run() { while (!stopped) { val startSelectTime = time.nanoseconds // 从阻塞队列拉取请求 val req = requestChannel.receiveRequest(300) req match { case request: RequestChannel.Request => try { apis.handle(request) // 调用`KafkaApis.handle`方法,进行集中处理请求。 } } } }
参考客户端源码分析。缓存