MySQL 异步驱动浅析 (三):链接池改进方案

本文由 Jilen 发表在 ScalaCool 团队博客。mysql

上一篇文章分析了 Mysql 异步驱动的一些缺点,大部分已经在咱们内部版本中修复了。git

其中分区设计的连接池在实际使用过程当中会产生一些很是严重的问题。github

链接池中的锁阻塞

Mysql Async Pool
Mysql Async Pool

前文中曾经提到 SingleThreadedAsyncObjectPool 这个单线程的链接池实现并非彻底非阻塞的,再多个线程请求连接状况下仍旧会产生锁阻塞。
同时文章中也提到 Play!Framework 这样的框架主线程数能够很是少,因此不用过度担心。sql

事实证实这是错误的,由于 PartitionedAsyncObjectPool 默认使用了 Executors.newCachedThreadPool, 这就致使不论主线程数多少,高并发状况下会建立大量线程同时去获取连接。
SingleThreadedAsyncObjectPool 使用了 Executors.newFixedThreadPool,显然这意味着每次入队都会产生一个锁阻塞,在系统并发很是高的状况下,这会极大加重锁竞争,一旦得到锁线程被中断,则全部的线程都会处于并发

频繁的线程切换

驱动中默认状况下,存在多个 ExecutionContext,凭空增长了内存消耗和上下文切换框架

难以定位的内存泄漏

在实际使用过程当中,咱们经历了运行一段时间后 JVM 疯狂 FGC 的状况。
经分析发现存在连接泄漏,链接池存在大量未被回收的 MySQLConnection 对象,而且很是诡异的是咱们没法定位究竟是谁持有了这些未释放的 Connection。异步

考虑到上述问题,我开始着手设计一个全新的连接池,名字就叫 NewPoolasync

设计一个彻底无锁无阻塞的链接池

这种全新链接池实现主要依赖如下设计高并发

  • 使用两个 ConcurrentLinkedQueue 保存等待列表和空闲连接,全程不存在锁
  • 使用 Semaphore 保证链接数和队列长度不超过限制

主要代码以下(部分)post

val conns: ConcurrentLinkedQueue[Future[Connection]] = ...
val queue: ConcurrentLinkedQueue[Promise[Connection]] = ...
val createSemaphore: Semaphore = ...
val queueSemaphore: Semaphore = ...

def withConnection[A](f: Connection => Future[A]): Future[A] = {
    val c = acquire()
    c.flatMap { cc =>
      f(cc).andThen { //此处可能须要 try catch 处理不按套路抛出异常的状况
        case _ => release(c)
      }
    }
  }

  private def acquire(): Future[Connection] = {
    val conn = conns.poll()
    if (conn != null) { //有空余连接,则返回这个连接
      reconnectIfDead(conn)
    } else if (createSemaphore.tryAcquire()) { //链接数少于最大连接数,建立一个
      createNew()
    } else if (queueSemaphore.tryAcquire()) { //队列未满,入队
      val p = Promise[Connection]
      enqueueTask(p)
      p.future
    } else { //返回队列已满
      Future.failed(QueueIsFull)
    }
  }


  private def release(c: Future[Connection]) = {
    val wait = queue.poll()
    if (wait == null) {
      conns.offer(c)
    } else {
      wait.completeWith(c)
      queueSemaphore.release()
    }
  }复制代码

Semaphore 的 trypAcquire 操做和 ConcurrentLinkedQueue 都不会产生锁,确实作到了 Lock-Free。

性能测试

为了验证上述猜想,我基于 scalameter 作了简单的性能测试。结果以下

简单查询(SELECT 1)

新的方案(图中蓝色线条)对很是简单的查询,仍旧有 100% 左右的性能提高

select performance
select performance

简单事务(SELECT + UPDATE)

执行 SQL 以下

for {
  u <- c.sendQuery(s"SELECT * FROM user WHERE id = ${id}")
  r <- c.sendQuery(s"UPDATE user SET remain = remain + 100 WHERE id = ${id}")
} yield r复制代码

能够看到新方案(图中绿色线条)有很是大幅度提高

transaction performance
transaction performance
相关文章
相关标签/搜索