关于SpringBoot中Redis线程池的有关探讨

探讨原由

最近在写一个小项目,用redis过时来实现验证码的时间限制。由于SpringBoot默认采用 lettuce做为客户端,引入了commons-pool2依赖以后作了以下配置:java

spring:
 redis:
 host: 192.168.56.1
 lettuce:
 pool:
 min-idle: 2
 max-active: 8     #默认
 max-idle: 8       #默认
复制代码

原本觉得这样作就好了,而后写了以下代码测了下node

@Test
    public void test() throws InterruptedException {
        int i;
        CountDownLatch c = new CountDownLatch(5000);
        long x = System.currentTimeMillis();
        for (i = 0; i < 5000; i++) {
            new Thread(() -> {
                redisTemplate.opsForValue().set("1", "1");
                c.countDown();
            }).start();
        }
        c.await();
    }
复制代码

测试期间,实际客户端最大接入:react

# Clients # 减去一个redis-cli链接,一个为以前的项目链接
connected_clients:3
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
复制代码

???,我设置的配置去哪里了???,因而开始了漫长的探索redis

尝试

首先看下默认下是怎样的。去除了pool的设置,再跑一下。spring

# Clients # 减去一个redis-cli链接,一个为以前的项目链接
connected_clients:3
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
复制代码

很好,一样的结果。说明刚刚的配置根本没有效果。没准是lettuce的缘由?因而我修改了pom,用jedis测试了一下。shell

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.1.7.RELEASE</version>
            <exclusions>
                <exclusion>
                    <artifactId>lettuce-core</artifactId>
                    <groupId>io.lettuce</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <!-- 注意版本和SpringData兼容 -->
            <version>2.9.1</version>
        </dependency>
复制代码
spring:
 redis:
 host: 192.168.56.1
 jedis:
 pool:
 min-idle: 2
 max-active: 8
 max-idle: 8
复制代码

看下结果:bash

# Clients # 减去一个redis-cli链接,一个为以前的项目链接
connected_clients:10
client_recent_max_input_buffer:2
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
复制代码

最大换成15再试试:app

# Clients
connected_clients:17
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
复制代码

jedis线程池正常,那说明是lettuce的配置上除了问题。async

探索

首先先来看看LettuceConnectionFactory中的getConnection是怎样实现的。ide

public RedisConnection getConnection() {
        if (this.isClusterAware()) {
            return this.getClusterConnection();
        } else {
            LettuceConnection connection;
            if (this.pool != null) {
                connection = new LettuceConnection(this.getSharedConnection(), this.getTimeout(), (AbstractRedisClient)null, this.pool, this.getDatabase());
            } else {
                connection = new LettuceConnection(this.getSharedConnection(), this.connectionProvider, this.getTimeout(), this.getDatabase());
            }

            connection.setConvertPipelineAndTxResults(this.convertPipelineAndTxResults);
            return connection;
        }
    }
复制代码

首先,是关于集群判断,由于我没设置任何集群相关,因此直接来到else。此处的pool 并非咱们设置的pool。它对应的是以下方法,且只有这一处对它进行赋值:

/** * @param pool * @deprecated since 2.0, use pooling via {@link LettucePoolingClientConfiguration}. */
	@Deprecated
	public LettuceConnectionFactory(LettucePool pool) {

		this(new MutableLettuceClientConfiguration());
		this.pool = pool;
	}
复制代码

LettuceConnectionConfiguration中的注入继续追下去能够看到并无调用这个过期方法:

@Bean
	@ConditionalOnMissingBean(RedisConnectionFactory.class)
	public LettuceConnectionFactory redisConnectionFactory(ClientResources clientResources) throws UnknownHostException {
		LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(clientResources,
				this.properties.getLettuce().getPool());
		return createLettuceConnectionFactory(clientConfig);
	}

	// 以后来到
	public LettuceConnectionFactory(RedisStandaloneConfiguration standaloneConfig, LettuceClientConfiguration clientConfig) {

		this(clientConfig);

		Assert.notNull(standaloneConfig, "RedisStandaloneConfiguration must not be null!");

		this.standaloneConfig = standaloneConfig;
		this.configuration = this.standaloneConfig;
	}
复制代码

