关于控制Broker端入站链接数的讨论

Kafka Broker端处理请求采用Reactor模型。每台Broker上有个相似于Dispatcher的Acceptor线程,还有若干个处理请求的Processor线程(固然真正处理请求逻辑的线程不是Processor,其实是KafkaRequestHandler)。每一个Processor线程启动后大体作如下这么几件事情:java

1. 设置新的入站链接bootstrap

2. 处理新的请求响应(所谓的处理也就是放入到响应队列中)安全

3. 执行Selector.select操做获取那些准备完毕的IO操做socket

4. 接收新的入站请求spa

5. 执行已发送响应的回调逻辑.net

6. 处理已断开链接线程

每一个Broker启动以后它建立的Processor线程会不停地执行以上这些动做,循环往复,直至Broker被关闭。debug

咱们重点看看第一步中的逻辑,如下是1.1.1版本的源码(选择1.1.1版本不是特地的,其实全部2.3版本以前都是差很少的情形):code

/**
   * Register any new connections that have been queued up
   */
  private def configureNewConnections() {
    while (!newConnections.isEmpty) {
      val channel = newConnections.poll()
      try {
        debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
        selector.register(connectionId(channel.socket), channel)
      } catch {
        // We explicitly catch all exceptions and close the socket to avoid a socket leak.
        case e: Throwable =>
          val remoteAddress = channel.socket.getRemoteSocketAddress
          // need to close the channel here to avoid a socket leak.
          close(channel)
          processException(s"Processor $id closed connection from $remoteAddress", e)
      }
    }
  }

注意我标成红色的语句。基本上Processor线程设置新入站链接的方式就是一次性处理完才罢休。代码中的newConnections是java.util.concurrent.ArrayBlockingQueue实例。Acceptor线程也会访问newConnections,所以必须是线程安全的。server

这种一次性处理完成才收手的作法在某些状况下是有风险的,好比当Kafka集群遭遇到DDOS攻击时,外部IP会建立海量的入站链接所有砸向newConnections中。此时Processor线程运行时会一直尝试消耗掉这些新链接,不然它不会干其余事情——好比处理请求等。换句话说,目前Kafka对新入站链接的处理优先级要高于已有链接。当遭遇链接风暴时,Kafka Broker端会优先处理新链接,所以可能形成已有链接上的请求处理被暂停,并最终致使超时。这样客户端获得请求超时通知后会会进一步地发送新的请求,于是出现雪崩效应。

 

另外Broker端维护每一个链接也不是没有开销的。链接信息自己确定要占用一些内容资源。若是是启用了SSL的链接,Kafka为额外为其维护一个48KB的临时缓冲区。所以一旦遭遇链接风暴,OOM错误是很常见的。

 

鉴于这些缘由,社区在2.3版本改进了Broker端处理新链接请求的方式。首先阻塞队列保存新链接的个数再也不是没有限制了,而是被固定为20,即每一个Processor的新链接队列最大就是20个链接——这个写死在代码里面了,目前无法修改。第2、社区引入了新参数max.connections,用于控制Broker端所容许链接的最大链接数。你能够调节这个参数来控制一个Broker最多能接收多少个入站链接。这个参数能够在server.properties中被设置,也可使用kafka-configs脚本动态修改。max.connections是全局性的,你也能够给每一个监听器设置不一样的链接数上限。好比你的监听器中同时使用了PLAINTEXT和SSL,那么你可以使用listener.name.plaintext.max.connections和listener.name.ssl.max.connections来为这两个listeners配置各自的链接数,命令以下:

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config max.connections=100$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config listener.name.plaintext.max.connections=80
Completed updating config for broker: 0.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config listener.name.ssl.max.connections=80
Completed updating config for broker: 0.

 

第三是Kafka Broker的每一个Processor线程会在每轮任务结束以前尝试去关闭多余的链接。判断是否须要关闭多余链接的依据有两点:1. 总的链接数超过了max.connections值;2. 你为Broker设置了多个监听器,但Kafka会保护Broker内部链接使用的那个监听器。好比你若是设置了多个监听器:PLAINTEXT://9092, SSL://9093,SASL://9094,而后设置inter.broker.listener.name=SSL,那么SSL这套监听器下的链接是不会被Processor强行关闭的。

 

最后提一句,若是全部Processor的阻塞队列都满了, 那么前面的Acceptor线程会阻塞住,不会再接收任何入站请求。社区新增长了一个JMX指标来计算Acceptor线程被阻塞的时间比例:kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener={listenerName}

相关文章
相关标签/搜索