不管是线程池仍是db链接池,他们都有一个共同的特性:资源复用,在普通的场景中,咱们使用一个链接,它的生命周期多是这样的: java
顾名思义,链接池中的池字已经很生动形象的阐明了它的用意,它用将全部链接放入一个"池子"
中统一的去控制链接的建立和销毁,和原始生命周期去对比,链接池多了如下特性:mysql
所以,当使用链接池后,咱们使用一个链接的生命周期将会演变成这样: git
通灵之术 - 传送门:github.com/ainilili/ho…,DEMO为Java语言实现!github
事前,咱们须要点支烟分析一下时间一个链接池须要作哪些事情:sql
close
以及isClosed
方法。链接池不只仅只是对Connection
生命周期的控制,还应该加入一些特点,例如初始链接数,最大链接数,最小链接数、最大空闲时长以及获取链接的等待时长,这些咱们也简单支持一下。shell
目标以明确,开始动工。安全
要保证线程安全,咱们能够将目标瞄准在JUC
包下的神通们,设咱们想要的容器为x
,那么x
不只须要知足基本的增删改查功能,并且也要提供获取超时功能,这是为了保证当池内长时间没有空闲链接时不会致使业务阻塞,即刻熔断。另外,x
须要知足双向操做,这是为了链接池能够识别出饱和的空闲链接,方便回收操做。app
综上所述,LinkedBlockingDeque
是最合适的选择,它使用InterruptibleReentrantLock
来保证线程安全,使用Condition
来作获取元素的阻塞,另外支持双向操做。ide
另外,咱们能够将链接池拆分为3个类型:单元测试
其中,工做池和回收池大可没必要用双向对列,或许用单向队列或者Set
均可以代替之:
private LinkedBlockingQueue<HoneycombConnection> workQueue;
private LinkedBlockingDeque<HoneycombConnection> idleQueue;
private LinkedBlockingQueue<HoneycombConnection> freezeQueue;
复制代码
链接池的输出是Connection
,它表明着一个db链接,上游服务使用它作完操做后,会直接调用它的close
方法来释放链接,而咱们必须作的是在调用者无感知的状况下改变它的关闭逻辑,当调用close
的方法时,咱们将它放回空闲队列中,保证其的可复用性!
所以,咱们须要对原来的Connection
作装饰,其作法很简单,可是很累,这里新建一个类来实现Connection
接口,经过重写全部的方法来实现一个**"可编辑"**的Connection
,咱们称之为Connection
的装饰者:
public class HoneycombConnectionDecorator implements Connection{
protected Connection connection;
protected HoneycombConnectionDecorator(Connection connection) {
this.connection = connection;
}
此处省略对方法实现的三百行代码...
}
复制代码
以后,咱们须要新建一个本身的Connection
来继承这个装饰者,并重写相应的方法:
public class HoneycombConnection extends HoneycombConnectionDecorator implements HoneycombConnectionSwitcher{
@Override
public void close() { do some things }
@Override
public boolean isClosed() throws SQLException { do some things }
省略...
}
复制代码
DataSource
是JDK为了更好的统合和管理数据源而定义出的一个规范,获取链接的入口,方便咱们在这一层更好的扩展数据源(例如增长特殊属性),使咱们的链接池的功能更加丰富,咱们须要实现一个本身的DataSource
能:
public class HoneycombWrapperDatasource implements DataSource{
protected HoneycombDatasourceConfig config;
省略其它方法的实现...
@Override
public Connection getConnection() throws SQLException {
return DriverManager.getConnection(config.getUrl(), config.getUser(), config.getPassword());
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
return DriverManager.getConnection(config.getUrl(), username, password);
}
省略其它方法的实现...
}
复制代码
咱们完成了对数据源的实现,可是这里获取链接的方式是物理建立,咱们须要知足池化的目的,须要重写HoneycombWrapperDatasource
中的链接获取逻辑,作法是建立一个新的类对父类方法重写:
public class HoneycombDataSource extends HoneycombWrapperDatasource{
private HoneycombConnectionPool pool;
@Override
public Connection getConnection() throws SQLException {
这里实现从pool中取出链接的逻辑
}
省略...
}
复制代码
在当前结构体系下,咱们的链接池逐渐浮现出了雏形,但远远不够的是,咱们须要在此结构下能够作自由的扩展,使链接池对链接的控制更加灵活,所以咱们能够引入特性这个概念,它容许咱们在其内部访问链接池,并对链接池作一系列的扩展操做:
public abstract class AbstractFeature{
public abstract void doing(HoneycombConnectionPool pool);
}
复制代码
AbstractFeature
抽象父类须要实现doing
方法,咱们能够在方法内部实现对链接池的控制,其中一个典型的例子就是对池中空闲链接左回收:
public class CleanerFeature extends AbstractFeature{
@Override
public void doing(HoneycombConnectionPool pool) {
这里作空闲链接的回收
}
}
复制代码
通过上述分析,要完成一个链接池,须要这些模块的配合,整体流程以下:
在初始化DataSource
以前,咱们须要将各属性设置进去,这里使用HoneycombWrapperDatasource
中的HoneycombDatasourceConfig
来承载各属性:
public class HoneycombDatasourceConfig {
//db url
private String url;
//db user
private String user;
//db password
private String password;
//driver驱动
private String driver;
//初始化链接数,默认为2
private int initialPoolSize = 2;
//最大链接数,默认为10
private int maxPoolSize = 10;
//最小链接数,默认为2
private int minPoolSize = 2;
//获取链接时,最大等待时长,默认为60s
private long maxWaitTime = 60 * 1000;
//最大空闲时长,超出要被回收,默认为20s
private long maxIdleTime = 20 * 1000;
//特性列表
private List<AbstractFeature> features;
public HoneycombDatasourceConfig() {
features = new ArrayList<AbstractFeature>(5);
}
省略getter、setter....
复制代码
设置好属性以后,咱们须要完成链接池的初始化工做,在HoneycombDataSource
的init
方法中实现:
private void init() throws ClassNotFoundException, SQLException {
//阻塞其余线程初始化操做,等待初始化完成
if(initialStarted || ! (initialStarted = ! initialStarted)) {
if(! initialFinished) {
try {
INITIAL_LOCK.lock();
INITIAL_CONDITION.await();
} catch (InterruptedException e) {
} finally {
INITIAL_LOCK.unlock();
}
}
return;
}
//config参数校验
config.assertSelf();
Class.forName(getDriver());
//实例化线程池
pool = new HoneycombConnectionPool(config);
//初始化最小链接
Integer index = null;
for(int i = 0; i < config.getInitialPoolSize(); i ++) {
if((index = pool.applyIndex()) != null) {
pool.putLeisureConnection(createNativeConnection(pool), index);
}
}
//触发特性
pool.touchFeatures();
//完成初始化并唤醒其余阻塞
initialFinished = true;
try {
INITIAL_LOCK.lock();
INITIAL_CONDITION.signalAll();
}catch(Exception e) {
}finally {
INITIAL_LOCK.unlock();
}
}
复制代码
在init
的方法中,若是initialPoolSize
大于0,会去建立指定数量的物理链接放入链接池中,建立数量要小于最大链接数maxPoolSize
:
public HoneycombConnection createNativeConnection(HoneycombConnectionPool pool) throws SQLException {
return new HoneycombConnection(super.getConnection(), pool);
}
复制代码
完成初始化后,下一步就是获取链接。
咱们以前将链接池分红了三个,它们分别是空闲池、工做池和回收池。
咱们能够经过HoneycombDataSource
的getConnection
方法来获取链接,当咱们须要获取时,首先考虑的是空闲池是否有空闲链接,这样能够避免建立和激活新的链接:
@Override
public Connection getConnection() throws SQLException {
try {
//初始化链接池
init();
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
HoneycombConnection cn = null;
Integer index = null;
if(pool.assignable()) {
//空闲池可分配,从空闲池取出
cn = pool.getIdleConnection();
}else if(pool.actionable()) {
//回收池可分配,从回收池取出
cn = pool.getFreezeConnection();
}else if((index = pool.applyIndex()) != null) {
//若是链接数未满,建立新的物理链接
cn = pool.putOccupiedConnection(createNativeConnection(pool), index);
}
if(cn == null) {
//若是没法获取链接,阻塞等待空闲池链接
cn = pool.getIdleConnection();
}
if(cn.isClosedActive()) {
//若是物理链接关闭,则获取新的链接
cn.setConnection(super.getConnection());
}
return cn;
}
复制代码
若是空闲池不可分配,那么说明链接供不该求,也许以前有些空闲链接已经被回收(物理关闭),那么咱们在建立新链接以前,能够到回收池看一下是否存在已回收链接,若是存在直接取出:
else if(pool.actionable()) {
//回收池可分配,从回收池取出
cn = pool.getFreezeConnection();
}
复制代码
若是回收池也不可分配,此时要判断链接池链接数量是否已经达到最大链接,若是没有达到,建立新的物理链接并直接添加到工做池中:
else if((index = pool.applyIndex()) != null) {
//若是链接数未满,建立新的物理链接,添加到工做池
cn = pool.putOccupiedConnection(createNativeConnection(pool), index);
}
复制代码
若是上述三种状况都不知足,那么只能从空闲池等待其余链接的释放:
if(cn == null) {
//若是没法获取链接,阻塞等待空闲池链接
cn = pool.getIdleConnection();
}
复制代码
具体逻辑封装在HoneycombConnectionPool
的getIdleConnection
方法中:
public HoneycombConnection getIdleConnection() {
try {
//获取最大等待时间
long waitTime = config.getMaxWaitTime();
while(waitTime > 0) {
long beginPollNanoTime = System.nanoTime();
//设置超时时间,阻塞等待其余链接的释放
HoneycombConnection nc = idleQueue.poll(waitTime, TimeUnit.MILLISECONDS);
if(nc != null) {
//状态转换
if(nc.isClosed() && nc.switchOccupied() && working(nc)) {
return nc;
}
}
long timeConsuming = (System.nanoTime() - beginPollNanoTime) / (1000 * 1000);
//也许在超时时间内获取到了链接,可是状态转换失败,此时刷新超时时间
waitTime -= timeConsuming;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
}
throw new RuntimeException("获取链接超时");
}
复制代码
最后,判断一下链接是否被物理关闭,若是是,咱们须要打开新的链接替换已经被回收的链接:
if(cn.isClosedActive()) {
//若是物理链接关闭,则获取新的链接
cn.setConnection(super.getConnection());
}
复制代码
若是在某段时间内咱们的业务量剧增,那么须要同时工做的链接将会不少,以后过了不久,咱们的业务量降低,那么以前已经建立的链接明显饱和,这时就须要咱们对其进行回收,咱们能够经过AbstractFeature
入口操做链接池。
对于回收这个操做,咱们经过CleanerFeature
来实现:
public class CleanerFeature extends AbstractFeature{
private Logger logger = LoggerFactory.getLogger(CleanerFeature.class);
public CleanerFeature(boolean enable, long interval) {
//enable表示是否启用
//interval表示扫描间隔
super(enable, interval);
}
@Override
public void doing(HoneycombConnectionPool pool) {
LinkedBlockingDeque<HoneycombConnection> idleQueue = pool.getIdleQueue();
Thread t = new Thread() {
@Override
public void run() {
while(true) {
try {
//回收扫描间隔
Thread.sleep(interval);
//回收时,空闲池上锁
synchronized (idleQueue) {
logger.debug("Cleaner Model To Start {}", idleQueue.size());
//回收操做
idleQueue.stream().filter(c -> { return c.idleTime() > pool.getConfig().getMaxIdleTime(); }).forEach(c -> {
try {
if(! c.isClosedActive() && c.idle()) {
c.closeActive();
pool.freeze(c);
}
} catch (SQLException e) {
e.printStackTrace();
}
});
logger.debug("Cleaner Model To Finished {}", idleQueue.size());
}
}catch(Throwable e) {
logger.error("Cleaner happended error", e);
}
}
}
};
t.setDaemon(true);
t.start();
}
}
复制代码
这里的操做很简单,对空闲池加锁,扫描全部链接,释放空闲时间超过最大空闲时间设置的链接,其实这里只要知道当前链接的空闲时长就一目了然了,咱们在链接放入空闲池时候去刷新他的空闲时间点,那么当前的空闲时长就等于当前时间减去空闲开始时间:
idleTime = nowTime - idleStartTime
复制代码
在切换状态为空闲时刷新空闲开始时间:
@Override
public boolean switchIdle() {
return unsafe.compareAndSwapObject(this, statusOffset, status, ConnectionStatus.IDLE) && flushIdleStartTime();
}
复制代码
体验成果的最快途径就是投入使用,这里搞一个单元测试体验一下:
static ThreadPoolExecutor tpe = new ThreadPoolExecutor(1000, 1000, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
@Test
public void testConcurrence() throws SQLException, InterruptedException{
long start = System.currentTimeMillis();
HoneycombDataSource dataSource = new HoneycombDataSource();
dataSource.setUrl("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&transformedBitIsBoolean=true&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=Asia/Shanghai");
dataSource.setUser("root");
dataSource.setPassword("root");
dataSource.setDriver("com.mysql.cj.jdbc.Driver");
dataSource.setMaxPoolSize(50);
dataSource.setInitialPoolSize(10);
dataSource.setMinPoolSize(10);
dataSource.setMaxWaitTime(60 * 1000);
dataSource.setMaxIdleTime(10 * 1000);
dataSource.addFeature(new CleanerFeature(true, 5 * 1000));
test(dataSource, 10000);
System.out.println(System.currentTimeMillis() - start + " ms");
}
public static void test(DataSource dataSource, int count) throws SQLException, InterruptedException {
CountDownLatch cdl = new CountDownLatch(count);
for(int i = 0; i < count; i ++) {
tpe.execute(() -> {
try {
HoneycombConnection connection = (HoneycombConnection) dataSource.getConnection();
Statement s = connection.createStatement();
s.executeQuery("select * from test limit 1");
connection.close();
}catch(Exception e) {
}finally {
cdl.countDown();
}
});
}
cdl.await();
tpe.shutdown();
}
复制代码
PC配置:Intel(R) Core(TM) i5-8300H CPU @ 2.30GHz 2.30 GHz 4核8G 512SSD
10000次查询,耗时:
938 ms
复制代码
结束语:再次召唤传送门:github.com/ainilili/ho…