kafka-网络层框架

kafka-网络层介绍

kafka的请求都是经过socket进行通讯的,网络层就是负责接收请求,而且发送响应的。kafka网络层使用了java的nio异步框架,大大提升了性能。java

框架图

输入图片说明

Acceptor只监听新的链接,而后经过新的链接轮询发送给Processor。算法

Processor负责与链接的数据交互,而且将请求转发给RequestHandler处理。网络

RequestHandler负责处理Processor转发的请求。session

KafkaSelector是对java nio的Selector封装,负责读取客户的请求和发送响应。框架

网络层初始化-SocketServer类

SocketServer负责上述类的初始化异步

def startup() {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
       .......
      var processorBeginIndex = 0
      // config.listeners表示监听地址
      config.listeners.foreach { endpoint =>
        val listenerName = endpoint.listenerName
        // numProcessorThreads表示Processor的数目
        val processorEndIndex = processorBeginIndex + numProcessorThreads
        // 初始化Processor
        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)
        // 初始化Acceptor
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        // 启动acceptor线程
        Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
        // 等待Acceptor完成开始动做
        acceptor.awaitStartup()
        processorBeginIndex = processorEndIndex
      }

AbstractServerThread类

AbstractServerThread是Runnable的封装,提供startup和shutdown的过程。它是Acceptor和Processor的基类。下面介绍startup过程,shutdown过程相似。socket

abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
    // 计数器,代表是否startup完成
    private val startupLatch = new CountDownLatch(1)
    @volatile private var shutdownLatch = new CountDownLatch(0)
  // 等待线程startup阶段完成,由外界调用
  def awaitStartup(): Unit = startupLatch.await

  // 通知startup完成,由线程自身调用
  protected def startupComplete(): Unit = {
    // Replace the open latch with a closed one
    shutdownLatch = new CountDownLatch(1)
    startupLatch.countDown()
  }

Acceptor类

Acceptor的初始化过程ide

// 将java nio的Selector重命名为NSelector
  import java.nio.channels.{Selector => NSelector}
  // 实例化NSelector
  private val nioSelector = NSelector.open()
  // 实例化server channel
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
   
  // 启动Processor 
  this.synchronized {
    processors.foreach { processor =>
      Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
        processor, false).start()
    }
  }

openServerSocket方法性能

private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val socketAddress =
      if(host == null || host.trim.isEmpty)
        new InetSocketAddress(port)
      else
        new InetSocketAddress(host, port)
    // 实例化serverChannel
    val serverChannel = ServerSocketChannel.open()
    // 设置非阻塞
    serverChannel.configureBlocking(false)
    // bind地址
    serverChannel.socket.bind(socketAddress)
.......

Acceptor本质是一个线程,下面是它的run方法this

