mybatis datasource的工厂方法模式(深度好文)

工厂方法模式是使用抽象工厂(抽象类或接口)来生产抽象产品(抽象类或接口)的一个过程,由抽象工厂来决定抽象产品的生产过程,实际生产中由具体的工厂子类或者实现类来完成具体的产品子类或者实现类的生产。具体例子请参考 设计模式整理 java

在mybatis的datasource中,它的抽象工厂为sql

package org.apache.ibatis.datasource;

import java.util.Properties;
import javax.sql.DataSource;

public interface DataSourceFactory {
    //设置DataSource的相关属性,通常紧跟在初始化完成以后
    void setProperties(Properties var1);
    //获取DataSource对象
    DataSource getDataSource();
}

而它的抽象产品则为jdk中的DataSource接口数据库

package javax.sql;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Wrapper;

public interface DataSource  extends CommonDataSource, Wrapper {

  Connection getConnection() throws SQLException;

  Connection getConnection(String username, String password)
    throws SQLException;
}

Mybatis中有两个具体的产品——UnpooledDataSource和PooledDataSource,咱们先来看一下UnpooledDataSource,这是一个知足基本功能的没有链接池的DataSource产品,跟以前的自实现的事务差很少,具体参考 Spring事务说明与自实现 。能够先看到它的属性以及静态代码块,加载DriverManager中注册的JDBC Driver复制一份到registeredDrivers中。另外它有不少的构造器。而抽象产品(DataSource接口)中的两个getConnection方法都是由doGetConnection方法来实现的。apache

    //加载Driver类的类加载器
    private ClassLoader driverClassLoader;
    //数据库链接驱动的相关配置
    private Properties driverProperties;
    //缓存全部已注册的数据库链接驱动
    private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap();
    //数据库链接的驱动名称
    private String driver;
    //数据库URL
    private String url;
    //用户名
    private String username;
    //密码
    private String password;
    //是否自动提交
    private Boolean autoCommit;
    //事务隔离级别
    private Integer defaultTransactionIsolationLevel;
//将DriverManager中注册的JDBC驱动复制一份到registeredDrivers中
static {
    Enumeration drivers = DriverManager.getDrivers();

    while(drivers.hasMoreElements()) {
        Driver driver = (Driver)drivers.nextElement();
        registeredDrivers.put(driver.getClass().getName(), driver);
    }

}
//接口中的方法
public Connection getConnection() throws SQLException {
    return this.doGetConnection(this.username, this.password);
}

public Connection getConnection(String username, String password) throws SQLException {
    return this.doGetConnection(username, password);
}
private Connection doGetConnection(String username, String password) throws SQLException {
    //将各类属性放入props中
    Properties props = new Properties();
    if(this.driverProperties != null) {
        props.putAll(this.driverProperties);
    }

    if(username != null) {
        props.setProperty("user", username);
    }

    if(password != null) {
        props.setProperty("password", password);
    }

    return this.doGetConnection(props);
}

private Connection doGetConnection(Properties properties) throws SQLException {
    //初始化驱动
    this.initializeDriver();
    //获取数据库链接(properties中包含了username以及password)
    Connection connection = DriverManager.getConnection(this.url, properties);
    //设置该链接的事务相关配置
    this.configureConnection(connection);
    return connection;
}

private synchronized void initializeDriver() throws SQLException {
    //若是已注册的数据库链接驱动没有当前的驱动
    if(!registeredDrivers.containsKey(this.driver)) {
        try {
            Class driverType;
            //反射加载当前驱动,并初始化
            if(this.driverClassLoader != null) {
                driverType = Class.forName(this.driver, true, this.driverClassLoader);
            } else {
                driverType = Resources.classForName(this.driver);
            }
            //根据Class实例来获取当前驱动自己的实例
            Driver driverInstance = (Driver)driverType.newInstance();
            //在DriverManager中注册该驱动的代理
            DriverManager.registerDriver(new UnpooledDataSource.DriverProxy(driverInstance));
            //将该驱动添加到registeredDrivers中
            registeredDrivers.put(this.driver, driverInstance);
        } catch (Exception var3) {
            throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + var3);
        }
    }

}

private void configureConnection(Connection conn) throws SQLException {
    //设置是否自动提交事务
    if(this.autoCommit != null && this.autoCommit.booleanValue() != conn.getAutoCommit()) {
        conn.setAutoCommit(this.autoCommit.booleanValue());
    }
    //设置事务的隔离等级
    if(this.defaultTransactionIsolationLevel != null) {
        conn.setTransactionIsolation(this.defaultTransactionIsolationLevel.intValue());
    }

}

