swoole实现Timer定时器、心跳检测及Task进阶实例:mysql链接池

Table of Contents

环境说明: 系统:Ubuntu14.04 (安装教程包括CentOS6.5)
PHP版本:PHP-5.5.10
swoole版本:1.7.7-stablephp

1.Timer定时器

在实际应用中,每每会遇到须要每隔一段时间重复作一件事,好比心跳检测、订阅消息、数据库备份等工做。一般,咱们会借助PHP的time()以及相关函数本身实现一个定时器,或者使用crontab工具来实现。可是,自定义的定时器容易出错,而使用crontab则须要编写额外的脚本文件,不管是迁移仍是调试都比较麻烦。
所以,Swoole提供了一个内置的Timer定时器功能,经过函数addtimer便可在Swoole中添加一个定时器,该定时器会在创建以后,按照预先设定好的时间间隔,每到对应的时间就会调用一次回调函数onTimer通知Server。
简单示例以下:mysql

$this->serv->on('Timer', array($this, 'onTimer'));

    public function onWorkerStart( $serv , $worker_id) {
        // 在Worker进程开启时绑定定时器
        // 只有当worker_id为0时才添加定时器,避免重复添加
        if( $worker_id == 0 ) {
            $serv->addtimer(500);
            $serv->addtimer(1000);
            $serv->addtimer(1500);
        }
    }

    public function onTimer($serv, $interval) {
        switch( $interval ) {
            case 500: { // 
                echo "Do Thing A at interval 500\n";
                break;
            }
            case 1000:{
                echo "Do Thing B at interval 1000\n";
                break;
            }
            case 1500:{
                echo "Do Thing C at interval 1500\n";
                break;
            }
        }
    }

能够看到,在onWorkerStart回调函数中,经过addtimer添加了三个定时器,时间间隔分别为500、1000、1500。而在onTimer回调中,正好经过间隔的不一样来区分不一样的定时器回调,从而执行不一样的操做。
须要注意的是,在上述示例中,当1000ms的定时器被触发时,500ms的定时器一样会被触发,可是不能保证会在1000ms定时器前触发仍是后触发,所以须要注意,定时器中的操做不能依赖其余定时器的执行结果。git

点此查看完整示例github

(PS:在Swoole-1.7.7版本,新提供了一个after函数, 这个功能的用法会在之后的教程中给出。)sql

2.心跳检测

上文提到过,使用Timer定时器功能能够实现发送心跳包的功能。事实上,Swoole已经内置了心跳检测功能,能自动close掉长时间没有数据来往的链接。而开启心跳检测功能,只须要设置heartbeat_check_intervalheartbeat_idle_time便可。以下:数据库

$this->serv->set(
    array(
        'heartbeat_check_interval' => 60,
        'heartbeat_idle_time' => 600,
    )
);

其中heartbeat_idle_time的默认值是heartbeat_check_interval的两倍。 在设置这两个选项后,swoole会在内部启动一个线程,每隔heartbeat_check_interval秒后遍历一次所有链接,检查最近一次发送数据的时间和当前时间的差,若是这个差值大于heartbeat_idle_time,则会强制关闭这个链接,并经过回调onClose通知Server进程。 点此查看完整示例 小技巧: 结合以前的Timer功能,若是咱们想维持链接,就设置一个略小于若是这个差值大于heartbeat_idle_time的定时器,在定时器内向全部链接发送一个心跳包。若是收到心跳回应,则判断链接正常,若是没有收到,则关闭这个链接或者再次尝试发送。json

3.Task进阶:MySQL链接池

上一章中我简单讲解了如何开启和使用Task功能。这一节,我将提供一个Task的高级用法。swoole

在PHP中,访问MySQL数据库每每是性能提高的瓶颈。而MySQL链接池我想你们都不陌生,这是一个很好的提高数据库访问性能的方式。传统的MySQL链接池,是预先申请必定数量的链接,每个新的请求都会占用其中一个链接,请求结束后再将链接放回池中,若是全部链接都被占用,新来的链接则会进入等待状态。
知道了MySQL链接池的实现原理,那咱们来看如何使用Swoole实现一个链接池。
首先,Swoole容许开启必定量的Task Worker进程,咱们可让每一个进程都拥有一个MySQL链接,并保持这个链接,这样,咱们就建立了一个链接池。
其次,设置swoole的dispatch_mode为抢占模式(主进程会根据Worker的忙闲状态选择投递,只会投递给处于闲置状态的Worker)。这样,每一个task都会被投递给闲置的Task Worker。这样,咱们保证了每一个新的task都会被闲置的Task Worker处理,若是所有Task Worker都被占用,则会进入等待队列。并发

下面直接上关键代码:框架

public function onWorkerStart( $serv , $worker_id) {
    echo "onWorkerStart\n";
    // 断定是否为Task Worker进程
    if( $worker_id >= $serv->setting['worker_num'] ) {
        $this->pdo = new PDO(
            "mysql:host=localhost;port=3306;dbname=Test", 
            "root", 
            "123456", 
            array(
                PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES 'UTF8';",
                PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
                PDO::ATTR_PERSISTENT => true
            )
        );
    }
}

首先,在每一个Task Worker进程中,建立一个MySQL链接。这里我选用了PDO扩展。

public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
    $sql = array(
        'sql'=>'select * from Test where pid > ?',
        'param' => array(
            0
        ),
        'fd' => $fd
    );
    $serv->task( json_encode($sql) );
}

其次,在须要的时候,经过task函数投递一个任务(也就是发起一次SQL请求)

public function onTask($serv,$task_id,$from_id, $data) {
    $sql = json_decode( $data , true );

    $statement = $this->pdo->prepare($sql['sql']);
    $statement->execute($sql['param']);     

    $result = $statement->fetchAll(PDO::FETCH_ASSOC);
    $serv->send( $sql['fd'],json_encode($result));
    return true;
}

最后,在onTask回调中,根据请求过来的SQL语句以及相应的参数,发起一次MySQL请求,并将获取到的结果经过send发送给客户端(或者经过return返回给Worker进程)。并且,这样的一次MySQL请求还不会阻塞Worker进程,Worker进程能够继续处理其余的逻辑。

能够看到,简单十几行代码,就实现了一个高效的异步MySQL链接池。
经过测试,单个客户端一共发起1W次select请求,共耗时9s;
1W次insert请求,共耗时21s。
(客户端会在每次收到前一个请求的结果后才会发起下一次请求,而不是并发)。

点此查看完整服务端代码
点此查看完整客户端代码

4.Task实战:yii中应用task

在YII框架中结合了swoole 的task 作了异步处理。 本例中 主要用到 一、protected/commands/ServerCommand.php 用来作server。 二、protected/event/下的文件 这里是在异步中的具体实现。

客户端调用参照 TestController

<?php
class TestController extends Controller{
    public function actionTT(){
        $message['uid'] = 2;
        $message['email'] = '83212019@qq.com';
        $message['title'] = '接口报警邮件';
        $message['contents'] = "'EmailEvent'接口请求过程出错! 错误信息以下:err_no:'00000' err_msg:'测试队列' 请求参数为:'[]'";
        $message['type'] = 2;

        $data['param'] = $message;
        $data['class'] = 'Email';
        $client = new EventClient();
        $data = $client->send($data);
    }
}
?>

有个task表是用来记录异步任务的。若是失败重试3次。sql在protected/data/sql.sql里。
点此查看完整客户端代码

下章预告:Swoole多端口监听、热重启以及Timer进阶:简单crontab

相关文章
相关标签/搜索