def run() {
    // 注册监听ACCEPT事件
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    .........
    // currentProcessor表示processor的索引号
    var currentProcessor = 0
    while (isRunning) {
        // 等待500ms,返回就绪的通道
        val ready = nioSelector.select(500)
        if (ready > 0) {
            // 取出就绪的keys
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            // 遍历keys
            while (iter.hasNext && isRunning) {
                if (key.isAcceptable)
                    // 接收新的链接,发送给索引为currentProcessor的processor
                    accept(key, processors(currentProcessor))
                 // 更新currentProcessor,这里体现了轮询的算法
                currentProcessor = (currentProcessor + 1) % processors.length
                ........

下面是accept方法,它只是接收的新的链接,而后发送给processor

def accept(key: SelectionKey, processor: Processor) {
    .........
    // 接收新的链接
    val socketChannel = serverSocketChannel.accept()
    // 设置非阻塞
    socketChannel.configureBlocking(false)
    // 禁用了Nagle 算法,保证数据能立马发送出去
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
   
    // 调用processor的accept方法
    processor.accept(socketChannel)
    .........

Processor类

Processor 属性

// newConnections是acceptor线程和processor通讯的queue,
//  acceptor会向队列添加新的链接,processor会从队列取出链接,而后进行处理。
newConnections = new ConcurrentLinkedQueue[SocketChannel]()

//kafka的Selector
selector = new KSelector(......)

下面是Processor的accept方法,在上面由acceptor调用

def accept(socketChannel: SocketChannel) {
   // acceptor将新的链接,加入到newConnections里面
   newConnections.add(socketChannel)
   // 唤醒selector,使processor能即时处理新链接
   wakeup()

接下来看看Processor的run方法

override def run() {
    startupComplete()
    while (isRunning) {
        ..........
        // 处理newConnections队列里面的新链接
        configureNewConnections()
        // 处理新的响应
        processNewResponses()
        
        poll()
        // 处理已经读取完的请求
        processCompletedReceives()
        // 处理已经发送的请求
        processCompletedSends()
        //处理关闭的链接
        processDisconnected()
        .........
    }

    .........
  }

configureNewConnections方法,处理新链接

private def configureNewConnections() {
    while (!newConnections.isEmpty) {
      // 从newConnections队列里面取出新链接
      val channel = newConnections.poll()
      // 获取链接信息
      val localHost = channel.socket().getLocalAddress.getHostAddress
      val localPort = channel.socket().getLocalPort
      val remoteHost = channel.socket().getInetAddress.getHostAddress
      val remotePort = channel.socket().getPort
      // 构件链接id
      val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
      // 调用selecotr注册新链接
      selector.register(connectionId, channel)
      ........

下面是processNewResponses方法

private def processNewResponses() {
    // 从requestChannel取出链接
    var curr = requestChannel.receiveResponse(id)
    while (curr != null) {
        // 根据响应的Action做不一样的处理
        curr.responseAction match {
            case RequestChannel.NoOpAction =>
                val channelId = curr.request.connectionId
                if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
                    // 若是链接存在而且没有准备关闭,程序会继续等待读取数据。
                    // unmute方法,实质是注册新链接的读事件
                    selector.unmute(channelId)
           case RequestChannel.SendAction =>
                val responseSend = curr.responseSend.getOrElse(
                          throw new IllegalStateException(s"responseSend must be defined for SendAction,         response: $curr"))
                // 发送响应
                sendResponse(curr, responseSend)
           case RequestChannel.CloseConnectionAction =>
                //关闭链接
                close(selector, curr.request.connectionId)
        }
    ..........

poll方法,实质就是selector.poll的封装,后面篇幅会讲到Selector的实现细节

private def poll() {
    ......
    selector.poll(300)
    ......
 }

processCompletedReceives方法

private def processCompletedReceives() {
    // 从selector遍历completedReceives
    selector.completedReceives.asScala.foreach { receive =>
    val openChannel = selector.channel(receive.source)
    val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
    val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
    // 构建Request
    val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
          buffer = receive.payload, startTimeNanos = time.nanoseconds,
          listenerName = listenerName, securityProtocol = securityProtocol)
    // 发送request到requestChannel
    requestChannel.sendRequest(req)
    // 取消这个链接的读事件,不继续读取请求
    selector.mute(receive.source)

processCompletedSends方法

private def processCompletedSends() {
    // 从selector遍历completedSends
    selector.completedSends.asScala.foreach { send =>
      val resp = inflightResponses.remove(send.destination).getOrElse {
        throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
      }
      // 增长这个链接的读事件,继续读取请求
      selector.unmute(send.destination)
    }
  }

processDisconnected方法

private def processDisconnected() {
    // 从selector遍历disconnected
    selector.disconnected.keySet.asScala.foreach { connectionId =>
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
      inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request))
      // the channel has been closed by the selector but the quotas still need to be updated
      connectionQuotas.dec(InetAddress.getByName(remoteHost))
    }
  }

归纳

SocketServer: 负责框架初始化。实例Acceptor和Processor

Acceptor: 使用java nio框架的Selector,绑定监端口,负责接收新链接,而且经过基于线程间的队列,把新链接轮询发送给Processor

Processor:接收acceptor的新链接,使用KSelector负责读取链接的请求,而后把请求发送给

requestChannel处理。而后从requestChannel获取响应后,将响应基于KSelector发送出去

相关文章
相关标签/搜索