在DriverManager中是会去注册Drivers的全部信息的设计模式

// 注册JDBC驱动的列表
private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();
//注册driver
public static synchronized void registerDriver(java.sql.Driver driver,
        DriverAction da)
    throws SQLException {

    /* Register the driver if it has not already been added to our list */
    if(driver != null) {
        //若是列表中不存在该驱动,则添加至末尾,不然不添加
        registeredDrivers.addIfAbsent(new DriverInfo(driver, da));
    } else {
        // This is for compatibility with the original DriverManager
        throw new NullPointerException();
    }

    println("registerDriver: " + driver);

}

如今咱们来看一下与UnpooledDataSource对应的具体工厂实现类UnpooledDataSourceFactory。这种工厂,它只生产UnpooledDataSource这一种产品。缓存

首先咱们来看一下抽象工厂(DataSourceFactory接口)的setProperties方法安全

private static final String DRIVER_PROPERTY_PREFIX = "driver.";
private static final int DRIVER_PROPERTY_PREFIX_LENGTH = "driver.".length();
protected DataSource dataSource = new UnpooledDataSource();
public void setProperties(Properties properties) {
    //Properties是继承于HashTable的子类,不接受null值,线程安全,单线程下比HashMap慢,它的迭代器是枚举型迭代器 enumerator 
    Properties driverProperties = new Properties();
    //MetaObject是mybatis自带的一个反射工具,它的主要做用是经过反射给对象中拥有getter,setter的属性,设置属性值
    MetaObject metaDataSource = SystemMetaObject.forObject(this.dataSource);
    //获取方法参数属性集合的迭代器
    Iterator var4 = properties.keySet().iterator();
    //遍历该属性集合
    while(var4.hasNext()) {
        Object key = var4.next();
        String propertyName = (String)key;
        String value;
        //若是该属性名称以driver.开头
        if(propertyName.startsWith("driver.")) {
            //获取该属性的值
            value = properties.getProperty(propertyName);
            //以"driver."开头的配置项是对DataSource的配置,记录到driverProperties中保存
            driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
        } else {  //若是不以driver.开头
            //若是该dataSource中没有Setter的属性,则抛出异常
            if(!metaDataSource.hasSetter(propertyName)) {
                throw new DataSourceException("Unknown DataSource property: " + propertyName);
            }
            
            value = (String)properties.get(propertyName);
            //根据属性类型进行类型转换
            Object convertedValue = this.convertValue(metaDataSource, propertyName, value);
            //设置该dataSource的属性
            metaDataSource.setValue(propertyName, convertedValue);
        }
    }
    //若是driverProperties有值
    if(driverProperties.size() > 0) {
        //设置dataSource的该属性
        metaDataSource.setValue("driverProperties", driverProperties);
    }

}

抽象类工厂(DataSourceFactory接口)的getDataSource()的方法实现比较简单网络

public DataSource getDataSource() {
    return this.dataSource;
}

如今咱们来看一下它的另外一个产品PooledDataSource,这是一个带数据库链接池的产品。数据库的链接过程很是耗时,数据库可以创建的链接数也很是有限,因此在绝大多数系统中,数据库链接是很是珍贵的资源,使用数据库链接池就显得尤其必要。数据库链接池在初始化时,通常会建立必定数量的数据库链接并添加到链接池中备用。当程序须要使用数据库时,从池中请求链接;当程序再也不使用该链接时,会将其返回到池中缓存,等下下次使用,而不是直接关闭。而咱们以前写的事务自实现中的链接则是每条线程执行完事务则会把链接关闭。mybatis

先看一下Mybatis中链接池类PooledConnection的实现,它是一个实现了动态代理的类app

class PooledConnection implements InvocationHandler

它的核心字段以下

//记录当前PooledConnection对象所在的PooledDataSource对象。该PooledConnection是从该PooledDataSource中获取的;当调用close()方法时会将PooledConnection放回
//该PooledDataSource中
private final PooledDataSource dataSource;
//真正的数据库链接
private final Connection realConnection;
//数据库链接的代理对象
private final Connection proxyConnection;
//从链接池中取出该链接的时间戳
private long checkoutTimestamp;
//该链接建立的时间戳
private long createdTimestamp;
//最后一次被使用的时间戳
private long lastUsedTimestamp;
//由数据库URL,用户名,密码计算出来的hash值,可用于标识该链接所在的链接池
private int connectionTypeCode;
//检测当前PooledConnection是否有效,主要是为了防止程序经过close()方法将链接归还给链接池以后,依然经过该链接操做数据库
private boolean valid;

