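SocketServer is mainly responsible for receiving external network requests and adding them to the request queue.
The start method in KafkaServer.scala contains the following entry point: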
    socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
    socketServer.startup()
This creates and starts a SocketServer. Let's look at it in detail.
First, the fields that SocketServer holds:
    private val endpoints = config.listeners
    private val numProcessorThreads = config.numNetworkThreads
    private val maxQueuedRequests = config.queuedMaxRequests
    private val totalProcessorThreads = numProcessorThreads * endpoints.size
    private val maxConnectionsPerIp = config.maxConnectionsPerIp
    private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides

    this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "

    val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
    private val processors = new Array[Processor](totalProcessorThreads)
    private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
    private var connectionQuotas: ConnectionQuotas = _
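These fields are read from several configuration settings: config.listeners, config.numNetworkThreads, config.queuedMaxRequests, config.maxConnectionsPerIp and config.maxConnectionsPerIpOverrides.
The startup code is as follows: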
    /**
     * Start the socket server
     */
    def startup() {
      this.synchronized {
        // Per-IP connection limits
        connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

        val sendBufferSize = config.socketSendBufferBytes
        val recvBufferSize = config.socketReceiveBufferBytes
        val brokerId = config.brokerId

        // For each endpoint (the protocol and port of a configured listener), create its
        // network Processor threads and its Acceptor, then start the Acceptor.
        // Creating the Acceptor also starts the Processor threads assigned to it.
        var processorBeginIndex = 0
        endpoints.values.foreach { endpoint =>
          val protocol = endpoint.protocolType
          val processorEndIndex = processorBeginIndex + numProcessorThreads

          for (i <- processorBeginIndex until processorEndIndex)
            processors(i) = newProcessor(i, connectionQuotas, protocol)

          val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
            processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
          acceptors.put(endpoint, acceptor)
          Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
          acceptor.awaitStartup()

          processorBeginIndex = processorEndIndex
        }
      }

      newGauge("NetworkProcessorAvgIdlePercent",
        new Gauge[Double] {
          def value = allMetricNames.map( metricName =>
            metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
        }
      )

      info("Started " + acceptors.size + " acceptor threads")
    }
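This code reads several configuration settings; config.socketSendBufferBytes and config.socketReceiveBufferBytes are used to set SO_SNDBUF and SO_RCVBUF on the sockets.
Let's first look at the simple newProcessor call used in the assignment.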
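The index bookkeeping in startup() can be easier to follow in isolation. The following is a minimal Java sketch, not Kafka code: the class ProcessorRanges and its method ranges are hypothetical names introduced only to show how advancing processorBeginIndex by numProcessorThreads gives each endpoint its own contiguous slice of the shared processors array.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of Kafka.
class ProcessorRanges {
    // Returns one [begin, end) index pair per endpoint, mirroring how startup()
    // advances processorBeginIndex by numProcessorThreads for every listener.
    static List<int[]> ranges(int endpointCount, int numProcessorThreads) {
        List<int[]> result = new ArrayList<>();
        int begin = 0;
        for (int e = 0; e < endpointCount; e++) {
            int end = begin + numProcessorThreads;
            result.add(new int[] {begin, end});
            begin = end; // the next endpoint starts where this one stopped
        }
        return result;
    }

    public static void main(String[] args) {
        // Two listeners with three network threads each.
        for (int[] r : ranges(2, 3)) {
            System.out.println("processors [" + r[0] + ", " + r[1] + ")");
        }
    }
}
```

With two listeners and three network threads, the first endpoint owns indices 0 to 2 and the second owns 3 to 5, which matches totalProcessorThreads = numProcessorThreads * endpoints.size.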
    protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol): Processor = {
      new Processor(id,
        time,
        config.socketRequestMaxBytes,
        requestChannel,
        connectionQuotas,
        config.connectionsMaxIdleMs,
        protocol,
        config.values,
        metrics
      )
    }
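This simply creates a Processor instance. The main configuration settings involved are the ones passed to the constructor: config.socketRequestMaxBytes and config.connectionsMaxIdleMs.
Each endpoint, that is, each listener, has exactly one Acceptor. The Acceptor accepts incoming network connections and hands them to its processors. Let's look at the Acceptor's run method: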
    def run() {
      // Register the server channel with the selector for accept events
      serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
      startupComplete()
      try {
        var currentProcessor = 0
        while (isRunning) {
          try {
            // Block for up to 500 ms. A return value of 0 means no channel is ready yet;
            // otherwise at least one registered channel is ready.
            val ready = nioSelector.select(500)
            if (ready > 0) {
              // The keys of the channels that are ready
              val keys = nioSelector.selectedKeys()
              val iter = keys.iterator()
              while (iter.hasNext && isRunning) {
                try {
                  val key = iter.next
                  iter.remove()
                  // If the key is ready for an accept event, the server socket channel has a
                  // pending client connection; hand it to the chosen Processor via accept.
                  if (key.isAcceptable)
                    accept(key, processors(currentProcessor))
                  else
                    throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                  // round robin to the next processor thread
                  // After each accepted connection, move on to the next processor so that
                  // connections are spread across the threads.
                  currentProcessor = (currentProcessor + 1) % processors.length
                } catch {
                  case e: Throwable => error("Error while accepting connection", e)
                }
              }
            }
          }
          catch {
            // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
            // to a select operation on a specific channel or a bad request. We don't want the
            // the broker to stop responding to requests from other clients in these scenarios.
            case e: ControlThrowable => throw e
            case e: Throwable => error("Error occurred", e)
          }
        }
      } finally {
        debug("Closing server socket and selector.")
        swallowError(serverChannel.close())
        swallowError(nioSelector.close())
        shutdownComplete()
      }
    }
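The round-robin step in run() is just modular arithmetic over the processor array. Here is a small Java sketch of that one idea; RoundRobinAssigner is a hypothetical name used for illustration and does not exist in Kafka.

```java
// Hypothetical illustration class; not part of Kafka.
class RoundRobinAssigner {
    private final int processorCount;
    private int current = 0;

    RoundRobinAssigner(int processorCount) {
        if (processorCount <= 0) {
            throw new IllegalArgumentException("processorCount must be positive");
        }
        this.processorCount = processorCount;
    }

    // Returns the index of the processor for the next accepted connection,
    // then advances the cursor the same way the acceptor loop does:
    // currentProcessor = (currentProcessor + 1) % processors.length
    int next() {
        int chosen = current;
        current = (current + 1) % processorCount;
        return chosen;
    }

    public static void main(String[] args) {
        RoundRobinAssigner assigner = new RoundRobinAssigner(3);
        for (int i = 0; i < 5; i++) {
            System.out.println("connection " + i + " -> processor " + assigner.next());
        }
    }
}
```

The sketch keeps the cursor in a plain int because, as in the acceptor, only one thread ever advances it.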
Next, let's look at the accept method:
    /*
     * Accept a new connection
     */
    def accept(key: SelectionKey, processor: Processor) {
      val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
      // Get the socket channel for the incoming connection
      val socketChannel = serverSocketChannel.accept()
      try {
        // Check whether this IP has already reached its connection limit;
        // if so, a TooManyConnectionsException is thrown.
        connectionQuotas.inc(socketChannel.socket().getInetAddress)
        socketChannel.configureBlocking(false)
        socketChannel.socket().setTcpNoDelay(true)
        socketChannel.socket().setKeepAlive(true)
        socketChannel.socket().setSendBufferSize(sendBufferSize)

        debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
          .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
            socketChannel.socket.getSendBufferSize, sendBufferSize,
            socketChannel.socket.getReceiveBufferSize, recvBufferSize))

        // Hand the socket channel to the assigned processor
        processor.accept(socketChannel)
      } catch {
        case e: TooManyConnectionsException =>
          info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
          close(socketChannel)
      }
    }
The accept method above calls the processor's own accept method. Here it is:
    /**
     * Queue up a new connection for reading
     */
    def accept(socketChannel: SocketChannel) {
      newConnections.add(socketChannel)
      wakeup()
    }
It simply adds the socket channel to a queue, where it waits for the processor thread to pick it up. Now let's see how the processor handles it.
    override def run() {
      startupComplete()
      while (isRunning) {
        try {
          // setup any new connections that have been queued up
          configureNewConnections()
          // register any new responses for writing
          processNewResponses()
          poll()
          processCompletedReceives()
          processCompletedSends()
          processDisconnected()
        } catch {
          // We catch all the throwables here to prevent the processor thread from exiting. We do this because
          // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
          // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
          // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
          case e: ControlThrowable => throw e
          case e: Throwable =>
            error("Processor got uncaught exception.", e)
        }
      }

      debug("Closing selector - processor " + id)
      swallowError(closeAll())
      shutdownComplete()
    }
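This run method is essentially a facade: it calls quite a few methods, so let's go through them one by one.
configureNewConnections takes the queued connections off the queue and registers each one with the selector.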
    /**
     * Register any new connections that have been queued up
     */
    private def configureNewConnections() {
      while (!newConnections.isEmpty) {
        val channel = newConnections.poll()
        try {
          debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
          val localHost = channel.socket().getLocalAddress.getHostAddress
          val localPort = channel.socket().getLocalPort
          val remoteHost = channel.socket().getInetAddress.getHostAddress
          val remotePort = channel.socket().getPort
          val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
          selector.register(connectionId, channel)
        } catch {
          // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
          // throwables will be caught in processor and logged as uncaught exceptions.
          case NonFatal(e) =>
            // need to close the channel here to avoid a socket leak.
            close(channel)
            error(s"Processor $id closed connection from ${channel.getRemoteAddress}", e)
        }
      }
    }
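The hand-off between the acceptor thread and the processor thread combines a thread-safe queue with a selector wakeup. The following Java sketch shows the pattern under simplifying assumptions: ConnectionHandoff is a hypothetical class, connections are represented as plain strings instead of SocketChannel objects, and the processor side here selects first and drains afterwards, whereas the real loop drains at the top of each iteration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration class; not part of Kafka.
class ConnectionHandoff {
    // Strings stand in for SocketChannel objects to keep the sketch self-contained.
    private final Queue<String> newConnections = new ConcurrentLinkedQueue<>();
    private final Selector selector;

    ConnectionHandoff() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Acceptor side: enqueue the connection, then wake the selector so the
    // processor does not sit in select() for the full timeout.
    void accept(String connection) {
        newConnections.add(connection);
        selector.wakeup();
    }

    // Processor side: wait on the selector, then drain everything that was queued.
    // timeoutMs must be positive, because select(0) blocks indefinitely.
    List<String> pollAndDrain(long timeoutMs) {
        try {
            selector.select(timeoutMs);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> drained = new ArrayList<>();
        String connection;
        while ((connection = newConnections.poll()) != null) {
            drained.add(connection);
        }
        return drained;
    }

    void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because Selector.wakeup() called before a select makes the next select return immediately, a connection queued just before the processor blocks is still picked up without waiting for the timeout.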
    private def processNewResponses() {
      var curr = requestChannel.receiveResponse(id)
      while (curr != null) {
        try {
          curr.responseAction match {
            case RequestChannel.NoOpAction =>
              // There is no response to send to the client, we need to read more pipelined requests
              // that are sitting in the server's socket buffer
              curr.request.updateRequestMetrics
              trace("Socket server received empty response to send, registering for read: " + curr)
              selector.unmute(curr.request.connectionId)
            case RequestChannel.SendAction =>
              sendResponse(curr)
            case RequestChannel.CloseConnectionAction =>
              curr.request.updateRequestMetrics
              trace("Closing socket connection actively according to the response code.")
              close(selector, curr.request.connectionId)
          }
        } finally {
          curr = requestChannel.receiveResponse(id)
        }
      }
    }
    private def poll() {
      try selector.poll(300)
      catch {
        case e @ (_: IllegalStateException | _: IOException) =>
          error(s"Closing processor $id due to illegal state or IO exception")
          swallow(closeAll())
          shutdownComplete()
          throw e
      }
    }
    @Override
    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");

        clear();

        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;

        /* check ready keys */
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        currentTimeNanos = endSelect;
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);
            pollSelectionKeys(immediatelyConnectedKeys, true);
        }

        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        maybeCloseOldestConnection();
    }
The part worth a closer look here is the pollSelectionKeys method:
    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = channel(key);

            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(channel.id());
            lruConnections.put(channel.id(), currentTimeNanos);

            try {
                /* complete any connections that have finished their handshake (either normally or immediately) */
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                    } else
                        continue;
                }

                /* if channel is not ready finish prepare */
                if (channel.isConnected() && !channel.ready())
                    channel.prepare();

                /* if channel is ready read from any connections that have readable data */
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }

                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

                /* cancel any defunct sockets */
                if (!key.isValid()) {
                    close(channel);
                    this.disconnected.add(channel.id());
                }
            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel);
                this.disconnected.add(channel.id());
            }
        }
    }
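This is where the requests on each socket channel start being handled. Following the code above, the steps are: complete any pending connection, prepare the channel if it is not yet ready, read available data into staged receives, write any pending send, and close channels whose keys are no longer valid.
processCompletedReceives then builds a Request instance for each completed receive and adds it to the request queue through the sendRequest method of the SocketServer's RequestChannel, where it waits for KafkaApis to handle it.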
    private def processCompletedReceives() {
      selector.completedReceives.asScala.foreach { receive =>
        try {
          val channel = selector.channel(receive.source)
          val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
            channel.socketAddress)
          val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
            buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
          // This is the key step: the request is queued for KafkaApis,
          // whose handling of messages will be analyzed later.
          requestChannel.sendRequest(req)
          selector.mute(receive.source)
        } catch {
          case e @ (_: InvalidRequestException | _: SchemaException) =>
            // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
            error(s"Closing socket for ${receive.source} because of error", e)
            close(selector, receive.source)
        }
      }
    }
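The RequestChannel is constructed with maxQueuedRequests, which suggests a bounded request queue. The source of RequestChannel is not shown in this article, so the Java sketch below is only an assumption about the general idea: BoundedRequestQueue is a hypothetical class, requests are plain strings, and it uses the non-blocking offer so that the "queue is full" case is visible, whereas a real implementation may block the caller instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration class; not the actual RequestChannel.
class BoundedRequestQueue {
    private final BlockingQueue<String> requests;

    BoundedRequestQueue(int maxQueuedRequests) {
        // Capacity plays the role of maxQueuedRequests (config.queuedMaxRequests).
        this.requests = new ArrayBlockingQueue<>(maxQueuedRequests);
    }

    // Network-thread side: returns false when the queue is already full.
    boolean tryEnqueue(String request) {
        return requests.offer(request);
    }

    // Handler side: returns the oldest queued request, or null if none is waiting.
    String dequeue() {
        return requests.poll();
    }

    int size() {
        return requests.size();
    }
}
```

A bounded queue of this kind gives back-pressure: when the handlers fall behind, the network threads cannot pile up an unlimited number of pending requests.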
A completed send here means that writing a response back to the client has finished.
    private def processCompletedSends() {
      selector.completedSends.asScala.foreach { send =>
        val resp = inflightResponses.remove(send.destination).getOrElse {
          throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
        }
        resp.request.updateRequestMetrics()
        selector.unmute(send.destination)
      }
    }
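The processor mutes a connection once a request has been received and unmutes it when the response has been sent, so each connection has at most one request being handled at a time. How mute and unmute are implemented is not shown in this article; a common way to do it with java.nio, assumed here, is to clear and restore the OP_READ bit in the key's interest set. InterestOps below is a hypothetical helper that works on the raw bit mask only.

```java
import java.nio.channels.SelectionKey;

// Hypothetical illustration class; operates on interest-op bit masks only.
class InterestOps {
    // Muting: stop asking the selector for read readiness on this connection.
    static int mute(int ops) {
        return ops & ~SelectionKey.OP_READ;
    }

    // Unmuting: start asking for read readiness again.
    static int unmute(int ops) {
        return ops | SelectionKey.OP_READ;
    }

    static boolean isMuted(int ops) {
        return (ops & SelectionKey.OP_READ) == 0;
    }
}
```

With a real SelectionKey, the result would be applied through key.interestOps(InterestOps.mute(key.interestOps())); other bits such as OP_WRITE are left untouched.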
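If the socket server has connections that have already been closed, the quota count for the corresponding IP is decremented. This covers connections that were closed because they timed out, as well as connections that were closed after an error occurred while processing their messages.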
    private def processDisconnected() {
      selector.disconnected.asScala.foreach { connectionId =>
        val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
          throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
        }.remoteHost
        inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
        // the channel has been closed by the selector but the quotas still need to be updated
        connectionQuotas.dec(InetAddress.getByName(remoteHost))
      }
    }
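The inc call in the acceptor and the dec call here are the two halves of a per-IP counter. The ConnectionQuotas class itself is not shown in this article, so the following Java sketch is only an illustration of the idea: ConnectionQuotaSketch is a hypothetical class, IPs are plain strings, and a rejected connection is simply not counted, which may differ from the real bookkeeping.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not the actual ConnectionQuotas.
class ConnectionQuotaSketch {
    private final int defaultMax;                 // plays the role of maxConnectionsPerIp
    private final Map<String, Integer> overrides; // plays the role of maxConnectionsPerIpOverrides
    private final Map<String, Integer> counts = new HashMap<>();

    ConnectionQuotaSketch(int defaultMax, Map<String, Integer> overrides) {
        this.defaultMax = defaultMax;
        this.overrides = new HashMap<>(overrides);
    }

    // Called when a connection is accepted; rejects it once the IP is at its limit.
    synchronized void inc(String ip) {
        int max = overrides.getOrDefault(ip, defaultMax);
        int count = counts.getOrDefault(ip, 0);
        if (count >= max) {
            throw new IllegalStateException("Too many connections from " + ip + " (max = " + max + ")");
        }
        counts.put(ip, count + 1);
    }

    // Called when a connection is closed; forgets the IP when its count reaches zero.
    synchronized void dec(String ip) {
        Integer count = counts.get(ip);
        if (count == null) {
            throw new IllegalArgumentException("No tracked connections for " + ip);
        }
        if (count == 1) {
            counts.remove(ip);
        } else {
            counts.put(ip, count - 1);
        }
    }

    synchronized int count(String ip) {
        return counts.getOrDefault(ip, 0);
    }
}
```

If dec were skipped for closed connections, the count for a busy client would only ever grow and it would eventually be rejected for good, which is why processDisconnected updates the quota even though the selector has already closed the channel.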