Swoole从入门到入土(5)——TCP服务器[异步任务]

不管对于B/S仍是C/S,程序再怎么变,惟一不变的是用户不想等过久的躁动心情。因此服务端对于客户的请求,能有多快就多快。若是服务端须要执行很耗时的操做,就须要异步任务处理机制,保证当前的响应速度不受影响。php

如今如下面的一个例子为引子:编程

<?php
$server = new Swoole\Server('0.0.0.0', 9501);
$server->set([
    'max_wait_time'=>60,
    'reload_async'=>true,
    'worker_num'=>1,
    'task_worker_num'=>1,
    'task_max_request'=>100
]);

//监听链接进入事件
$server->on('Connect', function ($server, $fd) {
    
});


//监听数据接收事件
$server->on('Receive', function ($server, $fd, $from_id, $data) {
    $server->task("receive to task:$data");    //将任务丢入异步任务列队
});

$server->on("task",function ($serv, $fd, $from_id, $data){
    //在这里处理任务
    $serv->finish("$data ok");    //处理完成后,将结果传给finish
});

$server->on("finish",function ($serv, $task_id, $data){
    //处理任务完成后的事情
    echo "finish $data\n";
});

//监听链接关闭事件
$server->on('Close', function ($server, $fd) {

});

//启动服务器
$server->start();

 

上面这个例子,在以前的代码上增长了两个属性(task_worker_num和task_max_request)和两个事件(onTask和onFinish)。代码中同时设置了这四项,即可以启动swoole的异步任务处理。是否是很简单?json

那么如今分别了解关于异步任务的新属性和事件:服务器

 

新配置:swoole

1) task_worker_num:配置 Task 进程的数量。【默认值:未配置则不启动 task异步

配置此参数后将会启用 task 功能。因此 Server 必需要注册 onTask、onFinish 2 个事件回调函数。若是没有注册,服务器程序将没法启动。socket

注意:async

Task 进程是同步阻塞的;函数

最大值不得超过 swoole_cpu_num() * 1000;学习

若是单个 task 的处理须要 100ms,那一个进程 1 秒就能够处理 1/0.1=10 个 task;则task 投递的速度,如每秒产生 2000 个 task,2000/10=200,就须要设置 task_worker_num => 200,启用 200 个 Task 进程;

Task 进程内不能使用 Swoole\Server->task 方法。

 

2) task_max_request:设置 task 进程的最大任务数。【默认值:0】

设置 task 进程的最大任务数。一个 task 进程在处理完超过此数值的任务后将自动退出。这个参数是为了防止 PHP 进程内存溢出。若是不但愿进程自动退出能够设置为 0。

 

3) task_tmpdir:设置 task 的数据临时目录。【默认值:Linux /tmp 目录】

在 Server 中,若是投递的数据超过 8180 字节,将启用临时文件来保存数据。这里的 task_tmpdir 就是用来设置临时文件保存的位置。

注意:

底层默认会使用 /tmp 目录存储 task 数据,若是你的 Linux 内核版本太低,/tmp 目录不是内存文件系统,能够设置为 /dev/shm/;

task_tmpdir 目录不存在,底层会尝试自动建立

 

4)task_use_object:使用面向对象风格的 Task 回调格式。【默认值:false】

设置为 true 时,onTask 回调将变成对象模式。

//面向对象风格代码示例$server = new Swoole\Server('127.0.0.1', 9501);
$server->set([
    'worker_num'      => 1,
    'task_worker_num' => 3,
    'task_use_object' => true,
]);
$server->on('receive', function (Swoole\Server $server, $fd, $tid, $data) {
    $server->task(['fd' => $fd,]);
});
$server->on('Task', function (Swoole\Server $server, Swoole\Server\Task $task) {
    //此处$task是Swoole\Server\Task对象
    $server->send($task->data['fd'], json_encode($server->stats()));
});
$server->start();

 

