Kafka.network包源码解读

最近阅读了kafka network包的源码,主要是想了解下kafka底层通讯的一些细节,这部分都是用NIO实现的,而且用的是最基本的NIO实现模板,代码阅读起来也比较简单。抛开zookeeper这部分的通讯不看,咱们就看最基本的producerconsumer之间的基于NIO的通讯模块。在network中主要包含如下类: 数组

咱们挑选几个最主要的类说明,先从SocketServer的描述看起: 网络

/**
 * An NIO socket server. The thread model is
 *   1 Acceptor thread that handles new connections
 *   N Processor threads that each have their own selectors and handle all requests from their connections synchronously
 */
SocketServer 中采用 processors 数组保存 processor
Private val processors = new Array[Processor](numProcessorThreads)

AbstractServerThread继承了runnable,其中采用闭锁控制开始和结束,主要做用是为了实现同步。同时打开selector,为后续的继承者使用。 app

protected val selector = Selector.open();
  protected val logger = Logger.getLogger(getClass())
  private val startupLatch = new CountDownLatch(1)
  private val shutdownLatch = new CountDownLatch(1)
  private val alive = new AtomicBoolean(false)
这个类是后续讲到的两个类的基类,而且闭锁的应用是整个同步做用实现的关键,咱们看一组 stratup 的闭锁操做,其中 Unit scala 语法中你能够把他认为是 void ,也就是方法的返回值为空:
/**
   * Wait for the thread to completely start up
   */
  def awaitStartup(): Unit = startupLatch.await

  /**
   * Record that the thread startup is complete
   */
  protected def startupComplete() = {
    alive.set(true)
    startupLatch.countDown
  }
Acceptor继承了AbstractServerThread,虽然叫Acceptor,可是它并无单独拿出来使用,而是直接被socketServer引用,这点在命名和使用上与通常的通讯框架不一样:
private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val receiveBufferSize: Int) extends AbstractServerThread {

这个类中主要实现了ServerSocketChannel的相关工做: 框架

val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    serverChannel.socket.bind(new InetSocketAddress(port))
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    logger.info("Awaiting connections on port " + port)
    startupComplete()

其内部操做和NIO同样: socket

/*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize)
    
    val socketChannel = serverSocketChannel.accept()
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)

    if (logger.isDebugEnabled()) {
      logger.debug("sendBufferSize: [" + socketChannel.socket().getSendBufferSize() 
          + "] receiveBufferSize: [" + socketChannel.socket().getReceiveBufferSize() + "]")
    }

    processor.accept(socketChannel)
  }

Procesor类继承了abstractServerThread,其实主要是在Acceptor类中的accept方法中,又新启一个线程来处理读写操做: spa

private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
                               val time: Time,
                               val stats: SocketServerStats,
                               val maxRequestSize: Int) extends AbstractServerThread

因此整个kafka中使用的NIO的模型能够归结为下图: 线程

socketServer中引用Acceptor处理多个client过来的connector,并为每一个connection建立出一个processor去单独处理,每一个processor中均引用独立的selector。 scala

总体来讲,这样的设计和咱们在用NIO写传统的通讯没有什么区别,只是这里在同步上稍微作了点儿文章。更详细的网络操做仍是请看mina系列的分析。 debug

相关文章
相关标签/搜索