因此,上面判断时此的pool恒为null,进入else。在实例化以前,首先会根据getSharedConnection获取一个StatefulRedisConnection链接:

protected StatefulRedisConnection<byte[], byte[]> getSharedConnection() {
		return shareNativeConnection ? (StatefulRedisConnection) getOrCreateSharedConnection().getConnection() : null;
	}

	private SharedConnection<byte[]> getOrCreateSharedConnection() {

		synchronized (this.connectionMonitor) {

			if (this.connection == null) {
				this.connection = new SharedConnection<>(connectionProvider);
			}

			return this.connection;
		}
	}
复制代码

再接着向下则进入LettuceConnection构造函数:

LettuceConnection(@Nullable StatefulConnection<byte[], byte[]> sharedConnection,
			LettuceConnectionProvider connectionProvider, long timeout, int defaultDbIndex) {

		Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null.");

		this.asyncSharedConn = sharedConnection;
		this.connectionProvider = connectionProvider;
		this.timeout = timeout;
		this.defaultDbIndex = defaultDbIndex;
		this.dbIndex = this.defaultDbIndex;
	}
复制代码

此时的this.asyncSharedConn = sharedConnection;,再接着向下,咱们会进入以下方法:

protected RedisClusterCommands<byte[], byte[]> getConnection() {
		// 在事务未开启时此处为false默认值
		if (isQueueing()) {
			return getDedicatedConnection();
		}
		if (asyncSharedConn != null) {

			if (asyncSharedConn instanceof StatefulRedisConnection) {
				return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).sync();
			}
			if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
				return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).sync();
			}
		}
		return getDedicatedConnection();
	}
复制代码

这里的asyncSharedConn即为咱们上面的共享实例链接。

分析一下,在默认状况下:

private boolean shareNativeConnection = true;
复制代码

new LettuceConnection时,通过StatefulRedisConnection()getOrCreateSharedConnection()后传入了同一个共享链接。因此在此时即便配置了线程池,在运行时有且只有一个共享实例提供操做。 那咱们验证一下,我尝试人为改变一下shareNativeConnection

@Configuration
public class Config {
    @Autowired
    public void setLettuceConnectionFactory(LettuceConnectionFactory lettuceConnectionFactory){
        lettuceConnectionFactory.setShareNativeConnection(false);
    }
}
复制代码

仍是按下面的配置及测试:

spring:
 redis:
 host: 192.168.56.1
 lettuce:
 pool:
 min-idle: 2
复制代码
@Test
    public void test() throws InterruptedException {
        int i ;
        CountDownLatch c = new CountDownLatch(5000);
        for (i = 1; i <= 5000; i++) {
            new Thread(() -> {
                System.out.println(redisTemplate.execute(RedisConnection::ping));
                c.countDown();
            }).start();
        }
        c.await();
    }
复制代码

以后结果:

# Clients
connected_clients:10                    # 减去一个redis-cli链接,一个为以前的项目链接
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
复制代码

这个结果10中的8个为pool中的默认max-active属性,但这不意味不设置pool属性时仍为默认值(下面会说)。再将max-active改成10,结果也是相符:

# Clients # 减去一个redis-cli链接,一个为以前的项目链接
connected_clients:12
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
复制代码

那改变以后和以前有何区别呢?修改后,首先在new LettuceConnection(this.getSharedConnection(), this.connectionProvider, this.getTimeout(), this.getDatabase());时,传入的共享实例为null,接着也是进入以下方法:

protected RedisClusterCommands<byte[], byte[]> getConnection() {
		// 在事务未开启时此处为false默认值
		if (isQueueing()) {
			return getDedicatedConnection();
		}
		if (asyncSharedConn != null) {

			if (asyncSharedConn instanceof StatefulRedisConnection) {
				return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).sync();
			}
			if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
				return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).sync();
			}
		}
		return getDedicatedConnection();
	}
复制代码

再接着则会进入getDedicatedConnection()

