kafka的请求都是经过socket进行通讯的,网络层就是负责接收请求,而且发送响应的。kafka网络层使用了java的nio异步框架,大大提升了性能。java
Acceptor只监听新的链接,而后经过新的链接轮询发送给Processor。算法
Processor负责与链接的数据交互,而且将请求转发给RequestHandler处理。网络
RequestHandler负责处理Processor转发的请求。session
KafkaSelector是对java nio的Selector封装,负责读取客户的请求和发送响应。框架
SocketServer负责上述类的初始化异步
def startup() { connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) ....... var processorBeginIndex = 0 // config.listeners表示监听地址 config.listeners.foreach { endpoint => val listenerName = endpoint.listenerName // numProcessorThreads表示Processor的数目 val processorEndIndex = processorBeginIndex + numProcessorThreads // 初始化Processor for (i <- processorBeginIndex until processorEndIndex) processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol) // 初始化Acceptor val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) // 启动acceptor线程 Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start() // 等待Acceptor完成开始动做 acceptor.awaitStartup() processorBeginIndex = processorEndIndex }
AbstractServerThread是Runnable的封装,提供startup和shutdown的过程。它是Acceptor和Processor的基类。下面介绍startup过程,shutdown过程相似。socket
abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging { // 计数器,代表是否startup完成 private val startupLatch = new CountDownLatch(1) @volatile private var shutdownLatch = new CountDownLatch(0) // 等待线程startup阶段完成,由外界调用 def awaitStartup(): Unit = startupLatch.await // 通知startup完成,由线程自身调用 protected def startupComplete(): Unit = { // Replace the open latch with a closed one shutdownLatch = new CountDownLatch(1) startupLatch.countDown() }
Acceptor的初始化过程ide
// 将java nio的Selector重命名为NSelector import java.nio.channels.{Selector => NSelector} // 实例化NSelector private val nioSelector = NSelector.open() // 实例化server channel val serverChannel = openServerSocket(endPoint.host, endPoint.port) // 启动Processor this.synchronized { processors.foreach { processor => Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", processor, false).start() } }
openServerSocket方法性能
private def openServerSocket(host: String, port: Int): ServerSocketChannel = { val socketAddress = if(host == null || host.trim.isEmpty) new InetSocketAddress(port) else new InetSocketAddress(host, port) // 实例化serverChannel val serverChannel = ServerSocketChannel.open() // 设置非阻塞 serverChannel.configureBlocking(false) // bind地址 serverChannel.socket.bind(socketAddress) .......
Acceptor本质是一个线程,下面是它的run方法this
def run() { // 注册监听ACCEPT事件 serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) ......... // currentProcessor表示processor的索引号 var currentProcessor = 0 while (isRunning) { // 等待500ms,返回就绪的通道 val ready = nioSelector.select(500) if (ready > 0) { // 取出就绪的keys val keys = nioSelector.selectedKeys() val iter = keys.iterator() // 遍历keys while (iter.hasNext && isRunning) { if (key.isAcceptable) // 接收新的链接,发送给索引为currentProcessor的processor accept(key, processors(currentProcessor)) // 更新currentProcessor,这里体现了轮询的算法 currentProcessor = (currentProcessor + 1) % processors.length ........
下面是accept方法,它只是接收的新的链接,而后发送给processor
def accept(key: SelectionKey, processor: Processor) { ......... // 接收新的链接 val socketChannel = serverSocketChannel.accept() // 设置非阻塞 socketChannel.configureBlocking(false) // 禁用了Nagle 算法,保证数据能立马发送出去 socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setKeepAlive(true) // 调用processor的accept方法 processor.accept(socketChannel) .........
Processor 属性
// newConnections是acceptor线程和processor通讯的queue, // acceptor会向队列添加新的链接,processor会从队列取出链接,而后进行处理。 newConnections = new ConcurrentLinkedQueue[SocketChannel]() //kafka的Selector selector = new KSelector(......)
下面是Processor的accept方法,在上面由acceptor调用
def accept(socketChannel: SocketChannel) { // acceptor将新的链接,加入到newConnections里面 newConnections.add(socketChannel) // 唤醒selector,使processor能即时处理新链接 wakeup()
接下来看看Processor的run方法
override def run() { startupComplete() while (isRunning) { .......... // 处理newConnections队列里面的新链接 configureNewConnections() // 处理新的响应 processNewResponses() poll() // 处理已经读取完的请求 processCompletedReceives() // 处理已经发送的请求 processCompletedSends() //处理关闭的链接 processDisconnected() ......... } ......... }
configureNewConnections方法,处理新链接
private def configureNewConnections() { while (!newConnections.isEmpty) { // 从newConnections队列里面取出新链接 val channel = newConnections.poll() // 获取链接信息 val localHost = channel.socket().getLocalAddress.getHostAddress val localPort = channel.socket().getLocalPort val remoteHost = channel.socket().getInetAddress.getHostAddress val remotePort = channel.socket().getPort // 构件链接id val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString // 调用selecotr注册新链接 selector.register(connectionId, channel) ........
下面是processNewResponses方法
private def processNewResponses() { // 从requestChannel取出链接 var curr = requestChannel.receiveResponse(id) while (curr != null) { // 根据响应的Action做不一样的处理 curr.responseAction match { case RequestChannel.NoOpAction => val channelId = curr.request.connectionId if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null) // 若是链接存在而且没有准备关闭,程序会继续等待读取数据。 // unmute方法,实质是注册新链接的读事件 selector.unmute(channelId) case RequestChannel.SendAction => val responseSend = curr.responseSend.getOrElse( throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr")) // 发送响应 sendResponse(curr, responseSend) case RequestChannel.CloseConnectionAction => //关闭链接 close(selector, curr.request.connectionId) } ..........
poll方法,实质就是selector.poll的封装,后面篇幅会讲到Selector的实现细节
private def poll() { ...... selector.poll(300) ...... }
processCompletedReceives方法
private def processCompletedReceives() { // 从selector遍历completedReceives selector.completedReceives.asScala.foreach { receive => val openChannel = selector.channel(receive.source) val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source) val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress) // 构建Request val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeNanos = time.nanoseconds, listenerName = listenerName, securityProtocol = securityProtocol) // 发送request到requestChannel requestChannel.sendRequest(req) // 取消这个链接的读事件,不继续读取请求 selector.mute(receive.source)
processCompletedSends方法
private def processCompletedSends() { // 从selector遍历completedSends selector.completedSends.asScala.foreach { send => val resp = inflightResponses.remove(send.destination).getOrElse { throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`") } // 增长这个链接的读事件,继续读取请求 selector.unmute(send.destination) } }
processDisconnected方法
private def processDisconnected() { // 从selector遍历disconnected selector.disconnected.keySet.asScala.foreach { connectionId => val remoteHost = ConnectionId.fromString(connectionId).getOrElse { throw new IllegalStateException(s"connectionId has unexpected format: $connectionId") }.remoteHost inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request)) // the channel has been closed by the selector but the quotas still need to be updated connectionQuotas.dec(InetAddress.getByName(remoteHost)) } }
SocketServer: 负责框架初始化。实例Acceptor和Processor
Acceptor: 使用java nio框架的Selector,绑定监端口,负责接收新链接,而且经过基于线程间的队列,把新链接轮询发送给Processor
Processor:接收acceptor的新链接,使用KSelector负责读取链接的请求,而后把请求发送给
requestChannel处理。而后从requestChannel获取响应后,将响应基于KSelector发送出去