kafka AdminClient 闲时关闭链接

AdminClient 类提供了建立、删除 topic 的 api。apache

在项目中建立了一个 AdminClient 对象,每次建立 topic 时,调用api

org.apache.kafka.clients.admin.AdminClient#createTopics

若是长时间不使用这个对象,客户端与 broker 之间的链接会被关掉,相关的参数:this

connections.max.idle.ms

这个最大空闲参数在 broker 和 客户端均可以配置,即 broker 和客户端都会关闭空闲过久的链接。spa

org.apache.kafka.common.network.Selector#maybeCloseOldestConnection.net

    private void maybeCloseOldestConnection(long currentTimeNanos) {
        if (idleExpiryManager == null)
            return;

        Map.Entry<String, Long> expiredConnection = idleExpiryManager.pollExpiredConnection(currentTimeNanos);
        if (expiredConnection != null) {
            String connectionId = expiredConnection.getKey();
            KafkaChannel channel = this.channels.get(connectionId);
            if (channel != null) {
                if (log.isTraceEnabled())
                    log.trace("About to close the idle connection from {} due to being idle for {} millis",
                            connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
                channel.state(ChannelState.EXPIRED);
                close(channel, CloseMode.GRACEFUL);
            }
        }
    }

org.apache.kafka.common.network.Selector.IdleExpiryManager#pollExpiredConnectioncode

lruConnections 是 LinkedHashMap 类型,能够按照插入和访问顺序进行排序,这里是按访问顺序进行排序,访问过的顺序放到双向链表的结尾。对象

        public Map.Entry<String, Long> pollExpiredConnection(long currentTimeNanos) {
            if (currentTimeNanos <= nextIdleCloseCheckTime)
                return null;

            if (lruConnections.isEmpty()) {
                nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
                return null;
            }

            Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
            Long connectionLastActiveTime = oldestConnectionEntry.getValue();
            nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;

            if (currentTimeNanos > nextIdleCloseCheckTime)
                return oldestConnectionEntry;
            else
                return null;
        }
相关文章
相关标签/搜索