持续更新:https://github.com/dchack/Mybatis-source-code-learnjava
有这么个定律,有链接的地方就有池。
在市面上,能够适配Mybatis DateSource的链接池有很对,好比:git
Mybatis也自带来链接池的功能,先学习下Mybatis的,相对简单的实现。
涉及的类:
github
public class PoolState { protected PooledDataSource dataSource; // 空闲链接集合 protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>(); // 正在使用的链接集合 protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>(); // 请求次数,每次获取链接,都会自增,用于 protected long requestCount = 0; // 累计请求耗时,每次获取链接时计算累加,除以requestCount能够得到平均耗时 protected long accumulatedRequestTime = 0; // 累计链接使用时间 protected long accumulatedCheckoutTime = 0; // 过时链接次数 protected long claimedOverdueConnectionCount = 0; protected long accumulatedCheckoutTimeOfOverdueConnections = 0; // 累计等待获取链接时间 protected long accumulatedWaitTime = 0; // 等待获取链接的次数 protected long hadToWaitCount = 0; // 链接已关闭的次数 protected long badConnectionCount = 0; public PoolState(PooledDataSource dataSource) { this.dataSource = dataSource; } public synchronized long getRequestCount() { return requestCount; } public synchronized long getAverageRequestTime() { return requestCount == 0 ? 0 : accumulatedRequestTime / requestCount; } public synchronized long getAverageWaitTime() { return hadToWaitCount == 0 ? 0 : accumulatedWaitTime / hadToWaitCount; } public synchronized long getHadToWaitCount() { return hadToWaitCount; } public synchronized long getBadConnectionCount() { return badConnectionCount; } public synchronized long getClaimedOverdueConnectionCount() { return claimedOverdueConnectionCount; } public synchronized long getAverageOverdueCheckoutTime() { return claimedOverdueConnectionCount == 0 ? 0 : accumulatedCheckoutTimeOfOverdueConnections / claimedOverdueConnectionCount; } public synchronized long getAverageCheckoutTime() { return requestCount == 0 ? 0 : accumulatedCheckoutTime / requestCount; } public synchronized int getIdleConnectionCount() { return idleConnections.size(); } public synchronized int getActiveConnectionCount() { return activeConnections.size(); } }
注意代码中的字段都是用protected修饰的,表示pooled包内均可访问,在写这份代码的时候必然默认这个包下实现一个独立的功能,内部字段均可以共享使用,不然都写set,get方法太麻烦了。
在PoolState
类中,不少指标好比requestCount
,claimedOverdueConnectionCount
等都不和链接池核心逻辑相关,纯粹只是表示链接池的一些指标而已。
做为链接池,在这里最重要的就是两个List:sql
synchronized
关键字来处理并发场景的。组成池的两个List中存储的是PooledConnection
,而PooledConnection
经过java动态代理机制实现代理真正Connection。
PooledConnection
继承InvocationHandler
,因此实现了invoke
方法:数据库
/* * Required for InvocationHandler implementation. * * @param proxy - not used * @param method - the method to be executed * @param args - the parameters to be passed to the method * @see java.lang.reflect.InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[]) */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) { dataSource.pushConnection(this); return null; } else { try { if (!Object.class.equals(method.getDeclaringClass())) { // issue #579 toString() should never fail // throw an SQLException instead of a Runtime checkConnection(); } return method.invoke(realConnection, args); } catch (Throwable t) { throw ExceptionUtil.unwrapThrowable(t); } } } private void checkConnection() throws SQLException { if (!valid) { throw new SQLException("Error accessing PooledConnection. Connection is invalid."); } }
主要看到这个代理实现处理了close
方法,就是将链接从使用列表中弹出。
对于其余方法,会判断方法是否属于Object中的方法,若是不是则进行链接合法的校验,而后执行真正Connection
即realConnection
中对应的方法。
得到一个代理类的代码,即调用Proxy.newProxyInstance
方法,在PooledConnection
中的构造函数中:缓存
/* * Constructor for SimplePooledConnection that uses the Connection and PooledDataSource passed in * * @param connection - the connection that is to be presented as a pooled connection * @param dataSource - the dataSource that the connection is from */ public PooledConnection(Connection connection, PooledDataSource dataSource) { this.hashCode = connection.hashCode(); this.realConnection = connection; this.dataSource = dataSource; this.createdTimestamp = System.currentTimeMillis(); this.lastUsedTimestamp = System.currentTimeMillis(); this.valid = true; this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this); }
咱们能够看到realConnection
是在构造函数时就传入的了。mybatis
而配置这个池的参数都是在PooledDataSource
中:并发
官方文档:
poolMaximumActiveConnections – 在任意时间能够存在的活动(也就是正在使用)链接数量,默认值:10
poolMaximumIdleConnections – 任意时间可能存在的空闲链接数。
poolMaximumCheckoutTime – 在被强制返回以前,池中链接被检出(checked out)时间,默认值:20000 毫秒(即 20 秒)
poolTimeToWait – 这是一个底层设置,若是获取链接花费了至关长的时间,链接池会打印状态日志并从新尝试获取一个链接(避免在误配置的状况下一直安静的失败),默认值:20000 毫秒(即 20 秒)。
poolMaximumLocalBadConnectionTolerance – 这是一个关于坏链接容忍度的底层设置, 做用于每个尝试从缓存池获取链接的线程。 若是这个线程获取到的是一个坏的链接,那么这个数据源容许这个线程尝试从新获取一个新的链接,可是这个从新尝试的次数不该该超过 poolMaximumIdleConnections 与 poolMaximumLocalBadConnectionTolerance 之和。 默认值:3 (新增于 3.4.5)
poolPingQuery – 发送到数据库的侦测查询,用来检验链接是否正常工做并准备接受请求。默认是“NO PING QUERY SET”,这会致使多数数据库驱动失败时带有一个恰当的错误消息。
poolPingEnabled – 是否启用侦测查询。若开启,须要设置 poolPingQuery 属性为一个可执行的 SQL 语句(最好是一个速度很是快的 SQL 语句),默认值:false。
poolPingConnectionsNotUsedFor – 配置 poolPingQuery 的频率。能够被设置为和数据库链接超时时间同样,来避免没必要要的侦测,默认值:0(即全部链接每一时刻都被侦测 — 固然仅当 poolPingEnabled 为 true 时适用)。app
PooledDataSource
完成池功能的类,直接看拿链接的popConnection
方法:ide
private PooledConnection popConnection(String username, String password) throws SQLException { boolean countedWait = false; PooledConnection conn = null; // 触发获取链接的当前时间 long t = System.currentTimeMillis(); int localBadConnectionCount = 0; while (conn == null) { // 同步 synchronized (state) { // 判断空闲列表中是否能够提供链接 if (!state.idleConnections.isEmpty()) { // Pool has available connection conn = state.idleConnections.remove(0); if (log.isDebugEnabled()) { log.debug("Checked out connection " + conn.getRealHashCode() + " from pool."); } } else { // Pool does not have available connection // 判断是否达到最大链接数限制 if (state.activeConnections.size() < poolMaximumActiveConnections) { // Can create new connection conn = new PooledConnection(dataSource.getConnection(), this); if (log.isDebugEnabled()) { log.debug("Created connection " + conn.getRealHashCode() + "."); } } else { // Cannot create new connection PooledConnection oldestActiveConnection = state.activeConnections.get(0); long longestCheckoutTime = oldestActiveConnection.getCheckoutTime(); // 判断最老一个链接使用时间是否超过最大值 if (longestCheckoutTime > poolMaximumCheckoutTime) { // Can claim overdue connection state.claimedOverdueConnectionCount++; state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime; state.accumulatedCheckoutTime += longestCheckoutTime; state.activeConnections.remove(oldestActiveConnection); if (!oldestActiveConnection.getRealConnection().getAutoCommit()) { try { oldestActiveConnection.getRealConnection().rollback(); } catch (SQLException e) { /* Just log a message for debug and continue to execute the following statement like nothing happend. Wrap the bad connection with a new PooledConnection, this will help to not intterupt current executing thread and give current thread a chance to join the next competion for another valid/good database connection. At the end of this loop, bad {@link @conn} will be set as null. */ log.debug("Bad connection. Could not roll back"); } } // 这里看到将包装在oldestActiveConnection中的RealConnection从新用PooledConnection包装出来直接使用,看前面操做是将链接进行回滚,可是可能失败,却不关心,注释解释是,在后面的代码中会进行isValid的判断,其中就会判断链接是否可用。 conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this); conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp()); conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp()); // 将老链接设置成invalid oldestActiveConnection.invalidate(); if (log.isDebugEnabled()) { log.debug("Claimed overdue connection " + conn.getRealHashCode() + "."); } } else { // Must wait try { if (!countedWait) { state.hadToWaitCount++; countedWait = true; } if (log.isDebugEnabled()) { log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection."); } long wt = System.currentTimeMillis(); // 线程等待,也释放了锁 state.wait(poolTimeToWait); state.accumulatedWaitTime += System.currentTimeMillis() - wt; } catch (InterruptedException e) { break; } } } } if (conn != null) { // ping to server and check the connection is valid or not if (conn.isValid()) { if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password)); conn.setCheckoutTimestamp(System.currentTimeMillis()); conn.setLastUsedTimestamp(System.currentTimeMillis()); state.activeConnections.add(conn); state.requestCount++; state.accumulatedRequestTime += System.currentTimeMillis() - t; } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection."); } state.badConnectionCount++; localBadConnectionCount++; // 不可用的链接会被设置成null,被回收器回收 conn = null; if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) { if (log.isDebugEnabled()) { log.debug("PooledDataSource: Could not get a good connection to the database."); } throw new SQLException("PooledDataSource: Could not get a good connection to the database."); } } } } } if (conn == null) { if (log.isDebugEnabled()) { log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection."); } throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection."); } return conn; }
popConnection
方法实如今一个池中获取链接的基本逻辑,依赖最大链接数,获取等待时间,链接使用超时时间等来完成一个池的核心能力。
注意这里使用wait
方法来等待,在java线程池中使用阻塞队列来出来暂时拿不到资源的请求。
前面提到,在使用Connection
时,调用close
方法,会调用到dataSource.pushConnection(this);
,就是将这个链接使用完了还回池的动做:
protected void pushConnection(PooledConnection conn) throws SQLException { // 同样加锁 synchronized (state) { // 从使用线程列表中删除 state.activeConnections.remove(conn); if (conn.isValid()) { // 判断空闲链接列表是否超过最大值 if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) { state.accumulatedCheckoutTime += conn.getCheckoutTime(); if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this); // 加入到空闲链接列表中 state.idleConnections.add(newConn); newConn.setCreatedTimestamp(conn.getCreatedTimestamp()); newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp()); conn.invalidate(); if (log.isDebugEnabled()) { log.debug("Returned connection " + newConn.getRealHashCode() + " to pool."); } // 通知等待线程 state.notifyAll(); } else { state.accumulatedCheckoutTime += conn.getCheckoutTime(); if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } conn.getRealConnection().close(); if (log.isDebugEnabled()) { log.debug("Closed connection " + conn.getRealHashCode() + "."); } conn.invalidate(); } } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection."); } state.badConnectionCount++; } } }
归还链接时,须要查看空闲列表中的线程数量是否已经到到设置的最大值,若是已经达到,就不须要归还了,凡是须要加入空闲列表的都须要进行notifyAll
操做,来通知那些等待的线程来抢这个归还的链接,可是若是此时链接池中空闲链接充足,并无线程等待,这个操做也就浪费了,因此能够思考前面popConnection
中的wait
和这里的notifyAll
是能够用等待队列来完成。
另一个方法,用于判断链接是否可用:
protected boolean pingConnection(PooledConnection conn) { boolean result = true; try { // 先用isClosed来获取结果 result = !conn.getRealConnection().isClosed(); } catch (SQLException e) { if (log.isDebugEnabled()) { log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage()); } result = false; } if (result) { // 能够经过poolPingEnabled配置来决定是否使用自定义sql if (poolPingEnabled) { if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) { try { if (log.isDebugEnabled()) { log.debug("Testing connection " + conn.getRealHashCode() + " ..."); } Connection realConn = conn.getRealConnection(); Statement statement = realConn.createStatement(); // 执行poolPingQuery ResultSet rs = statement.executeQuery(poolPingQuery); rs.close(); statement.close(); if (!realConn.getAutoCommit()) { realConn.rollback(); } result = true; if (log.isDebugEnabled()) { log.debug("Connection " + conn.getRealHashCode() + " is GOOD!"); } } catch (Exception e) { log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage()); try { conn.getRealConnection().close(); } catch (Exception e2) { //ignore } result = false; if (log.isDebugEnabled()) { log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage()); } } } } } return result; }
从代码中能够看到isClosed
方法并不可靠,最终仍是经过执行sql来判断链接是否可用,这个在不少涉及判断数据库链接是否有效的地方都是这么作的,详细能够看一下isClosed
方法的注释。
继承UnpooledDataSourceFactory,直接返回PooledDataSource对象
public class PooledDataSourceFactory extends UnpooledDataSourceFactory { public PooledDataSourceFactory() { this.dataSource = new PooledDataSource(); } }
在整个线程池的实现代码中,能够学习到一个池的实现的要素有哪些,以及录用基础代码如何实现一个池。对于那些封装成高层次的池的代码来讲,这个实现显得又些单薄和不够全面,但是不管链接池如何实现核心池的实现逻辑是不会变的。