上一篇文章重点介绍了一下Java的Future模式,最后意淫了一个数据库链接池的场景。本想经过Future模式来防止,当多个线程同时获取数据库链接时各自都生成一个,形成资源浪费。可是忽略了一个根本的功能,就是多个线程同时调用get方法时,获得的是同一个数据库链接的多个引用,这会致使严重的问题。数据库
因此,我抽空看了看呼声很高的Druid的数据库链接池实现,固然关注点主要是多线程方面的处理。我以为,带着问题去看源码是一种很好的思考方式。数组
Druid不只仅是一个数据库链接池,还有不少标签,好比统计监控、过滤器、SQL解析等。既然要分析链接池,那先看看DruidDataSource类缓存
getConnection方法的实现:数据结构
@Override public DruidPooledConnection getConnection() throws SQLException { return getConnection(maxWait); } public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { init(); if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(this); return filterChain.dataSource_connect(this, maxWaitMillis); } else { return getConnectionDirect(maxWaitMillis); } }
返回的是一个DruidPooledConnection,这个类后面再说;另外这里传入了一个long类型maxWait,应该是用来作超时处理的;init方法在getConnection方法里面调用,这也是一种很好的设计;里面的过滤器链的处理就很少说了。多线程
public void init() throws SQLException { if (inited) { return; } final ReentrantLock lock = this.lock; // 使用lock而不是synchronized try { lock.lockInterruptibly(); } catch (InterruptedException e) { throw new SQLException("interrupt", e); } boolean init = false; try { if (inited) { return; } init = true; connections = new DruidConnectionHolder[maxActive]; // 数组 try { // init connections for (int i = 0, size = getInitialSize(); i < size; ++i) { Connection conn = createPhysicalConnection(); // 生成真正的数据库链接 DruidConnectionHolder holder = new DruidConnectionHolder(this, conn); connections[poolingCount] = holder; incrementPoolingCount(); } if (poolingCount > 0) { poolingPeak = poolingCount; poolingPeakTime = System.currentTimeMillis(); } } catch (SQLException ex) { LOG.error("init datasource error, url: " + this.getUrl(), ex); connectError = ex; } createAndLogThread(); createAndStartCreatorThread(); createAndStartDestroyThread(); initedLatch.await(); initedTime = new Date(); registerMbean(); if (connectError != null && poolingCount == 0) { throw connectError; } } catch (SQLException e) { LOG.error("dataSource init error", e); throw e; } catch (InterruptedException e) { throw new SQLException(e.getMessage(), e); } finally { inited = true; lock.unlock(); // 释放锁 if (init && LOG.isInfoEnabled()) { LOG.info("{dataSource-" + this.getID() + "} inited"); } } }
我这里作了删减,加了一些简单的注释。经过这个方法,正好复习一下以前写的那些知识点,若是感兴趣,能够看看我以前写的文章。ide
这里使用了lock,而且保证只会被执行一次。根据初始容量,先生成了一批数据库链接,用一个数组connections存放这些链接的引用,并且专门定义了一个变量poolingCount来保存这些链接的总数量。ui
看到initedLatch.await有一种似曾相识的感受this
private final CountDownLatch initedLatch = new CountDownLatch(2);
这里调用了await方法,那countDown方法在哪些线程里面被调用呢url
protected void createAndStartCreatorThread() { if (createScheduler == null) { String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this); createConnectionThread = new CreateConnectionThread(threadName); createConnectionThread.start(); return; } initedLatch.countDown(); }
这里先判断createScheduler这个调度线程池是否被设置,若是没有设置,直接countDown;不然,就开启一个建立数据库链接的线程,固然这个线程的run方法仍是会调用countDown方法。可是这里我有一个疑问:开启建立链接的线程,为何必定要有一个调度线程池呢???spa
难道是当数据库链接建立失败的时候,须要过了指定时间后,再重试?这么理解好像有点牵强,但愿高人来评论。
还有就是,当开启destroy线程的时候也会调用countDown方法。
接着在看getConnection方法,一直调用到getConnectionInternal方法
DruidConnectionHolder holder; try { lock.lockInterruptibly(); } catch (InterruptedException e) { connectErrorCount.incrementAndGet(); throw new SQLException("interrupt", e); } try { if (maxWait > 0) { holder = pollLast(nanos); } else { holder = takeLast(); } } catch (InterruptedException e) { connectErrorCount.incrementAndGet(); throw new SQLException(e.getMessage(), e); } catch (SQLException e) { connectErrorCount.incrementAndGet(); throw e; } finally { lock.unlock(); } holder.incrementUseCount(); DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder); return poolalbeConnection;
我这里仍是作了删减。大致逻辑是:先从链接池中取出DruidConnectionHolder,而后再封装成DruidPooledConnection对象返回。再看看取holder的方法:
DruidConnectionHolder takeLast() throws InterruptedException, SQLException { try { while (poolingCount == 0) { emptySignal(); // send signal to CreateThread create connection notEmptyWaitThreadCount++; if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { notEmptyWaitThreadPeak = notEmptyWaitThreadCount; } try { notEmpty.await(); // signal by recycle or creator } finally { notEmptyWaitThreadCount--; } notEmptyWaitCount++; if (!enable) { connectErrorCount.incrementAndGet(); throw new DataSourceDisableException(); } } } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread notEmptySignalCount++; throw ie; } decrementPoolingCount(); DruidConnectionHolder last = connections[poolingCount]; connections[poolingCount] = null; return last; }
这个方法很是好的诠释了Lock-Condition的使用场景,几行绿色的注释解释的很明白了,若是对empty和notEmpty看不太懂,能够去看看我以前写的那篇文章。
这个方法的逻辑:先判断池中的链接数,若是到0了,那么本线程就得被挂起,同时释放empty信号,而且等待notEmpty的信号。若是还有链接,就取出数组的最后一个,同时更改poolingCount。
到这里,基本理解了Druid数据库链接池获取链接的实现流程。可是,若是不去看看里面的数据结构,仍是会一头雾水。咱们就看看几个基本的类,以及它们之间的持有关系。
一、DruidDataSource持有一个DruidConnectionHolder的数组,保存全部的数据库链接
private volatile DruidConnectionHolder[] connections; // 注意这里的volatile
二、DruidConnectionHolder持有数据库链接,还有所在的DataSource等
private final DruidAbstractDataSource dataSource; private final Connection conn;
三、DruidPooledConnection持有DruidConnectionHolder,所在线程等
protected volatile DruidConnectionHolder holder; private final Thread ownerThread;
对于这种设计,我很好奇为何要添加一层holder作封装,数组里直接存放Connection好像也何尝不可。
其实,这么设计是有道理的。好比说,一个Connection对象能够产生多个Statement对象,当咱们想同时保存Connection和对应的多个Statement的时候,就比较纠结。
再看看DruidConnectionHolder的成员变量
private PreparedStatementPool statementPool; private final List<Statement> statementTrace = new ArrayList<Statement>(2);
这样的话,既能够作缓存,也能够作统计。
最终咱们对Connection的操做都是经过DruidPooledConnection来实现,好比commit、rollback等,它们大都是经过实际的数据库链接完成工做。而我比较关心的是close方法的实现,close方法最核心的逻辑是recycle方法:
public void recycle() throws SQLException { if (this.disable) { return; } DruidConnectionHolder holder = this.holder; if (holder == null) { if (dupCloseLogEnable) { LOG.error("dup close"); } return; } if (!this.abandoned) { DruidAbstractDataSource dataSource = holder.getDataSource(); dataSource.recycle(this); } this.holder = null; conn = null; transactionInfo = null; closed = true; }
经过最后几行代码,可以看出,并无调用实际数据库链接的close方法,而只是断开了以前那张图里面的4号引用。用这种方式,来实现数据库链接的复用。