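Our company uses JedisPool to access Redis. When a Redis instance becomes unreachable, it is put on a blacklist. A background thread periodically scans the blacklist and restores any instance that has become reachable again. Each check creates a new JedisPool and tests reachability by calling jedisPool.getResource().close().
Because the check is periodic, every round creates another JedisPool, and each of these pools is configured with minIdle set to 1. This is where the hidden danger lies. If Redis stays unreachable for a long time, many JedisPool instances pile up. Each JedisPool has a background evictor that runs periodically: it destroys connections that have been idle too long, and then creates new connections so that the pool keeps at least minIdle of them. When Redis recovers, all of these pools create connections, so a large number of connections are opened. Once Redis's maximum client limit is reached, connections used by normal requests receive the server error ERR max number of clients reached and throw an exception.
Note that although the client receives an error, from the client's point of view the connection was established: the client sent its request to the server, and the server returned ERR max number of clients reached when the client read the reply. On the Redis side, the server closes outright any connection that pushes it over the maximum client limit.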
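To get a feel for the scale, here is a back-of-the-envelope sketch of the mechanism described above. The downtime and probe interval are hypothetical numbers chosen for illustration, not values from the incident, and the class and method names are made up for this example.

```java
// Rough estimate of connections opened by leaked probe pools.
// All numbers used in main are hypothetical, not measurements.
class LeakEstimate {

    /** One pool is created (and never destroyed) per probe while the instance is down. */
    static long leakedPools(long downtimeSeconds, long probeIntervalSeconds) {
        return downtimeSeconds / probeIntervalSeconds;
    }

    /** After recovery, each leaked pool's evictor tops the pool up to minIdle. */
    static long connectionsAfterRecovery(long pools, int minIdle) {
        return pools * minIdle;
    }

    public static void main(String[] args) {
        long pools = leakedPools(3600, 10);                      // 1 hour down, probe every 10 s
        System.out.println(pools);                               // 360
        System.out.println(connectionsAfterRecovery(pools, 1));  // 360
    }
}
```

That figure is per client process and per blacklisted instance, so several application nodes doing the same thing multiply it further.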
Caused by: redis.clients.jedis.exceptions.JedisDataException: ERR max number of clients reached
at redis.clients.jedis.Protocol.processError(Protocol.java:130)
at redis.clients.jedis.Protocol.process(Protocol.java:164)
at redis.clients.jedis.Protocol.read(Protocol.java:218)
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:341)
at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:277)
at redis.clients.jedis.BinaryJedis.mget(BinaryJedis.java:606)
One question remains: why does the log also contain requests that failed on write? Shouldn't the connections that were established normally be able to write data? The connections that hit the "max number of clients" error have already been reclaimed and cannot be used by the client again. Does the server have some logic that cleans up connections?
Caused by: java.net.SocketException: Connection reset by peer (Write failed)
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
at redis.clients.util.RedisOutputStream.flushBuffer(RedisOutputStream.java:52)
at redis.clients.util.RedisOutputStream.flush(RedisOutputStream.java:216)
at redis.clients.jedis.Connection.flush(Connection.java:332)
... 30 more
/**
 * Create a new <code>GenericObjectPool</code> using a specific
 * configuration.
 *
 * @param factory The object factory to be used to create object instances
 *                used by this pool
 * @param config  The configuration to use for this pool instance. The
 *                configuration is used by value. Subsequent changes to
 *                the configuration object will not be reflected in the
 *                pool.
 */
public GenericObjectPool(PooledObjectFactory<T> factory, GenericObjectPoolConfig config) {
    // Remember the earlier JMX problem?
    super(config, ONAME_BASE, config.getJmxNamePrefix());
    if (factory == null) {
        jmxUnregister(); // tidy up
        throw new IllegalArgumentException("factory may not be null");
    }
    this.factory = factory;
    idleObjects = new LinkedBlockingDeque<PooledObject<T>>(config.getFairness());
    setConfig(config);
    // The evictor is started here
    startEvictor(getTimeBetweenEvictionRunsMillis());
}
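As you can see, the evictor is created and started in the constructor. In other words, every new JedisPool gets its own eviction task that runs periodically. Recall that this same constructor also registers the pool with JMX, which caused another problem: new JedisPool can be very slow.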
/**
 * <p>Starts the evictor with the given delay. If there is an evictor
 * running when this method is called, it is stopped and replaced with a
 * new evictor with the specified delay.</p>
 *
 * <p>This method needs to be final, since it is called from a constructor.
 * See POOL-195.</p>
 *
 * @param delay time in milliseconds before start and between eviction runs
 */
final void startEvictor(long delay) {
    synchronized (evictionLock) {
        if (null != evictor) {
            EvictionTimer.cancel(evictor);
            evictor = null;
            evictionIterator = null;
        }
        if (delay > 0) {
            evictor = new Evictor();
            EvictionTimer.schedule(evictor, delay, delay);
        }
    }
}
The Javadoc makes two points clearly. First, if an evictor is already running when the method is called, it is stopped and replaced by a new one with the given delay. Second, the method must be final because it is called from a constructor (see POOL-195). The code adds one detail: a new evictor is scheduled only when delay is positive.
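The cancel-then-reschedule behavior is easy to try out in isolation. The following is a minimal sketch built on java.util.Timer; EvictorSwitch and its methods are hypothetical names for this example and are not part of commons-pool.

```java
import java.util.Timer;
import java.util.TimerTask;

// Simplified stand-in for the startEvictor logic; not the commons-pool class.
class EvictorSwitch {
    private final Timer timer = new Timer("evictor-demo", true); // daemon thread
    private final Object lock = new Object();
    private TimerTask evictor; // null means no evictor is scheduled

    /** Cancels any running evictor, then schedules a new one only if delay > 0. */
    void startEvictor(long delayMillis) {
        synchronized (lock) {
            if (evictor != null) {
                evictor.cancel();
                evictor = null;
            }
            if (delayMillis > 0) {
                evictor = new TimerTask() {
                    @Override
                    public void run() {
                        // pool maintenance would happen here
                    }
                };
                timer.schedule(evictor, delayMillis, delayMillis);
            }
        }
    }

    /** True while a periodic task is scheduled. */
    boolean hasEvictor() {
        synchronized (lock) {
            return evictor != null;
        }
    }

    /** Stops the demo timer thread. */
    void shutdown() {
        timer.cancel();
    }
}
```

Calling startEvictor(30000) schedules a task, and a later startEvictor(-1) removes it again, which is the same trick close() relies on further down.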
public static final long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = -1L;

private volatile long timeBetweenEvictionRunsMillis =
        BaseObjectPoolConfig.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;

/**
 * Returns the number of milliseconds to sleep between runs of the idle
 * object evictor thread. When non-positive, no idle object evictor thread
 * will be run.
 *
 * @return number of milliseconds to sleep between evictor runs
 *
 * @see #setTimeBetweenEvictionRunsMillis
 */
public final long getTimeBetweenEvictionRunsMillis() {
    return timeBetweenEvictionRunsMillis;
}
The Javadoc is clear here too: when the value is non-positive (negative or 0), no idle object evictor is run.
As shown above, the default is -1, which means the evictor is disabled. JedisPoolConfig, however, supplies its own defaults for JedisPool and schedules the evictor every 30 s:
public class JedisPoolConfig extends GenericObjectPoolConfig {
    public JedisPoolConfig() {
        // defaults to make your life with connection pool easier :)
        setTestWhileIdle(true);
        setMinEvictableIdleTimeMillis(60000);
        setTimeBetweenEvictionRunsMillis(30000);
        setNumTestsPerEvictionRun(-1);
    }
}
The comment above says these defaults "make your life with connection pool easier". Whose life is that, the connection pool's or the coder's?
/**
 * The idle object evictor {@link TimerTask}.
 *
 * @see GenericKeyedObjectPool#setTimeBetweenEvictionRunsMillis
 */
class Evictor extends TimerTask {
    /**
     * Run pool maintenance. Evict objects qualifying for eviction and then
     * ensure that the minimum number of idle instances are available.
     * Since the Timer that invokes Evictors is shared for all Pools but
     * pools may exist in different class loaders, the Evictor ensures that
     * any actions taken are under the class loader of the factory
     * associated with the pool.
     */
    @Override
    public void run() {
        try {
            // Class loader handling omitted
            // Evict from the pool
            try {
                evict();
            } catch(Exception e) {
                swallowException(e);
            } catch(OutOfMemoryError oome) {
                // Log problem but give evictor thread a chance to continue
                // in case error is recoverable
                oome.printStackTrace(System.err);
            }
            // Re-create idle instances.
            try {
                ensureMinIdle();
            } catch (Exception e) {
                swallowException(e);
            }
        } finally {
            // Restore the previous CCL
            Thread.currentThread().setContextClassLoader(savedClassLoader);
        }
    }
}
The Javadoc is clear here as well: each run first evicts the objects that qualify for eviction, and then ensures that the minimum number of idle instances is available.
/**
 * <p>Perform <code>numTests</code> idle object eviction tests, evicting
 * examined objects that meet the criteria for eviction. If
 * <code>testWhileIdle</code> is true, examined objects are validated
 * when visited (and removed if invalid); otherwise only objects that
 * have been idle for more than <code>minEvicableIdleTimeMillis</code>
 * are removed.</p>
 *
 * @throws Exception when there is a problem evicting idle objects.
 */
@Override
public void evict() throws Exception {
    PooledObject<T> underTest = null;
    EvictionPolicy<T> evictionPolicy = getEvictionPolicy();
    EvictionConfig evictionConfig = new EvictionConfig(
            getMinEvictableIdleTimeMillis(),
            getSoftMinEvictableIdleTimeMillis(),
            getMinIdle());
    boolean testWhileIdle = getTestWhileIdle();
    for (int i = 0, m = getNumTests(); i < m; i++) {
        // Fetch the object to test from idleObjects (code omitted)
        // Can this object be evicted?
        boolean evict;
        try {
            evict = evictionPolicy.evict(evictionConfig, underTest,
                    idleObjects.size());
        } catch (Throwable t) {
            // (error handling omitted)
        }
        if (evict) {
            // Evict: destroy the object
            destroy(underTest);
        } else {
            // The object does not yet meet the eviction criteria.
            // If testing is enabled, run the validation logic.
            if (testWhileIdle) {
                boolean active = false;
                try {
                    factory.activateObject(underTest);
                    active = true;
                } catch (Exception e) {
                    destroy(underTest);
                    destroyedByEvictorCount.incrementAndGet();
                }
                if (active) {
                    // Validate with a ping here: destroy on failure, do nothing on success
                    if (!factory.validateObject(underTest)) {
                        destroy(underTest);
                        destroyedByEvictorCount.incrementAndGet();
                    } else {
                        // Do nothing
                    }
                }
            }
        }
    }
}
In the words of the Javadoc: each run examines up to numTests idle objects and evicts those that meet the eviction criteria. If testWhileIdle is true, the examined objects are also validated and removed when invalid. Otherwise only objects that have been idle longer than minEvictableIdleTimeMillis are removed.
numTests
/**
 * Returns the maximum number of objects to examine during each run (if any)
 * of the idle object evictor thread. When positive, the number of tests
 * performed for a run will be the minimum of the configured value and the
 * number of idle instances in the pool. When negative, the number of tests
 * performed will be <code>ceil({@link #getNumIdle}/
 * abs({@link #getNumTestsPerEvictionRun}))</code> which means that when the
 * value is <code>-n</code> roughly one nth of the idle objects will be
 * tested per run.
 */
private int getNumTests() {
    int numTestsPerEvictionRun = getNumTestsPerEvictionRun();
    if (numTestsPerEvictionRun >= 0) {
        return Math.min(numTestsPerEvictionRun, idleObjects.size());
    } else {
        return (int) (Math.ceil(idleObjects.size() /
                Math.abs((double) numTestsPerEvictionRun)));
    }
}

public class JedisPoolConfig extends GenericObjectPoolConfig {
    public JedisPoolConfig() {
        // defaults to make your life with connection pool easier :)
        setTestWhileIdle(true);
        setMinEvictableIdleTimeMillis(60000);
        setTimeBetweenEvictionRunsMillis(30000);
        // -1 is set here
        setNumTestsPerEvictionRun(-1);
    }
}
As the code shows, a non-negative value tests at most that many idle objects per run (capped at the current number of idle objects), while a negative value -n tests roughly one nth of the idle objects.
So with the default of -1, JedisPool tests every idle object on each run.
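The arithmetic is simple enough to check by hand. Below is a standalone copy of the calculation with a few sample inputs; NumTestsDemo is a name made up for this sketch, and the idle counts are arbitrary.

```java
// Standalone copy of the getNumTests arithmetic, for experimenting with values.
class NumTestsDemo {

    static int numTests(int numTestsPerEvictionRun, int idleCount) {
        if (numTestsPerEvictionRun >= 0) {
            return Math.min(numTestsPerEvictionRun, idleCount);
        }
        return (int) Math.ceil(idleCount / Math.abs((double) numTestsPerEvictionRun));
    }

    public static void main(String[] args) {
        System.out.println(numTests(-1, 8));  // 8: the JedisPoolConfig default tests every idle object
        System.out.println(numTests(-2, 7));  // 4: ceil(7 / 2)
        System.out.println(numTests(3, 10));  // 3
        System.out.println(numTests(5, 2));   // 2: capped by the idle count
    }
}
```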
Eviction policy
/**
 * Provides the default implementation of {@link EvictionPolicy} used by the
 * pools. Objects will be evicted if the following conditions are met:
 * <ul>
 * <li>the object has been idle longer than
 *     {@link GenericObjectPool#getMinEvictableIdleTimeMillis()} /
 *     {@link GenericKeyedObjectPool#getMinEvictableIdleTimeMillis()}</li>
 * <li>there are more than {@link GenericObjectPool#getMinIdle()} /
 *     {@link GenericKeyedObjectPoolConfig#getMinIdlePerKey()} idle objects in
 *     the pool and the object has been idle for longer than
 *     {@link GenericObjectPool#getSoftMinEvictableIdleTimeMillis()} /
 *     {@link GenericKeyedObjectPool#getSoftMinEvictableIdleTimeMillis()}
 * </ul>
 * This class is immutable and thread-safe.
 *
 */
public class DefaultEvictionPolicy<T> implements EvictionPolicy<T> {
    @Override
    public boolean evict(EvictionConfig config, PooledObject<T> underTest, int idleCount) {
        if ((config.getIdleSoftEvictTime() < underTest.getIdleTimeMillis() &&
                config.getMinIdle() < idleCount) ||
                config.getIdleEvictTime() < underTest.getIdleTimeMillis()) {
            return true;
        }
        return false;
    }
}
According to the Javadoc, an object is evicted when either of the following holds. Either the object has been idle longer than minEvictableIdleTimeMillis, or the pool holds more than minIdle idle objects and the object has been idle longer than softMinEvictableIdleTimeMillis.
JedisPool defaults:
// Evict after 60 s of idleness
setMinEvictableIdleTimeMillis(60000);
// The soft idle time defaults to -1. EvictionConfig treats a non-positive
// value as Long.MAX_VALUE, so the soft condition never fires by default
private volatile long softMinEvictableIdleTimeMillis =
        BaseObjectPoolConfig.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS;
public static final long DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS = -1;
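To see how the two conditions combine with these defaults, here is a standalone model of the decision. It assumes, based on my reading of EvictionConfig in commons-pool 2.x, that non-positive idle times are turned into Long.MAX_VALUE (that is, "never"); EvictionDecision and its methods are names invented for this sketch.

```java
// Standalone model of the default eviction decision; not the commons-pool classes.
class EvictionDecision {

    /** Assumption: mirrors EvictionConfig, which maps non-positive times to "never". */
    static long normalize(long configuredMillis) {
        return configuredMillis > 0 ? configuredMillis : Long.MAX_VALUE;
    }

    static boolean shouldEvict(long minEvictableIdleMillis, long softMinEvictableIdleMillis,
                               int minIdle, long idleMillis, int idleCount) {
        long hard = normalize(minEvictableIdleMillis);
        long soft = normalize(softMinEvictableIdleMillis);
        return (soft < idleMillis && minIdle < idleCount) || hard < idleMillis;
    }

    public static void main(String[] args) {
        // JedisPoolConfig defaults: hard = 60000, soft = -1; minIdle = 1 as in the incident.
        System.out.println(shouldEvict(60000, -1, 1, 61000, 1));    // true: idle for more than 60 s
        System.out.println(shouldEvict(60000, -1, 1, 30000, 5));    // false: soft condition is disabled
        System.out.println(shouldEvict(60000, 10000, 1, 30000, 5)); // true: soft limit exceeded, spare idle objects
    }
}
```

Note that the hard condition ignores minIdle: the single idle connection of a leaked pool is still destroyed after 60 s, and ensureMinIdle then creates a replacement.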
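idleTime
How is this time calculated?
@Override
public long getIdleTimeMillis() {
    final long elapsed = System.currentTimeMillis() - lastReturnTime;
    // elapsed may be negative if:
    // - another thread updates lastReturnTime during the calculation window
    // - System.currentTimeMillis() is not monotonic (e.g. system time is set back)
    return elapsed >= 0 ? elapsed : 0;
}
The idle time is the time elapsed since the object was last returned to the pool.
What if the object is borrowed again after it was returned but before it is evicted? Will lastReturnTime be updated? In the normal flow this does not happen: the evictor takes its candidates from idleObjects (a LinkedBlockingDeque), and borrowing removes the object from idleObjects. Even so, there does appear to be a potential race between the two.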
destroy
private void destroy(PooledObject<T> toDestory) throws Exception {
    toDestory.invalidate();
    idleObjects.remove(toDestory);
    allObjects.remove(new IdentityWrapper<T>(toDestory.getObject()));
    try {
        factory.destroyObject(toDestory);
    } finally {
        destroyedCount.incrementAndGet();
        createCount.decrementAndGet();
    }
}
Liveness check
If an object does not meet the eviction criteria, it is still checked for liveness (when testWhileIdle is true) by sending a PING command.
@Override
public boolean validateObject(PooledObject<Jedis> pooledJedis) {
    final BinaryJedis jedis = pooledJedis.getObject();
    try {
        HostAndPort hostAndPort = this.hostAndPort.get();
        String connectionHost = jedis.getClient().getHost();
        int connectionPort = jedis.getClient().getPort();
        return hostAndPort.getHost().equals(connectionHost)
                && hostAndPort.getPort() == connectionPort && jedis.isConnected()
                && jedis.ping().equals("PONG");
    } catch (final Exception e) {
        return false;
    }
}
Even after objects have been evicted, the pool must still hold at least minIdle idle objects.
/**
 * Tries to ensure that {@code idleCount} idle instances exist in the pool.
 * <p>
 * Creates and adds idle instances until either {@link #getNumIdle()} reaches {@code idleCount}
 * or the total number of objects (idle, checked out, or being created) reaches
 * {@link #getMaxTotal()}. If {@code always} is false, no instances are created unless
 * there are threads waiting to check out instances from the pool.
 *
 * @param idleCount the number of idle instances desired
 * @param always true means create instances even if the pool has no threads waiting
 * @throws Exception if the factory's makeObject throws
 */
private void ensureIdle(int idleCount, boolean always) throws Exception {
    if (idleCount < 1 || isClosed() || (!always && !idleObjects.hasTakeWaiters())) {
        return;
    }
    while (idleObjects.size() < idleCount) {
        PooledObject<T> p = create();
        if (p == null) {
            // Can't create objects, no reason to think another call to
            // create will work. Give up.
            break;
        }
        if (getLifo()) {
            idleObjects.addFirst(p);
        } else {
            idleObjects.addLast(p);
        }
    }
    if (isClosed()) {
        // Pool closed while object was being added to idle objects.
        // Make sure the returned object is destroyed rather than left
        // in the idle object pool (which would effectively be a leak)
        clear();
    }
}
According to the Javadoc, object creation stops when either the number of idle objects reaches the requested idle count or the total number of objects in the pool reaches maxTotal.
In other words, even if the pool has too few idle objects, no new object is created once the total has reached the maximum.
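The interplay between the idle target and maxTotal can be modeled with plain counters. This is only a sketch: EnsureIdleDemo is a hypothetical name, and the early break stands in for create() returning null once the pool is full.

```java
// Standalone model of the ensureIdle loop; counts objects instead of creating connections.
class EnsureIdleDemo {

    /**
     * Returns how many objects would be created to reach idleTarget idle objects
     * without the total exceeding maxTotal (a negative maxTotal means no limit).
     */
    static int objectsToCreate(int idle, int total, int idleTarget, int maxTotal) {
        int created = 0;
        while (idle < idleTarget) {
            if (maxTotal >= 0 && total >= maxTotal) {
                break; // stands in for create() returning null
            }
            idle++;
            total++;
            created++;
        }
        return created;
    }

    public static void main(String[] args) {
        System.out.println(objectsToCreate(0, 0, 1, 8)); // 1: empty pool with minIdle = 1
        System.out.println(objectsToCreate(0, 8, 1, 8)); // 0: pool is already at maxTotal
        System.out.println(objectsToCreate(1, 3, 3, 4)); // 1: maxTotal is hit before the idle target
    }
}
```

The first case is the one that matters for the incident: each leaked pool with minIdle set to 1 opens one connection as soon as Redis is reachable.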
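Why is there both a minIdle and a maxIdle?
Everything above revolves around minIdle: the pool must keep minIdle idle objects on standby.
Where is maxIdle used?
It is used in returnObject: if the pool already holds enough idle objects when an object is returned, the object is simply destroyed.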
/**
 * <p>
 * If {@link #getMaxIdle() maxIdle} is set to a positive value and the
 * number of idle instances has reached this value, the returning instance
 * is destroyed.
 * <p>
 * If {@link #getTestOnReturn() testOnReturn} == true, the returning
 * instance is validated before being returned to the idle instance pool. In
 * this case, if validation fails, the instance is destroyed.
 * <p>
 */
@Override
public void returnObject(T obj) {
    // (the lookup of the PooledObject p for obj is omitted in this excerpt)
    // 1. Mark the object as RETURNING
    synchronized(p) {
        final PooledObjectState state = p.getState();
        if (state != PooledObjectState.ALLOCATED) {
            throw new IllegalStateException(
                    "Object has already been returned to this pool or is invalid");
        } else {
            p.markReturning(); // Keep from being marked abandoned
        }
    }
    long activeTime = p.getActiveTimeMillis();
    // 2. Validate the object on return
    if (getTestOnReturn()) {
        if (!factory.validateObject(p)) {
            try {
                // Ping failed: destroy the object
                destroy(p);
            } catch (Exception e) {
                swallowException(e);
            }
            try {
                // After destroying, top the idle list back up (only if borrowers are waiting)
                ensureIdle(1, false);
            } catch (Exception e) {
                swallowException(e);
            }
            updateStatsReturn(activeTime);
            return;
        }
    }
    // 3. Mark the object as IDLE and update its return time
    if (!p.deallocate()) {
        throw new IllegalStateException(
                "Object has already been returned to this pool or is invalid");
    }
    // 4. Has the maximum number of idle objects been reached?
    int maxIdleSave = getMaxIdle();
    if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
        try {
            // Destroy it right away
            destroy(p);
        } catch (Exception e) {
            swallowException(e);
        }
    } else {
        // Put it on the idle list: a normal return to the pool
        if (getLifo()) {
            idleObjects.addFirst(p);
        } else {
            idleObjects.addLast(p);
        }
        if (isClosed()) {
            // Pool closed while object was being added to idle objects.
            // Make sure the returned object is destroyed rather than left
            // in the idle object pool (which would effectively be a leak)
            clear();
        }
    }
    updateStatsReturn(activeTime);
}
It does two main things. If testOnReturn is enabled, the object is validated on return and destroyed when validation fails. And if the number of idle objects has already reached maxIdle, the returned object is destroyed instead of being put back on the idle list.
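Stripped of the bookkeeping, the return path boils down to a small decision. The sketch below models it with plain values; ReturnDecision and Outcome are hypothetical names, and the valid flag stands in for the result of validateObject.

```java
// Standalone model of what happens to an object handed back to the pool.
class ReturnDecision {

    enum Outcome { DESTROYED, KEPT_IDLE }

    static Outcome onReturn(boolean poolClosed, boolean testOnReturn, boolean valid,
                            int maxIdle, int idleCount) {
        // Validation on return: a failed check destroys the object.
        if (testOnReturn && !valid) {
            return Outcome.DESTROYED;
        }
        // A closed pool, or one that already holds maxIdle idle objects, keeps nothing.
        if (poolClosed || (maxIdle > -1 && maxIdle <= idleCount)) {
            return Outcome.DESTROYED;
        }
        return Outcome.KEPT_IDLE;
    }

    public static void main(String[] args) {
        System.out.println(onReturn(false, false, true, 8, 8));    // DESTROYED
        System.out.println(onReturn(false, false, true, 8, 3));    // KEPT_IDLE
        System.out.println(onReturn(false, true, false, 8, 0));    // DESTROYED
        System.out.println(onReturn(true, false, true, 8, 0));     // DESTROYED
        System.out.println(onReturn(false, false, true, -1, 100)); // KEPT_IDLE
    }
}
```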
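import java.io.IOException;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class PoolLeak {
    public static void main(String[] args) {
        GenericObjectPoolConfig config = new JedisPoolConfig();
        config.setMinIdle(1);
        // Create five pools in a row, as the periodic health check would
        for (int i = 0; i < 5; i++) {
            JedisPool jedisPool = new JedisPool(config, "localhost");
            try {
                jedisPool.getResource().close();
            } catch (Exception e) {
                e.printStackTrace();
                // jedisPool.destroy();
            }
        }
        System.out.println("over...");
        // Block so that the evictors of the leaked pools keep running
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}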
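Run the code above while the local Redis server is shut down.
As can be seen, the connection attempts are answered directly with RST.
Then start the Redis server.
While the Redis server is down, the connection attempt made by ensureMinIdle fails. Once the server is up, ensureMinIdle creates a connection successfully. The client can also be seen sending the QUIT command.
The fix is simply to destroy the connection pool, that is, to uncomment the line // jedisPool.destroy(); in the code above.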
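The effect of that one line can be illustrated without a Redis server. The following toy simulation replaces JedisPool with a hypothetical FakePool that only counts connections. It assumes every probe fails while the server is down, and that each surviving pool's evictor runs once after recovery.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the probe loop; FakePool is hypothetical and only counts connections.
class LeakSimulation {

    static class FakePool {
        final int minIdle;
        boolean closed;
        int idle;

        FakePool(int minIdle) {
            this.minIdle = minIdle;
        }

        /** One evictor run once the server is reachable: top up to minIdle. */
        int runEvictor() {
            if (closed) {
                return 0; // a destroyed pool has no evictor any more
            }
            int created = 0;
            while (idle < minIdle) {
                idle++;
                created++;
            }
            return created;
        }

        void destroy() {
            closed = true;
            idle = 0;
        }
    }

    static int connectionsAfterRecovery(int probes, int minIdle, boolean destroyOnFailure) {
        List<FakePool> pools = new ArrayList<>();
        for (int i = 0; i < probes; i++) {
            FakePool pool = new FakePool(minIdle);
            pools.add(pool); // models the scheduled evictor keeping the pool alive
            // the probe fails at this point because the server is down
            if (destroyOnFailure) {
                pool.destroy();
            }
        }
        int connections = 0;
        for (FakePool pool : pools) {
            connections += pool.runEvictor();
        }
        return connections;
    }

    public static void main(String[] args) {
        System.out.println(connectionsAfterRecovery(5, 1, false)); // 5: every leaked pool reconnects
        System.out.println(connectionsAfterRecovery(5, 1, true));  // 0: destroyed pools stay quiet
    }
}
```

In the real health check it is probably safer to destroy the probe pool in a finally block, so that pools whose probe succeeds are released as well.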
/**
 * Closes the pool. Once the pool is closed, {@link #borrowObject()} will
 * fail with IllegalStateException, but {@link #returnObject(Object)} and
 * {@link #invalidateObject(Object)} will continue to work, with returned
 * objects destroyed on return.
 * <p>
 * Destroys idle instances in the pool by invoking {@link #clear()}.
 */
@Override
public void close() {
    if (isClosed()) {
        return;
    }
    synchronized (closeLock) {
        if (isClosed()) {
            return;
        }
        // Stop the evictor before the pool is closed since evict() calls
        // assertOpen()
        // Cancel the scheduled eviction task
        startEvictor(-1L);
        closed = true;
        // This clear removes any idle objects
        clear();
        // Unregister from JMX
        jmxUnregister();
        // Release any threads that were waiting for an object
        idleObjects.interuptTakeWaiters();
    }
}
After the pool is closed, borrowObject fails, while returnObject and invalidateObject keep working, so any object returned afterwards is destroyed immediately.