5) task_ipc_mode(进阶):设置 Task 进程与 Worker 进程之间通讯的方式。【默认值:1】

这是一个进阶属性,正常状况下是不用设置,使用默认值便可。要了解这个属性,请先看文末的高级话题 :什么是IPC? 

 知道了什么是IPC后,这个属性可取如下三个值:

其中:

模式1:支持定向投递,可在 task 和 taskwait 方法中使用 dst_worker_id,指定目标 Task进程。dst_worker_id 设置为 -1 时,底层会判断每一个 Task 进程的状态,向当前状态为空闲的进程投递任务。

模式二、3:消息队列模式使用操做系统提供的内存队列存储数据,未指定 mssage_queue_key 消息队列 Key,将使用私有队列,在 Server 程序终止后会删除消息队列。指定消息队列 Key 后 Server 程序终止后,消息队列中的数据不会删除,所以进程重启后仍然能取到数据。这二者的不一样之处在于,模式2 支持定向投递,$serv->task($data, $task_worker_id) 能够指定投递到哪一个 task 进程。模式3 是彻底争抢模式, task 进程会争抢队列,将没法使用定向投递,task/taskwait 将没法指定目标进程 ID,即便指定了 $task_worker_id,在模式3 下也是无效的。另外:模式3 会影响 sendMessage 方法,使 sendMessage 发送的消息会随机被某一个 task 进程获取。

 

 

新函数

函数task:投递一个异步任务到 task_worker 池中。此函数是非阻塞的,执行完毕会当即返回。Worker 进程能够继续处理新的请求。

Swoole\Server->task(mixed $data, int $dstWorkerId = -1): int

$data:要投递的任务数据,必须是可序列化的 PHP 变量。

$dstWorkerId:能够指定要给投递给哪一个 Task 进程,传入 ID 便可,范围参考 $worker_id;默认值:-1。

 

函数finish: 用于在 Task 进程中通知 Worker 进程,投递的任务已完成。此函数能够传递结果数据给 Worker 进程(即,触发worker进程的onFinish事件)。

Swoole\Server->finish(mixed $data)

$data:任务处理的结果内容

注意:·

finish 方法能够连续屡次调用,Worker 进程会屡次触发 onFinish 事件;

在 onTask 回调函数中调用过 finish 方法后,return 数据依然会触发 onFinish 事件;

Server->finish 是可选的。若是 Worker 进程不关心任务执行的结果,不须要调用此函数;

在 onTask 回调函数中 return 字符串,等同于调用 finish;

 

 

新事件

1) 事件onTask:在 task 进程内被调用。worker 进程可使用 task 函数向 task_worker 进程投递新的任务。当前的 Task 进程在调用 onTask 回调函数时会将进程状态切换为忙碌,这时将再也不接收新的 Task,当 onTask 函数返回时会将进程状态切换为空闲而后继续接收新的 Task。

function onTask(Swoole\Server $server, int $task_id, int $src_worker_id, mixed $data);

$server:Swoole\Server 对象

$task_id:执行任务的 task 进程 id【$task_id 和 $src_worker_id 组合起来才是全局惟一的,不一样的 worker 进程投递的任务 ID 可能会有相同

$src_worker_id:投递任务的 worker 进程 id

$data:任务的数据内容

注意:

若是开启了 task_enable_coroutine 则回调函数原型是:

$server->on('Task', function (Swoole\Server $server, Swoole\Server\Task $task) {
    $task->worker_id;              //来自哪一个`Worker`进程
    $task->id;                     //任务的编号
    $task->flags;                  //任务的类型,taskwait, task, taskCo, taskWaitMulti 可能使用不一样的 flags
    $task->data;                   //任务的数据
    co::sleep(0.2);                //协程 API
    $task->finish([123, 'hello']); //完成任务,结束并返回数据
});