咱们来看一下它的动态代理方法invoke,它代理的其实是UnpooledDataSource中的getConnection方法返回的Connection自己实例的全部方法,用以对该链接进行加强。

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    //若是调用的方法为close,则将其从新放入链接池,而不是真正关闭数据库链接
    if("close".hashCode() == methodName.hashCode() && "close".equals(methodName)) {
        this.dataSource.pushConnection(this);
        return null;
    } else {
        try {
            if(!Object.class.equals(method.getDeclaringClass())) {
                this.checkConnection(); //经过valid字段检测链接是否有效
            }
            //调用真正数据库链接对象的对应方法
            return method.invoke(this.realConnection, args);
        } catch (Throwable var6) {
            throw ExceptionUtil.unwrapThrowable(var6);
        }
    }
}
private void checkConnection() throws SQLException {
    if(!this.valid) {
        throw new SQLException("Error accessing PooledConnection. Connection is invalid.");
    }
}

PoolState是用于管理PooledConnection对象状态的组件,它经过两个ArrayList<PooledConnection>集合分别管理空闲状态的链接和活跃状态的链接

//空闲的PooledConnection集合
protected final List<PooledConnection> idleConnections = new ArrayList();
//活跃的PooledConnection集合
protected final List<PooledConnection> activeConnections = new ArrayList();

其余统计字段

//请求数据库链接的次数
protected long requestCount = 0L;
//获取链接的累计时间
protected long accumulatedRequestTime = 0L;
//记录全部链接累计的checkout时长
protected long accumulatedCheckoutTime = 0L;
//当链接长时间未归还给链接池时,会被认为该链接超时,记录超时的链接个数
protected long claimedOverdueConnectionCount = 0L;
//累计超时时间
protected long accumulatedCheckoutTimeOfOverdueConnections = 0L;
//累计等待时间
protected long accumulatedWaitTime = 0L;
//等待次数
protected long hadToWaitCount = 0L;
//无效的链接数
protected long badConnectionCount = 0L;

咱们再来看一下PooledDataSource这个产品,它的主要字段以下

//经过PoolState管理链接池的状态并记录统计信息
private final PoolState state = new PoolState(this);
//真正管理数据库链接的对象,用于生成真实的数据库链接对象
private final UnpooledDataSource dataSource;
//最大活跃链接数
protected int poolMaximumActiveConnections = 10;
//最大空闲链接数
protected int poolMaximumIdleConnections = 5;
//最大checkout时长
protected int poolMaximumCheckoutTime = 20000;
//在没法获取链接时,线程须要等待的时间
protected int poolTimeToWait = 20000;
// 每个尝试从缓存池获取链接的线程. 若是这个线程获取到的是一个坏的链接,那么这个数据源容许这个线程尝试从新获取一个新的链接,可是这个从新尝试的次数不该该超 
//过 poolMaximumIdleConnections 与 poolMaximumLocalBadConnectionTolerance 之和。
protected int poolMaximumLocalBadConnectionTolerance = 3;
//在检测一个数据库是否可用时,会给数据库发送一个测试SQL语句
protected String poolPingQuery = "NO PING QUERY SET";
//是否容许发送测试SQL语句
protected boolean poolPingEnabled;
//当链接超过poolPingConnectionsNotUsedFor毫秒未使用时,会发送一次测试SQL语句,检测链接是否正常
protected int poolPingConnectionsNotUsedFor;
//根据数据库的URL、用户名和密码生成的一个hash值,该哈希值用于标志着当前的链接池
private int expectedConnectionTypeCode;

接口中要实现的两个方法

public Connection getConnection() throws SQLException {
    return this.popConnection(this.dataSource.getUsername(), this.dataSource.getPassword()).getProxyConnection();
}

public Connection getConnection(String username, String password) throws SQLException {
    return this.popConnection(username, password).getProxyConnection();
}

咱们能够看到它都是由popConnection来实现的

