本文由 Jilen 发表在 ScalaCool 团队博客。mysql
上一篇文章分析了 Mysql 异步驱动的一些缺点,大部分已经在咱们内部版本中修复了。git
其中分区设计的连接池在实际使用过程当中会产生一些很是严重的问题。github
前文中曾经提到 SingleThreadedAsyncObjectPool
这个单线程的链接池实现并非彻底非阻塞的,再多个线程请求连接状况下仍旧会产生锁阻塞。
同时文章中也提到 Play!Framework 这样的框架主线程数能够很是少,因此不用过度担心。sql
事实证实这是错误的,由于 PartitionedAsyncObjectPool
默认使用了 Executors.newCachedThreadPool
, 这就致使不论主线程数多少,高并发状况下会建立大量线程同时去获取连接。
而 SingleThreadedAsyncObjectPool
使用了 Executors.newFixedThreadPool
,显然这意味着每次入队都会产生一个锁阻塞,在系统并发很是高的状况下,这会极大加重锁竞争,一旦得到锁线程被中断,则全部的线程都会处于并发
驱动中默认状况下,存在多个 ExecutionContext
,凭空增长了内存消耗和上下文切换框架
在实际使用过程当中,咱们经历了运行一段时间后 JVM 疯狂 FGC 的状况。
经分析发现存在连接泄漏,链接池存在大量未被回收的 MySQLConnection
对象,而且很是诡异的是咱们没法定位究竟是谁持有了这些未释放的 Connection。异步
考虑到上述问题,我开始着手设计一个全新的连接池,名字就叫 NewPool
async
这种全新链接池实现主要依赖如下设计高并发
ConcurrentLinkedQueue
保存等待列表和空闲连接,全程不存在锁Semaphore
保证链接数和队列长度不超过限制主要代码以下(部分)post
val conns: ConcurrentLinkedQueue[Future[Connection]] = ...
val queue: ConcurrentLinkedQueue[Promise[Connection]] = ...
val createSemaphore: Semaphore = ...
val queueSemaphore: Semaphore = ...
def withConnection[A](f: Connection => Future[A]): Future[A] = {
val c = acquire()
c.flatMap { cc =>
f(cc).andThen { //此处可能须要 try catch 处理不按套路抛出异常的状况
case _ => release(c)
}
}
}
private def acquire(): Future[Connection] = {
val conn = conns.poll()
if (conn != null) { //有空余连接,则返回这个连接
reconnectIfDead(conn)
} else if (createSemaphore.tryAcquire()) { //链接数少于最大连接数,建立一个
createNew()
} else if (queueSemaphore.tryAcquire()) { //队列未满,入队
val p = Promise[Connection]
enqueueTask(p)
p.future
} else { //返回队列已满
Future.failed(QueueIsFull)
}
}
private def release(c: Future[Connection]) = {
val wait = queue.poll()
if (wait == null) {
conns.offer(c)
} else {
wait.completeWith(c)
queueSemaphore.release()
}
}复制代码
Semaphore
的 trypAcquire 操做和 ConcurrentLinkedQueue
都不会产生锁,确实作到了 Lock-Free。
为了验证上述猜想,我基于 scalameter 作了简单的性能测试。结果以下
新的方案(图中蓝色线条)对很是简单的查询,仍旧有 100% 左右的性能提高
执行 SQL 以下
for {
u <- c.sendQuery(s"SELECT * FROM user WHERE id = ${id}")
r <- c.sendQuery(s"UPDATE user SET remain = remain + 100 WHERE id = ${id}")
} yield r复制代码
能够看到新方案(图中绿色线条)有很是大幅度提高