码云代码仓库:https://gitee.com/tanjiajun/MysqlPoolphp
代码仓库:https://github.com/asbectJ/swoole4.githtml
在写这篇文章以前,看了好几篇实现链接池的文章,都是写的很很差的。摆明忽略了链接池的不少特性,不少都不具备抗高并发和链接复用。因此本身以为有必须把最近几天,实现一个比较完整的php数据库链接池的点滴记录下来,望能帮助各位,感激者望多点赞和打赏。mysql
所谓的数据库链接池,通常指的就是程序和数据库保持必定数量的数据库链接不断开,而且各请求的链接能够相互复用,减小重复新建数据库链接的消耗和避免在高并发的状况下出现数据库max connections等错误。本身总结了一下,若是要实现一个数据库链接池,通常有几个特色:git
总结几个特性后,一个基本链接池,大体要实现下图功能:github
swoole是一个PHP实现异步网络通讯的引擎或者扩展,其中实现了不少传统PHP-fpm没有的东西,例如异步的客户端,异步Io,常驻内存,协程等等,一个个优秀的扩展,其中异步和协程等概念能应用于高并发场景。缺点是文档和入门的门槛都比较高,须要排坑。附上swoole的运行流程和进程结构图:sql
运行流程图数据库
进程/线程架构图json
首先,为了减小你们对以后运行示例代码产生没必要要的天坑,先把注意事项和场景问题放前面:数组
一、程序中使用了协程的通讯管道channel(与go的chan差很少的),其中swoole2是不支持chan->pop($timeout)中timeout超时等待的,因此必须用swoole4版本swoole
二、使用swoole协程扩展的时候,必定不能装xdebug之类的扩展,不然报错。官方说明为:https://wiki.swoole.com/wiki/page/674.html,同时参考以下了解更多关于swoole协程的使用和注意:https://wiki.swoole.com/wiki/page/749.html
三、笔者使用的环境为:PHP 7.1.18和swoole4做为这次开发的环境
首先,这次利用swoole实现链接池,运用到swoole如下技术或者概念
一、链接变量池,这里能够看作一个数组或者队列,利用swoole全局变量的常驻内存特性,只要变量没主动unset掉,数组或队列中的链接对象能够一直保持,不释放。主要参考:https://wiki.swoole.com/wiki/page/p-zend_mm.html
二、协程。协程是纯用户状态的线程,经过协做的方式而不是抢占的方式来切换。首先这次的链接池两处用到协程:
三、Coroutine/channel通道,相似于go
语言的chan
,支持多生产者协程和多消费者协程。底层自动实现了协程的切换和调度。高并发时,容易出链接池为空时,若是用通常的array或者splqueue()做为介质存储链接对象变量,不能产生阻塞等待其余请求释放的效果,也就是说只能直接返回null.。因此这里用了一个swoole4协程中很牛逼的channel经过管道做为存储介质,它的出队方法pop($timeout)能够指定阻塞等待指定时间后返回。注意,是swoole2是没有超时timeout的参数,不适用此场景。在go语言中,若是chan等待或者push了没有消费或者生产一对一的状况,是会发生死锁。因此swoole4的timeout应该是为了不无限等待为空channel状况而产生。主要参考:
https://wiki.swoole.com/wiki/page/p-coroutine_channel.html
channel切换的例子:
<?php use \Swoole\Coroutine\Channel; $chan = new Channel(); go(function () use ($chan) { echo "我是第一个协程,等待3秒内有push就执行返回" . PHP_EOL; $p = $chan->pop(2);#1 echo "pop返回结果" . PHP_EOL; var_dump($p); }); go(function () use ($chan) { co::sleep(1);#2 $chan->push(1); }); echo "main" . PHP_EOL;
#1处代码会首先执行,而后遇到pop(),由于channel仍是空,会等待2s。此时协程会让出cpu,跳到第二个协程执行,而后#2出睡眠1秒,push变量1进去channel后返回#1处继续执行,成功取车经过中刚push的值1.运行结果为:
若是把#2处的睡眠时间换成大于pop()的等待时间,结果是:
<?php /** * 链接池封装. * User: user * Date: 2018/9/1 * Time: 13:36 */ use Swoole\Coroutine\Channel; abstract class AbstractPool { private $min;//最少链接数 private $max;//最大链接数 private $count;//当前链接数 private $connections;//链接池组 protected $spareTime;//用于空闲链接回收判断 //数据库配置 protected $dbConfig = array( 'host' => '10.0.2.2', 'port' => 3306, 'user' => 'root', 'password' => 'root', 'database' => 'test', 'charset' => 'utf8', 'timeout' => 2, ); private $inited = false; protected abstract function createDb(); public function __construct() { $this->min = 10; $this->max = 100; $this->spareTime = 10 * 3600; $this->connections = new Channel($this->max + 1); } protected function createObject() { $obj = null; $db = $this->createDb(); if ($db) { $obj = [ 'last_used_time' => time(), 'db' => $db, ]; } return $obj; } /** * 初始换最小数量链接池 * @return $this|null */ public function init() { if ($this->inited) { return null; } for ($i = 0; $i < $this->min; $i++) { $obj = $this->createObject(); $this->count++; $this->connections->push($obj); } return $this; } public function getConnection($timeOut = 3) { $obj = null; if ($this->connections->isEmpty()) { if ($this->count < $this->max) {//链接数没达到最大,新建链接入池 $this->count++; $obj = $this->createObject(); } else { $obj = $this->connections->pop($timeOut);//timeout为出队的最大的等待时间 } } else { $obj = $this->connections->pop($timeOut); } return $obj; } public function free($obj) { if ($obj) { $this->connections->push($obj); } } /** * 处理空闲链接 */ public function gcSpareObject() { //大约2分钟检测一次链接 swoole_timer_tick(120000, function () { $list = []; /*echo "开始检测回收空闲连接" . $this->connections->length() . PHP_EOL;*/ if ($this->connections->length() < intval($this->max * 0.5)) { echo "请求链接数还比较多,暂不回收空闲链接\n"; }#1 while (true) { if (!$this->connections->isEmpty()) { $obj = $this->connections->pop(0.001); $last_used_time = $obj['last_used_time']; if ($this->count > $this->min && (time() - $last_used_time > $this->spareTime)) {//回收 $this->count--; } else { array_push($list, $obj); } } else { break; } } foreach ($list as $item) { $this->connections->push($item); } unset($list); }); } }
同步PDO客户端下实现
<?php /** * 数据库链接池PDO方式 * User: user * Date: 2018/9/8 * Time: 11:30 */ require "AbstractPool.php"; class MysqlPoolPdo extends AbstractPool { protected $dbConfig = array( 'host' => 'mysql:host=10.0.2.2:3306;dbname=test', 'port' => 3306, 'user' => 'root', 'password' => 'root', 'database' => 'test', 'charset' => 'utf8', 'timeout' => 2, ); public static $instance; public static function getInstance() { if (is_null(self::$instance)) { self::$instance = new MysqlPoolPdo(); } return self::$instance; } protected function createDb() { return new PDO($this->dbConfig['host'], $this->dbConfig['user'], $this->dbConfig['password']); } } $httpServer = new swoole_http_server('0.0.0.0', 9501); $httpServer->set( ['worker_num' => 1] ); $httpServer->on("WorkerStart", function () { MysqlPoolPdo::getInstance()->init(); }); $httpServer->on("request", function ($request, $response) { $db = null; $obj = MysqlPoolPdo::getInstance()->getConnection(); if (!empty($obj)) { $db = $obj ? $obj['db'] : null; } if ($db) { $db->query("select sleep(2)"); $ret = $db->query("select * from guestbook limit 1"); MysqlPoolPdo::getInstance()->free($obj); $response->end(json_encode($ret)); } }); $httpServer->start();
代码调用过程详解:
一、server启动时,调用init()方法初始化最少数量(min指定)的链接对象,放进类型为channelle的connections对象中。在init中循环调用中,依赖了createObject()返回链接对象,而createObject()
中是调用了原本实现的抽象方法,初始化返回一个PDO db链接。因此此时,链接池connections中有min个对象。
二、server监听用户请求,当接收发请求时,调用链接数的getConnection()方法从connections通道中pop()一个对象。此时若是并发了10个请求,server由于配置了1个worker,因此再pop到一个对象返回时,遇到sleep()的查询,由于用的链接对象是pdo的查询,此时的woker进程只能等待,完成后才能进入下一个请求。所以,池中的其他链接实际上是多余的,同步客户端的请求速度只能和woker的数量有关。
三、查询结束后,调用free()方法把链接对象放回connections池中。
ab -c 10 -n 10运行的结果,单个worker处理,select sleep(2) 查询睡眠2s,同步客户端方式总共运行时间为20s以上,并且mysql的链接始终维持在一条。结果以下:
<?php /** * 数据库链接池协程方式 * User: user * Date: 2018/9/8 * Time: 11:30 */ require "AbstractPool.php"; class MysqlPoolCoroutine extends AbstractPool { protected $dbConfig = array( 'host' => '10.0.2.2', 'port' => 3306, 'user' => 'root', 'password' => 'root', 'database' => 'test', 'charset' => 'utf8', 'timeout' => 10, ); public static $instance; public static function getInstance() { if (is_null(self::$instance)) { self::$instance = new MysqlPoolCoroutine(); } return self::$instance; } protected function createDb() { $db = new Swoole\Coroutine\Mysql(); $db->connect( $this->dbConfig ); return $db; } } $httpServer = new swoole_http_server('0.0.0.0', 9501); $httpServer->set( ['worker_num' => 1] ); $httpServer->on("WorkerStart", function () { //MysqlPoolCoroutine::getInstance()->init()->gcSpareObject(); MysqlPoolCoroutine::getInstance()->init(); }); $httpServer->on("request", function ($request, $response) { $db = null; $obj = MysqlPoolCoroutine::getInstance()->getConnection(); if (!empty($obj)) { $db = $obj ? $obj['db'] : null; } if ($db) { $db->query("select sleep(2)"); $ret = $db->query("select * from guestbook limit 1"); MysqlPoolCoroutine::getInstance()->free($obj); $response->end(json_encode($ret)); } }); $httpServer->start();
代码调用过程详解
一、一样的,协程客户端方式下的调用,也是实现了以前封装好的链接池类AbstractPool.php。只是createDb()的抽象方法用了swoole内置的协程客户端去实现。
二、server启动后,初始化都和同步同样。不同的在获取链接对象的时候,此时若是并发了10个请求,一样是配置了1个worker进程在处理,可是在第一请求到达,pop出池中的一个链接对象,执行到query()方法,赶上sleep阻塞时,此时,woker进程不是在等待select的完成,而是切换到另外的协程去处理下一个请求。完成后一样释放对象到池中。当中有重点解释的代码段中getConnection()中。
public function getConnection($timeOut = 3) { $obj = null; if ($this->connections->isEmpty()) { if ($this->count < $this->max) {//链接数没达到最大,新建链接入池 $this->count++; $obj = $this->createObject();#1 } else { $obj = $this->connections->pop($timeOut);#2 } } else { $obj = $this->connections->pop($timeOut);#3 } return $obj; }
当调用到getConnection()时,若是此时因为大量并发请求过多,链接池connections为空,而没达到最大链接max数量时时,代码运行到#1处,调用了createObject(),新建链接返回;但若是链接池connections为空,而到达了最大链接数max时,代码运行到了#2处,也就是$this->connections->pop($timeOut),此时会阻塞$timeOut的时间,若是期间有连接释放了,会成功获取到,而后协程返回。超时没获取到,则返回false。
三、最后说一下协程Mysql客户端一项重要配置,那就是代码里$dbConfig中timeout值的配置。这个配置是意思是最长的查询等待时间。能够看一个例子说明下:
go(function () { $start = microtime(true); $db = new Swoole\Coroutine\MySQL(); $db->connect([ 'host' => '10.0.2.2', 'port' => 3306, 'user' => 'root', 'password' => 'root', 'database' => 'test', 'timeout' => 4#1 ]); $db->query("select sleep(5)"); echo "我是第一个sleep五秒以后\n"; $ret = $db->query("select user from guestbook limit 1");#2 var_dump($ret); $use = microtime(true) - $start; echo "协程mysql输出用时:" . $use . PHP_EOL; });
#1处代码,若是timeout配了4s查询超时,而第一条查询select sleep(5)阻塞后,协程切换到下一条sql的执行,其实$db并不能执行成功,由于用一个链接,同一个协程中,其实执行是同步的,因此此时第二条查询在等待4s超时后,没获取到db的链接执行,就会执行失败。而若是第一条查询执行的时间少于这个timeout,那么会执行查询成功。猜猜上面执行用时多少?结果以下:
若是把timeout换成6s呢,结果以下:
因此要注意的是,协程的客户端内执行实际上是同步的,不要理解为异步,它只是遇到IO阻塞时能让出执行权,切换到其余协程而已,不能和异步混淆。
ab -c 10 -n 10运行的结果,单个worker处理,select sleep(2) 查询睡眠2s,协程客户端方式总共运行时间为2s多。结果以下:
数据库此时的链接数为10条(show full PROCESSLIST):
再尝试 ab -c 200 -n 1000 http://127.0.0.1:9501/,200多个并发的处理,时间是20多秒,mysql链接数达到指定的最大值100个。结果以下:
如今链接池基本实现了高并发时的链接分配和控制,可是还有一些细节要处理,例如:
对于以上,但愿各大神看到后,能提供不错的意见!