private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;
    //若是链接代理为空
    while(conn == null) {
        //获取链接池状态
        PoolState var8 = this.state;
        synchronized(this.state) {  //加锁同步状态
            //检测空闲链接(不为空)
            if(!this.state.idleConnections.isEmpty()) {
                //取出第一个链接(集合中第0个)
                conn = (PooledConnection)this.state.idleConnections.remove(0);
                if(log.isDebugEnabled()) {
                    log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
                }
              //若是没有空闲链接且活跃链接数没有到达最大值
            } else if(this.state.activeConnections.size() < this.poolMaximumActiveConnections) {
                //建立一个线程池链接,该链接其实是UnpooledDataSource的数据库链接的代理对象实例
                conn = new PooledConnection(this.dataSource.getConnection(), this);
                if(log.isDebugEnabled()) {
                    log.debug("Created connection " + conn.getRealHashCode() + ".");
                }
            //若是没有空闲链接且活跃链接数已经最大了,没法建立新链接
            } else {
                //取得最先的活跃链接
                PooledConnection oldestActiveConnection = (PooledConnection)this.state.activeConnections.get(0);
                //获取该链接的过时时间
                long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
                //检测该链接是否超时
                if(longestCheckoutTime > (long)this.poolMaximumCheckoutTime) {
                    //若是已超时,如下三条代码都是对超时链接信息进行统计
                    ++this.state.claimedOverdueConnectionCount;
                    this.state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
                    this.state.accumulatedCheckoutTime += longestCheckoutTime;
                    //将该超时的链接移除
                    this.state.activeConnections.remove(oldestActiveConnection);
                    //若是该超时链接的事务未提交
                    if(!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                        try {
                            //对该超时链接的事务进行回滚
                            oldestActiveConnection.getRealConnection().rollback();
                        } catch (SQLException var16) {
                            log.debug("Bad connection. Could not roll back");
                        }
                    }
                    //再次创建该超时链接的真实链接的新代理,由于该超时链接自己就是一个代理,须要拿到它的目标对象实例
                    conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
                    conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
                    conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
                    //将该超时链接设置为无效
                    oldestActiveConnection.invalidate();
                    if(log.isDebugEnabled()) {
                        log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
                    }
                  //无空闲链接,活跃链接数最大没法建立新链接,且无超时链接
                } else {
                    try {
                        if(!countedWait) {
                            //统计等待次数
                            ++this.state.hadToWaitCount;
                            countedWait = true;
                        }

                        if(log.isDebugEnabled()) {
                            log.debug("Waiting as long as " + this.poolTimeToWait + " milliseconds for connection.");
                        }
                        long wt = System.currentTimeMillis();
                        //线程阻塞等待
                        this.state.wait((long)this.poolTimeToWait);
                        //统计累计的等待时间
                        this.state.accumulatedWaitTime += System.currentTimeMillis() - wt;
                    } catch (InterruptedException var17) {
                        break;
                    }
                }
            }
            //若是链接代理不为空
            if(conn != null) {
                //检测链接代理是否有效
                if(conn.isValid()) {
                    //若是有效且事务未提交则回滚事务
                    if(!conn.getRealConnection().getAutoCommit()) {
                        conn.getRealConnection().rollback();
                    }
                    //配置链接代理的相关属性
                    conn.setConnectionTypeCode(this.assembleConnectionTypeCode(this.dataSource.getUrl(), username, password));
                    conn.setCheckoutTimestamp(System.currentTimeMillis());
                    conn.setLastUsedTimestamp(System.currentTimeMillis());
                    //将该链接代理添加到活跃链接中
                    this.state.activeConnections.add(conn);
                    //进行相关统计
                    ++this.state.requestCount;
                    this.state.accumulatedRequestTime += System.currentTimeMillis() - t;
                //若是无效
                } else {
                    if(log.isDebugEnabled()) {
                        log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
                    }
                    //进行一些错误链接的相关统计
                    ++this.state.badConnectionCount;
                    ++localBadConnectionCount;
                    conn = null;
                    if(localBadConnectionCount > this.poolMaximumIdleConnections + this.poolMaximumLocalBadConnectionTolerance) {
                        if(log.isDebugEnabled()) {
                            log.debug("PooledDataSource: Could not get a good connection to the database.");
                        }

                        throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
                    }
                }
            }
        }
    }

    if(conn == null) {
        if(log.isDebugEnabled()) {
            log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
        }

        throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    } else {
        return conn;
    }
}