在 onTask 函数中 return 字符串(return 的变量能够是任意非 null 的 PHP 变量),表示将此内容返回给 worker 进程。也能够经过 Swoole\Server->finish() 来触发 onFinish 函数,而无需再 return。此时worker 进程中会触发 onFinish 函数,表示投递的 task 已完成。

onTask 函数执行时遇到致命错误退出,或者被外部进程强制 kill,当前的任务会被丢弃,但不会影响其余正在排队的 Task。

 

2)事件onFinish:在 worker 进程被调用,当 worker 进程投递的任务在 task 进程中完成时被触发。

function onFinish(Swoole\Server $server, int $task_id, mixed $data)

$server:Swoole\Server 对象;

$task_id:执行任务的 task 进程 id;

$data:任务处理的结果内容。

注意:

- task 进程的 onTask 事件中没有调用 finish 方法或者 return 结果,worker 进程不会触发 onFinish。

-执行 onFinish 逻辑的 worker 进程与下发 task 任务的 worker 进程是同一个进程。

 

 

关于异步任务的注意点:

-使用消息队列通讯,若是 Task进程 处理能力低于投递速度,可能会引发 Worker 进程阻塞。

-使用消息队列通讯后 task 进程没法支持协程 (开启 task_enable_coroutine)。

 

 

 

----------- 高级话题分隔线--------------

什么是IPC

同一台主机上两个进程间通讯 (简称 IPC) 的方式有不少种,在 Swoole 下咱们使用了 2 种方式 Unix Socket 和 sysvmsg,下面分别介绍:

第一种:Unix Socket

全名 UNIX Domain Socket, 简称 UDS, 使用套接字的 API (socket,bind,listen,connect,read,write,close 等),和 TCP/IP 不一样的是不须要指定 ip 和 port,而是经过一个文件名来表示 (例如 FPM 和 Nginx 之间的 /tmp/php-fcgi.sock),UDS 是 Linux 内核实现的全内存通讯,无任何 IO 消耗。在 1 进程 write,1 进程 read,每次读写 1024 字节数据的测试中,100 万次通讯仅需 1.02 秒,并且功能很是的强大,Swoole 下默认用的就是这种 IPC 方式

Swoole 下面使用 UDS 通信有两种类型:SOCK_STREAM 和 SOCK_DGRAM,能够简单的理解为 TCP 和 UDP 的区别,当使用 SOCK_STREAM 类型的时候一样须要考虑 TCP 粘包问题。

当使用 SOCK_DGRAM 类型的时候不须要考虑粘包问题,每一个 send() 的数据都是有边界的,发送多大的数据接收的时候就收到多大的数据,没有传输过程当中的丢包、乱序问题,send 写入和 recv 读取的顺序是彻底一致的。send 返回成功后必定是能够 recv 到。
在 IPC 传输的数据比较小时很是适合用 SOCK_DGRAM 这种方式,因为 IP 包每一个最大有 64k 的限制,因此用 SOCK_DGRAM 进行 IPC 时候单次发送数据不能大于 64k,同时要注意收包速度太慢操做系统缓冲区满了会丢弃包,由于 UDP 是容许丢包的,能够适当调大缓冲区。

第二种:sysvmsg

即 Linux 提供的消息队列,这种 IPC 方式经过一个文件名来做为 key 进行通信,这种方式很是的不灵活,实际项目使用的并很少,不作过多介绍。此种 IPC 方式只有两个场景下有用:

1)防止丢数据,若是整个服务都挂掉,再次启动队列中的消息也在,能够继续消费,但一样有脏数据的问题。

2)能够外部投递数据,好比 Swoole 下的 Worker进程经过消息队列给 Task进程投递任务,第三方的进程也能够投递任务到队列里面让 Task 消费,甚至能够在命令行手动添加消息到队列。

 

 

 

---------------------------  我是可爱的分割线  ----------------------------

最后博主借地宣传一下,漳州编程小组招新了,这是一个面向漳州青少年信息学/软件设计的学习小组,有意向的同窗点击连接,联系我吧。

相关文章
相关标签/搜索