Kafka2.0服务端启动源码

Kafka2.0服务端启动源码
  Kafka 服务端经过Kafka.scala的主函数main方法启动。KafkaServerStartable类提供读取配置文件、启动/中止服务的方法。而启动/中止服务最终调用的是KafkaServer的startup/shutdown方法。
启动流程
启动 zk 客户端。
启动动态配置。
启动调度线程池。
启动日志管理器的后台线程,包括日志清理、日志刷盘、日志删除、日志压缩。
启动 NIO Socket 服务。
初始化一个接收器Acceptor,即启动 NIO Socket。
添加num.network.threads个接收器到请求通道RequestChannel的处理器缓存ConcurrentHashMap,key 为递增编号,value 为处理器Processor。
Acceptor执行CountDownLatch.await等待通知启动。
缓存Acceptor到ConcurrentHashMap,key 为EndPoint,value 为Acceptor。
启动副本管理器。
在 zk 注册 broker。
启动控制器。
启动组协调器。
启动事务协调器。
初始化KafkaApis。
初始化处理器线程缓存池。
启动num.io.threads个请求处理器线程KafkaRequestHandler。
从阻塞队列ArrayBlockingQueue获取请求,调用KafkaApis.handle方法,进行集中处理请求。
启动处理器线程。
首先CountDownLatch.countDown通知唤醒Acceptor线程。
使用NIO.select轮询。
若是有可接收就绪的事件,则将当前的SocketChannel加入缓存队列ConcurrentLinkedQueue
从上述缓存队列取出SocketChannel,绑定到KafkaChannel。
将接收到的请求缓存到限长阻塞队列ArrayBlockingQueueapi

请求处理流程缓存

详细源码分析Acceptor 线程def run() { serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 注册接收事件 startupComplete() // 通知 Acceptor 线程 var currentProcessor = 0 while (isRunning) { val ready = nioSelector.select(500) // 轮询事件 if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() while (iter.hasNext && isRunning) { val key = iter.next iter.remove() if (key.isAcceptable) { // 有可接受事件 val processor = synchronized { currentProcessor = currentProcessor % processors.size processors(currentProcessor) // 缓存 Processor } accept(key, processor) // 将 SocketChannel 缓存到队列 } } } }}Processor 线程override def run() { startupComplete() // CountDownLatch.countDown 唤醒 Acceptor 线程。 while (isRunning) { configureNewConnections() // 从缓存队列取出 SocketChannel,绑定到 KafkaChannel processNewResponses() // 处理返回客户端的响应 poll() // Kafka.Selector 轮询读取/写入事件 processCompletedReceives() // 处理客户端的请求,放到阻塞队列 processCompletedSends() // 处理返回客户端响应后的回调 processDisconnected() // 断开链接后的处理 }}KafkaRequestHandler 线程阻塞队列def run() { while (!stopped) { val startSelectTime = time.nanoseconds // 从阻塞队列拉取请求 val req = requestChannel.receiveRequest(300) req match { case request: RequestChannel.Request => try { apis.handle(request) // 调用KafkaApis.handle方法,进行集中处理请求。 } } }}KSelector
  参考客户端源码分析。ide

相关文章
相关标签/搜索