最近阅读了kafka network包的源码,主要是想了解下kafka底层通讯的一些细节,这部分都是用NIO实现的,而且用的是最基本的NIO实现模板,代码阅读起来也比较简单。抛开zookeeper这部分的通讯不看,咱们就看最基本的producer和consumer之间的基于NIO的通讯模块。在network中主要包含如下类: 数组
咱们挑选几个最主要的类说明,先从SocketServer的描述看起: 网络
/** * An NIO socket server. The thread model is * 1 Acceptor thread that handles new connections * N Processor threads that each have their own selectors and handle all requests from their connections synchronously */在 SocketServer 中采用 processors 数组保存 processor
Private val processors = new Array[Processor](numProcessorThreads)
在AbstractServerThread继承了runnable,其中采用闭锁控制开始和结束,主要做用是为了实现同步。同时打开selector,为后续的继承者使用。 app
protected val selector = Selector.open(); protected val logger = Logger.getLogger(getClass()) private val startupLatch = new CountDownLatch(1) private val shutdownLatch = new CountDownLatch(1) private val alive = new AtomicBoolean(false)这个类是后续讲到的两个类的基类,而且闭锁的应用是整个同步做用实现的关键,咱们看一组 stratup 的闭锁操做,其中 Unit 在 scala 语法中你能够把他认为是 void ,也就是方法的返回值为空:
/** * Wait for the thread to completely start up */ def awaitStartup(): Unit = startupLatch.await /** * Record that the thread startup is complete */ protected def startupComplete() = { alive.set(true) startupLatch.countDown }Acceptor继承了AbstractServerThread,虽然叫Acceptor,可是它并无单独拿出来使用,而是直接被socketServer引用,这点在命名和使用上与通常的通讯框架不一样:
private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor], val sendBufferSize: Int, val receiveBufferSize: Int) extends AbstractServerThread {
这个类中主要实现了ServerSocketChannel的相关工做: 框架
val serverChannel = ServerSocketChannel.open() serverChannel.configureBlocking(false) serverChannel.socket.bind(new InetSocketAddress(port)) serverChannel.register(selector, SelectionKey.OP_ACCEPT); logger.info("Awaiting connections on port " + port) startupComplete()
其内部操做和NIO同样: socket
/* * Accept a new connection */ def accept(key: SelectionKey, processor: Processor) { val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize) val socketChannel = serverSocketChannel.accept() socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setSendBufferSize(sendBufferSize) if (logger.isDebugEnabled()) { logger.debug("sendBufferSize: [" + socketChannel.socket().getSendBufferSize() + "] receiveBufferSize: [" + socketChannel.socket().getReceiveBufferSize() + "]") } processor.accept(socketChannel) }
Procesor类继承了abstractServerThread,其实主要是在Acceptor类中的accept方法中,又新启一个线程来处理读写操做: spa
private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping, val time: Time, val stats: SocketServerStats, val maxRequestSize: Int) extends AbstractServerThread
因此整个kafka中使用的NIO的模型能够归结为下图: 线程
socketServer中引用Acceptor处理多个client过来的connector,并为每一个connection建立出一个processor去单独处理,每一个processor中均引用独立的selector。 scala
总体来讲,这样的设计和咱们在用NIO写传统的通讯没有什么区别,只是这里在同步上稍微作了点儿文章。更详细的网络操做仍是请看mina系列的分析。 debug