在使用redisson消息订阅时,我针对门店商品库存减扣进行订阅的操做(在这里一个商品一个监听队列),当正式投入生产时,发现一直再报Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.
的错误,索性根据提示翻了翻源码看看缘由:java
在redisson里先关注一个类:RedisPubSubConnection
该类继承自RedisConnection
,根据名字咱们可知它是一个典型的发布与订阅的类。那么在redisson
使用时,会使用PubSubConnectionEntry
进行一次包装:git
public class PubSubConnectionEntry { private final AtomicInteger subscribedChannelsAmount; private final RedisPubSubConnection conn; private final ConcurrentMap<ChannelName, SubscribeListener> subscribeChannelListeners = new ConcurrentHashMap<ChannelName, SubscribeListener>(); private final ConcurrentMap<ChannelName, Queue<RedisPubSubListener<?>>> channelListeners = new ConcurrentHashMap<ChannelName, Queue<RedisPubSubListener<?>>>(); public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { super(); this.conn = conn; this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection); } //.....省略其余代码 }
在这里咱们能够看到其有一个比较重要的属性 subscribedChannelsAmount
,而这个值就是经过PublishSubscribeService
进行调用的:github
private void connect(final Codec codec, final ChannelName channelName, final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) { //.... RedisPubSubConnection conn = future.getNow(); final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); entry.tryAcquire(); //.... }
那么此属性就是根据config的subscriptionsPerConnection
里设置的,那么此值就表明了每一个链接的最大订阅数。当tryAcqcurie
的时候会减小这个数量:redis
public int tryAcquire() { while (true) { int value = subscribedChannelsAmount.get(); if (value == 0) { return -1; } if (subscribedChannelsAmount.compareAndSet(value, value - 1)) { return value - 1; } } }
若是当此值为0时,那么会从新获取一个可用的链接,代码以下:spring
int remainFreeAmount = freeEntry.tryAcquire(); if (remainFreeAmount == -1) { throw new IllegalStateException(); } final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); if (oldEntry != null) { freeEntry.release(); freePubSubLock.release(); subscribe(channelName, promise, type, lock, oldEntry, listeners); return; } if (remainFreeAmount == 0) { freePubSubConnections.poll(); } freePubSubLock.release();
若是此时没有可用的链接的话,恐怕这次操做就会等待新的链接直至超时,超时了就报上述的错误了,不过根据提示。咱们此时的解决办法是增大subscriptionsPerConnection或者subscriptionConnectionPoolSize的值。当咱们使用springboot时能够经过设置spring.redis.redisson.config
(具体设置请参考官网)来指定redisson的配置文件或者从新建立RedissonClient:promise
@Bean(destroyMethod = "shutdown") public RedissonClient redisson(RedissonProperties redissonProperties, RedisProperties redisProperties) throws IOException { Config config = new Config(); String prefix = "redis://"; Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl"); if (method != null && (Boolean) ReflectionUtils.invokeMethod(method, redisProperties)) { prefix = "rediss://"; } config.useSingleServer() .setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort()) .setConnectTimeout(30000).setSubscriptionsPerConnection(5000) //在这里指定数目 .setDatabase(redisProperties.getDatabase()) .setPassword(redisProperties.getPassword()); return Redisson.create(config); }