用PHP来实现异步任务一直是个难题,现有的解决方案中:PHP知名的异步框架有swoole
和Workerman
,但都是没法在web
环境中直接使用的,即使强行搭建web
环境,异步调用也是使用 多进程模式实现的。但有时真的不须要用启动服务的方式,让服务端一直等待客户端消息,况且中间还不能改动服务端代码。本文就介绍一下不使用任何框架和第三方库的状况下,在CLI
环境中如何实现 多进程以及在web
环境中的异步调用。
web
环境的异步调用经常使用的方式有两种php
socket
链接这种方式就是典型的C/S架构,须要有服务端支持。web
// 1. 建立socket套接字 $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); // 2. 进行socket链接 socket_connect($socket, '127.0.0.1', '3939'); //socket_set_nonblock($socket); // 以非阻塞模式运行,因为在客户端不实用,因此这里不考虑 // 3. 向服务端发送请求 socket_write($socket, $request, strlen($request)); // 4. 接受服务端的回应消息(忽略非阻塞的状况,若是服务端不是提供异步服务,那这一步能够省略) $recv = socket_read($socket, 2048); // 5. 关闭socket链接 socket_close($socket);
popen
打开进程管道这种方式是使用操做系统命令,由操做系统直接执行。
本文讨论的异步调用就是使用这种方式。redis
$sf = '/path/to/cli_async_task.php'; //要执行的脚本文件 $op = 'call'; //脚本文件接收的参数1 $data = base64_encode(serialize(['TestTask', 'arg1', 'arg2'])); //脚本文件接收的参数2 pclose(popen("php '$sf' --op $op --data $data &", 'r')); //打开以后接着就关闭进程管道,让该进程以守护模式运行 echo PHP_EOL.'异步任务已执行。'.PHP_EOL;
这种方式的优势就是:一步解决,当前进程不须要任何开销。
缺点也很明显:没法跟踪任务脚本的运行状态。
因此重头戏会是在执行任务的脚本文件上,下面就介绍任务处理和多进程的实现方式。json
CLI
环境的多进程任务处理注意:多进程模式仅支持Linux,不支持Windows!!数组
这里会从0开始(未使用任何框架和类库)介绍每个步骤,最后会附带一份完整的代码。缓存
在PHP中就是调用 set_exception_handler
set_error_handler
register_shutdown_function
这三个函数,而后写上自定义的处理方法。swoole
spl_autoload_register
免去每使用一个新类都要 require / include
的烦恼。PHP 建立多进程是使用 pcntl_fork
函数,该函数会 fork
一份当前进程(影分身术),因而就有了两个进程,当前进程是主进程(本体),fork
出的进程是子进程(影分身)。须要注意的是两个进程代码环境是同样的,两个进程都是执行到了 pcntl_fork
函数位置。区别就是 getmypid
得到的进程号不同,最重要的区分是当调用 pcntl_fork
函数时,子进程得到的返回值是 0
,而主进程得到的是子进程的进程号 pid
。架构
好了,当咱们知道谁是子进程后,就可让该子进程执行任务了。app
那么主进程是如何得知子进程的状态呢?
使用 pcntl_wait
。该函数有两个参数 $status
和 $options
,$status
是引用类型,用来存储子进程的状态,$options
有两个可选常量WNOHANG
| WUNTRACED
,分别表示不等待子进程结束当即返回和等待子进程结束。很明显使用WUNTRACED
会阻塞主进程。(也可使用 pcntl_waitpid
函数获取特定 pid
子进程状态)框架
在多进程中,主进程要作的就是管理每一个子进程的状态,不然子进程极可能没法退出而变成僵尸进程。
关于多进程间的消息通讯
这一块须要涉及具体的业务逻辑,因此只能简单的提一下。不考虑使用第三方好比 redis
等服务的状况下,PHP原生能够实现就是管道通讯和共享内存等方式。实现起来都比较简单,缺点就是可以使用的数据容量有限,只能用简单文本协议交换数据。
如何手动结束全部进程任务
若是多进程处理不当,极可能致使进程任务卡死,甚至占用过多系统资源,此时只能手动结束进程。
除了一个个的根据进程号来结束,还有一个快速的方法是首先在任务脚本里自定义进程名称,就是调用cli_set_process_title
函数,而后在命令行输入:ps aux|grep cli_async_worker |grep -v grep|awk '{print $2}'|xargs kill -9
(里面的 cli_async_worker
就是自定义的进程名称),这样就能够快速结束多进程任务了。
未完待续...
如下是完整的任务执行脚本代码:
可能没法直接使用,须要修改的地方有:
Task
结尾的文件)execAsyncTask('multi', [ 'test' => ['a', 'b', 'c'], 'grab' => [['url' => 'https://www.baidu.com', 'callback' => 'http://localhost']] ]);
。执行状况能够在日志文件中查看。execAsyncTask
函数参考【__使用popen
打开进程管道__】。<?php error_reporting(E_ALL ^ E_NOTICE ^ E_USER_WARNING); @ini_set('display_errors', 0); @ini_set('date.timezone', 'PRC'); chdir(__DIR__); /* 任务脚本目录 */ defined('TASK_PATH') or define('TASK_PATH', realpath(__DIR__ .'/tasks')); /* 任务日志目录 */ defined('TASK_LOGS_PATH') or define('TASK_LOGS_PATH', __DIR__ .'/tasks/logs'); if (!is_dir(TASK_LOGS_PATH)) @mkdir(TASK_LOGS_PATH, 0777, true); set_exception_handler(function($e) { $time = date('H:i:s', time()); $msg = sprintf(''. '<h3>[%s] %s (%s)</h3>'. "\n". '<pre>%s</pre>', $time, $e->getMessage(), $e->getCode(), $e->getTraceAsString() ); file_put_contents(TASK_LOGS_PATH .'/exception-'.date('Ymd').'.log', $msg.PHP_EOL, FILE_APPEND|LOCK_EX); }); set_error_handler(function($errno, $errmsg, $filename, $line) { if (!(error_reporting() & $errno)) return; ob_start(); debug_print_backtrace(); $backtrace = ob_get_contents(); ob_end_clean(); $datetime = date('Y-m-d H:i:s', time()); $msg = <<<EOF [{$errno}] 时间:{$datetime} 信息:{$errmsg} 文件:{$filename} 行号:{$line} 追踪: {$backtrace} EOF; file_put_contents(TASK_LOGS_PATH .'/error-'.date('Ymd').'.log', $msg.PHP_EOL, FILE_APPEND|LOCK_EX); }); register_shutdown_function(function() { $last_error = error_get_last(); if (in_array($last_error['type'], array(E_ERROR, E_WARNING, E_USER_ERROR))) { // } debug_log('End.', true); }); function debug_log($log, $close=false) { static $fp; if (!$fp) { $fp = fopen(TASK_LOGS_PATH .'/debug-'.date('Ym').'.log', 'a+'); } $log = '['. date('Y-m-d H:i:s') .'] [Task@'. getmypid() . '] ' . trim($log) . PHP_EOL; if (flock($fp, LOCK_EX)) { fwrite($fp, $log); fflush($fp); flock($fp, LOCK_UN); } else { // } if ($close) fclose($fp); } function call($job) { if (is_callable($job)) { $ret = call_user_func($job); } elseif (is_array($job) and is_callable(@$job[0])) { $ret = call_user_func_array($job[0], array_slice($job, 1)); } else throw new \Exception('不是可执行的任务!'); return $ret; } function grab(array $job) { /* 消息数据为json,格式 { "url":"fetch_url", //拉取的连接地址 "method":"request_method", //请求方法 "data":"post_data", //POST请求数据 "args":[], //请求附加参数 headers|user_agent|proxy|timeout "callback":"callback_url", //回调地址(统一POST带回应数据) "msg_id": "message_id" //消息ID }*/ $url = $job['url']; $headers = @$job['args']['headers'] ?: []; $_headers = ''; if (is_array($headers)) { foreach ($headers as $_k => $header) { if (!is_numeric($_k)) $header = sprintf('%s: %s', $_k, $header); $_headers .= $header . "\r\n"; } } $headers = "Connection: close\r\n" . $_headers; $opts = array( 'http' => array( 'method' => strtoupper(@$job['method'] ?: 'get'), 'content' => @$job['data'] ?: null, 'header' => $headers, 'user_agent' => @$job['args']['user_agent'] ?: 'HTTPGRAB/1.0 (compatible)', 'proxy' => @$job['args']['proxy'] ?: null, 'timeout' => intval(@$job['args']['timeout'] ?: 120), 'protocol_version' => @$job['args']['protocol_version'] ?: '1.1', 'max_redirects' => 3, 'ignore_errors' => true ) ); $ret = @file_get_contents($url, false, stream_context_create($opts)); //debug_log($url.' -->'.strlen($ret)); if ($ret and isset($job['callback'])) { $postdata = http_build_query(array( 'msg_id' => @$job['msg_id'] ?: 0, 'url' => @$job['url'], 'result' => $ret )); $opts = array( 'http' => array( 'method' => 'POST', 'header' => 'Content-type:application/x-www-form-urlencoded'. "\r\n", 'content' => $postdata, 'timeout' => 30 ) ); file_get_contents($job['callback'], false, stream_context_create($opts)); //debug_log(json_encode(@$http_response_header)); //debug_log($job['callback'].' -->'.$ret2); } return $ret; } function clean($tmpdirs, $expires=3600*24*7) { $ret = []; foreach ((array)$tmpdirs as $tmpdir) { $ret[$tmpdir] = 0; foreach (glob($tmpdir.DIRECTORY_SEPARATOR.'*') as $_file) { if (fileatime($_file) < (time()-$expires)) { if (@unlink($_file)) $ret[$tmpdir]++; } } } return $ret; } function backup($file, $dest) { $zip = new \ZipArchive(); if (!$zip->open($file, \ZipArchive::CREATE)) { return false; } _backup_dir($zip, $dest); $zip->close(); return $file; } function _backup_dir($zip, $dest, $sub='') { $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $dir = opendir($dest); if (!$dir) return false; while (false !== ($file = readdir($dir))) { if (is_file($dest . $file)) { $zip->addFile($dest . $file, $sub . $file); } else { if ($file != '.' and $file != '..' and is_dir($dest . $file)) { //$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR); _backup_dir($zip, $dest . $file, $file); } } } closedir($dir); return true; } function execute_task($op, $data) { debug_log('Start...'); $t1 = microtime(true); switch($op) { case 'call': //执行任务脚本类 $cmd = $data; if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd; elseif (is_array($cmd)) { if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0]; } $ret = call($cmd); break; case 'grab': //抓取网页 if (is_string($data)) $data = ['url' => $data]; if (is_array($data)) $ret = grab($data); else throw new \Exception('无效的命令参数!'); break; case 'clean': //清理缓存文件夹:dirs 须要清理的文件夹列表,expires 过时时间(秒,默认7天) if (isset($data['dirs'])) { $ret = clean($data['dirs'], @$data['expires']); } else { $ret = clean($data); } break; case 'backup': //备份文件:zip 备份到哪一个zip文件,dest 须要备份的文件夹 if (isset($data['zip']) and is_dir($data['dest'])) $ret = backup($data['zip'], $data['dest']); else throw new \Exception('没有指定须要备份的文件!'); break; case 'require': //加载脚本文件 if (is_file($data)) $ret = require($data); else throw new \Exception('不是可请求的文件!'); break; case 'test': sleep(rand(1, 5)); $ret = ucfirst(strval($data)). '.PID:'. getmypid(); break; case 'multi': //多进程处理模式 $results = $childs = []; $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . 'pipe.'. posix_getpid(); if (!file_exists($fifo)) { if (!posix_mkfifo($fifo, 0666)) { //开启进程数据通讯管道 throw new Exception('make pipe failed!'); } } //$shmid = shmop_open(ftok(__FILE__, 'h'), 'c', 0644, 4096); //共享内存 //shmop_write($shmid, serialize([]), 0); //$data = unserialize(shmop_read($shmid, 0, 4096)); //shmop_delete($shmid); //shmop_close($shmid); foreach($data as $_op => $_datas) { $_datas = (array)$_datas; //data 格式为数组表示一个 op 有多个执行数据 foreach($_datas as $_data) { $pid = pcntl_fork(); if ($pid == 0) { //子进程中执行任务 $_ret = execute_task($_op, $_data); $_pid = getmypid(); $pipe = fopen($fifo, 'w'); //写 //stream_set_blocking($pipe, false); $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => $_ret]); if (strlen($_ret) > 4096) //写入管道的数据最大4K $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => '[RESPONSE_TOO_LONG]']); //debug_log('write pipe: '.$_ret); fwrite($pipe, $_ret.PHP_EOL); fflush($pipe); fclose($pipe); exit(0); //退出子进程 } elseif ($pid > 0) { //主进程中记录任务 $childs[] = $pid; $results[$pid] = 0; debug_log('fork by child: '.$pid); //pcntl_wait($status, WNOHANG); } elseif ($pid == -1) { throw new Exception('could not fork at '. getmygid()); } } } $pipe = fopen($fifo, 'r+'); //读 stream_set_blocking($pipe, true); //阻塞模式,PID与读取的管道数据可能会不一致。 $n = 0; while(count($childs) > 0) { foreach($childs as $i => $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); if (-1 == $res || $res > 0) { $_ret = @unserialize(fgets($pipe)); //读取管道数据 $results[$pid] = $_ret; unset($childs[$i]); debug_log('read child: '.$pid . ' - ' . json_encode($_ret, 64|256)); } if ($n > 1000) posix_kill($pid, SIGTERM); //超时(10分钟)结束子进程 } usleep(200000); $n++; } debug_log('child process completed.'); @fclose($pipe); @unlink($fifo); $ret = json_encode($results, 64|256); break; default: throw new \Exception('没有可执行的任务!'); break; } $t2 = microtime(true); $times = round(($t2 - $t1) * 1000, 2); $log = sprintf('[%s] %s --> (%s) %sms', strtoupper($op), @json_encode($data, 64|256), @strlen($ret)<65?$ret:@strlen($ret), $times); debug_log($log); return $ret; } // 读取 CLI 命令行参数 $params = getopt('', array('op:', 'data:')); $op = $params['op']; $data = unserialize(base64_decode($params['data'])); // 开始执行任务 execute_task($op, $data); function __autoload($classname) { $parts = explode('\\', ltrim($classname, '\\')); if (false !== strpos(end($parts), '_')) { array_splice($parts, -1, 1, explode('_', current($parts))); } $filename = implode(DIRECTORY_SEPARATOR, $parts) . '.php'; if ($filename = stream_resolve_include_path($filename)) { include $filename; } else if (preg_match('/.*Task$/', $classname)) { //查找以Task结尾的任务脚本类 include TASK_PATH . DIRECTORY_SEPARATOR . $classname . '.php'; } else { return false; } }