由以上代码可知,带线程池的DataSource不过是对链接进行了一次动态代理,加强链接方法,并将代理链接放入集合中处理。

图片来源网络

如今咱们来看一下close()后将该链接代理从新放入链接池的相关代码pushConnection(如下没有特别说明,链接均指链接代理)

protected void pushConnection(PooledConnection conn) throws SQLException {
    //获取链接池状态
    PoolState var2 = this.state;
    synchronized(this.state) { //锁同步该状态
        //从活动链接中移除链接
        this.state.activeConnections.remove(conn);
        //判断链接是否有效
        if(conn.isValid()) {
            //若是有效,检测空闲链接池是否已达到上限以及该链接是不是链接池中的链接
            if(this.state.idleConnections.size() < this.poolMaximumIdleConnections && conn.getConnectionTypeCode() == this.expectedConnectionTypeCode) {
                //累计checkout时长
                this.state.accumulatedCheckoutTime += conn.getCheckoutTime();
                //回滚未提交的事务
                if(!conn.getRealConnection().getAutoCommit()) {
                    conn.getRealConnection().rollback();
                }
                //为返还链接建立新的链接对象
                PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
                //空闲链接加入该新对象
                this.state.idleConnections.add(newConn);
                newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
                newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
                //将原链接对象设置无效
                conn.invalidate();
                if(log.isDebugEnabled()) {
                    log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
                }
                //唤醒全部阻塞中的线程
                this.state.notifyAll();
              //空闲链接数已达到上限或该链接不属于该链接池
            } else {
                //累计checkout时长
                this.state.accumulatedCheckoutTime += conn.getCheckoutTime();
                //回滚未提交的事务
                if(!conn.getRealConnection().getAutoCommit()) {
                    conn.getRealConnection().rollback();
                }
                //关闭真正的数据库链接
                conn.getRealConnection().close();
                if(log.isDebugEnabled()) {
                    log.debug("Closed connection " + conn.getRealHashCode() + ".");
                }
                //将该链接设置为无效
                conn.invalidate();
            }
        } else {
            if(log.isDebugEnabled()) {
                log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
            }
            //统计无效链接对象个数
            ++this.state.badConnectionCount;
        }

    }
}

这其中都有对链接池(链接代理)的有效监测conn.isValid(),咱们来看一下它的实现

public boolean isValid() {
    return this.valid && this.realConnection != null && this.dataSource.pingConnection(this);
}

其中this.dataSource.pingConnection(this)就是进行一个心跳检测

protected boolean pingConnection(PooledConnection conn) {
    boolean result = true;

    try {
        //检测真正的数据库链接是否已经关闭
        result = !conn.getRealConnection().isClosed();
    } catch (SQLException var8) {
        if(log.isDebugEnabled()) {
            log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + var8.getMessage());
        }

        result = false;
    }
    //检测是否运行测试SQL语句且长时间未使用的链接才须要ping操做来检测数据库链接是否正常
    if(result && this.poolPingEnabled && this.poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > (long)this.poolPingConnectionsNotUsedFor) {
        try {
            if(log.isDebugEnabled()) {
                log.debug("Testing connection " + conn.getRealHashCode() + " ...");
            }
            //执行心跳检测SQL语句
            Connection realConn = conn.getRealConnection();
            Statement statement = realConn.createStatement();
            ResultSet rs = statement.executeQuery(this.poolPingQuery);
            rs.close();
            statement.close();
            if(!realConn.getAutoCommit()) {
                realConn.rollback();
            }

            result = true;
            if(log.isDebugEnabled()) {
                log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
            }
        } catch (Exception var7) {
            log.warn("Execution of ping query '" + this.poolPingQuery + "' failed: " + var7.getMessage());

            try {
                conn.getRealConnection().close();
            } catch (Exception var6) {
                ;
            }

            result = false;
            if(log.isDebugEnabled()) {
                log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + var7.getMessage());
            }
        }
    }

    return result;
}

如今咱们来看一下该dataSource对应的工厂PooledDataSourceFactory,它是UnpooledDataSourceFactory的子类,只是将UnpooledDataSourceFactory的dataSource属性由UnpooledDataSource改为了带线程池的PooledDataSource。

public class PooledDataSourceFactory extends UnpooledDataSourceFactory {
    public PooledDataSourceFactory() {
        this.dataSource = new PooledDataSource();
}
}
相关文章
相关标签/搜索