RedisClusterCommands<byte[], byte[]> getDedicatedConnection() {

		if (asyncDedicatedConn == null) {

			asyncDedicatedConn = doGetAsyncDedicatedConnection();

			if (asyncDedicatedConn instanceof StatefulRedisConnection) {
				((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync().select(dbIndex);
			}
		}

		if (asyncDedicatedConn instanceof StatefulRedisConnection) {
			return ((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync();
		}
		if (asyncDedicatedConn instanceof StatefulRedisClusterConnection) {
			return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncDedicatedConn).sync();
		}

		throw new IllegalStateException(
				String.format("%s is not a supported connection type.", asyncDedicatedConn.getClass().getName()));
	}
复制代码

由于首次进入,asyncDedicatedConn确定为null,继续向下doGetAsyncDedicatedConnection();

protected StatefulConnection<byte[], byte[]> doGetAsyncDedicatedConnection() {
		return connectionProvider.getConnection(StatefulConnection.class);
	}
复制代码

此时能够看到,链接的提供则会由LettuceConnectionFactory中传入的connectionProvider负责。connectionProviderafterPropertiesSet()进行初始化:

public void afterPropertiesSet() {

		this.client = createClient();

		this.connectionProvider = createConnectionProvider(client, LettuceConnection.CODEC);
		this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC);

		if (isClusterAware()) {
			this.clusterCommandExecutor = new ClusterCommandExecutor(
					new LettuceClusterTopologyProvider((RedisClusterClient) client),
					new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
					EXCEPTION_TRANSLATION);
		}
    
    // 建立方法 
	private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {

		LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec);

		if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {
			return new LettucePoolingConnectionProvider(connectionProvider,
					(LettucePoolingClientConfiguration) this.clientConfiguration);
		}

		return connectionProvider;
	}
复制代码

能够看到connectionProvider与上面注入的this.properties.getLettuce().getPool()的配置息息相关。首先在建立时都为doCreateConnectionProvider:

protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {

		ReadFrom readFrom = getClientConfiguration().getReadFrom().orElse(null);

		if (isStaticMasterReplicaAware()) {

			List<RedisURI> nodes = ((RedisStaticMasterReplicaConfiguration) configuration).getNodes().stream() //
					.map(it -> createRedisURIAndApplySettings(it.getHostName(), it.getPort())) //
					.peek(it -> it.setDatabase(getDatabase())) //
					.collect(Collectors.toList());

			return new StaticMasterReplicaConnectionProvider((RedisClient) client, codec, nodes, readFrom);
		}

		if (isClusterAware()) {
			return new ClusterConnectionProvider((RedisClusterClient) client, codec, readFrom);
		}

		return new StandaloneConnectionProvider((RedisClient) client, codec, readFrom);
	}
复制代码

pool在配置中为null,那connectionProvider则为StandaloneConnectionProvider,不然则为LettucePoolingConnectionProvider。而在getConnection的实现上,二者也存在巨大区别。

// LettucePoolingConnectionProvider 
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {

		GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
			return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
					poolConfig, false);
		});

		try {

			StatefulConnection<?, ?> connection = pool.borrowObject();

			poolRef.put(connection, pool);

			return connectionType.cast(connection);
		} catch (Exception e) {
			throw new PoolException("Could not get a resource from the pool", e);
		}
	}

// StandaloneConnectionProvider
    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {

		if (connectionType.equals(StatefulRedisSentinelConnection.class)) {
			return connectionType.cast(client.connectSentinel());
		}

		if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
			return connectionType.cast(client.connectPubSub(codec));
		}

		if (StatefulConnection.class.isAssignableFrom(connectionType)) {

			return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
					.orElseGet(() -> client.connect(codec)));
		}

		throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
	}
复制代码

至此即可看到:若是咱们将shareNativeConnection设置为false以后便会有两种情形:

  • 没有设置pool:StandaloneConnectionProvider每次请求都会建立一个链接
  • 设置了pool:LettucePoolingConnectionProvider则是以线程池的方式建立链接。

那咱们验证一下,若是我将shareNativeConnection设置为false同时不在application.yml对线程池进行任何设置,结果以下:

# Clients
connected_clients:591
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:771
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:885
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:974
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:1022
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:1022
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
复制代码

这链接数是直接放飞自我了。。。。。并且伴随着报错的发生。不过设置pool后,由线程池进行管理链接则可获得人为控制。

总结

经过探索,我发现Lettuce的poolshareNativeConnection息息相关,若有不对的地方,望能指证。

相关文章
相关标签/搜索