本文首发于 PHP 多任务协程处理,转载请注明出处!
上周 有幸和同事一块儿在 SilverStripe 分享最近的工做事宜。今天我计划分享 PHP 异步编程,不过因为上周我聊过 ReactPHP;我决定讨论一些不同的内容。因此本文将探讨多任务协程这方面的内容。php
另外我还计划把这个主题加入到我正在筹备的一本 PHP 异步编程的图书中。虽然这本书相比本文来讲会涉及更多细节,但我以为本文依然具备实际意义!html
那么,开始吧!
new MyIterator(react
这就是本文咱们要讨论的问题。不过咱们会从更简单更熟悉的示例开始。git
咱们能够经过简单的遍从来使用数组:github
$array = ["foo", "bar", "baz"]; foreach ($array as $key => $value) { print "item: " . $key . "|" . $value . "\n"; } for ($i = 0; $i < count($array); $i++) { print "item: " . $i . "|" . $array[$i] . "\n"; }
这是咱们平常编码所依赖的基本实现。能够经过遍历数组获取每一个元素的键名和键值。shell
固然,若是咱们但愿可以知道在什么时候可使用数组。PHP 提供了一个方便的内置函数:数据库
print is_array($array) ? "yes" : "no"; // yes
有时,咱们须要对一些数据使用相同的方式进行遍历处理,但它们并不是数组类型。好比对 DOMDocument 类进行处理:编程
$document = new DOMDocument(); $document->loadXML("<div></div>"); $elements = $document->getElementsByTagName("div"); print_r($elements); // DOMNodeList Object ( [length] => 1 )
这显然不是一个数组,可是它有一个 length 属性。咱们能像遍历数组同样,对其进行遍历么?咱们能够判断它是否实现了下面这个特殊的接口:数组
print ($elements instanceof Traversable) ? "yes" : "no"; // yes
这真的太有用了。它不会致使咱们在遍历非可遍历数据时触发错误。咱们仅需在处理前进行检测便可。promise
不过,这会引起另一个问题:咱们可否让自定义类也拥有这个功能呢?回答是确定的!第一个实现方法相似以下:
class MyTraversable implements Traversable { // 在这里编码... }
若是咱们执行这个类,咱们将看到一个错误信息:
PHP Fatal error: Class MyTraversable must implement interface Traversable as part of either Iterator or IteratorAggregate
咱们没法直接实现 Traversable,可是咱们能够尝试第二种方案:
class MyTraversable implements Iterator { // 在这里编码... }
这个接口须要咱们实现 5 个方法。让咱们完善咱们的迭代器:
class MyTraversable implements Iterator { protected $data; protected $index = 0; public function __construct($data) { $this->data = $data; } public function current() { return $this->data[$this->index]; } public function next() { return $this->data[$this->index++]; } public function key() { return $this->index; } public function rewind() { $this->index = 0; } public function valid() { return $this->index < count($this->data); } }
这边咱们须要注意几个事项:
咱们能够向下面这样运行这段代码:
$iterator = new MyTraversable(["foo", "bar", "baz"]); foreach ($iterator as $key => $value) { print "item: " . $key . "|" . $value . "\n"; }
这看起来须要处理太多工做,可是这是可以像数组同样使用 foreach/for 功能的一个简洁实现。
还记得第二个接口抛出的 Traversable 异常么?下面看一个比实现 Iterator 接口更快的实现吧:
class MyIteratorAggregate implements IteratorAggregate { protected $data; public function __construct($data) { $this->data = $data; } public function getIterator() { return new ArrayIterator($this->data); } }
这里咱们做弊了。相比于实现一个完整的 Iterator,咱们经过 ArrayIterator() 装饰。不过,这相比于经过实现完整的 Iterator 简化了很多代码。
兄弟莫急!先让咱们比较一些代码。首先,咱们在不使用生成器的状况下从文件中读取每一行数据:
$content = file_get_contents(__FILE__); $lines = explode("\n", $content); foreach ($lines as $i => $line) { print $i . ". " . $line . "\n"; }
这段代码读取文件自身,而后会打印出每行的行号和代码。那么为何咱们不使用生成器呢!
function lines($file) { $handle = fopen($file, 'r'); while (!feof($handle)) { yield trim(fgets($handle)); } fclose($handle); } foreach (lines(__FILE__) as $i => $line) { print $i . ". " . $line . "\n"; }
我知道这看起来更加复杂。不错,不过这是由于咱们没有使用 file_get_contents() 函数。一个生成器看起来就像是一个函数,可是它会在每次获取到 yield 关键词是中止运行。
生成器看起来有点像迭代器:
print_r(lines(__FILE__)); // Generator Object ( )
尽管它不是迭代器,它是一个 Generator。它的内部定义了什么方法呢?
print_r(get_class_methods(lines(__FILE__))); // Array // ( // [0] => rewind // [1] => valid // [2] => current // [3] => key // [4] => next // [5] => send // [6] => throw // [7] => __wakeup // )
若是你读取一个大文件,而后使用 memory_get_peak_usage(),你会注意到生成器的代码会使用固定的内存,不管这个文件有多大。它每次进度去一行。而是用 file_get_contents() 函数读取整个文件,会使用更大的内存。这就是在迭代处理这类事物时,生成器的能给咱们带来的优点!
能够将数据发送到生成器中。看下下面这个生成器:
<?php $generator = call_user_func(function() { yield "foo"; }); print $generator->current() . "\n"; // foo
注意这里咱们如何在 call_user_func() 函数中封装生成器函数的?这里仅仅是一个简单的函数定义,而后当即调用它获取一个新的生成器实例...
咱们已经见过 yield 的用法。咱们能够经过扩展这个生成器来接收数据:
$generator = call_user_func(function() { $input = (yield "foo"); print "inside: " . $input . "\n"; }); print $generator->current() . "\n"; $generator->send("bar");
数据经过 yield 关键字传入和返回。首先,执行 current() 代码直到遇到 yield,返回 foo。send() 将输出传入到生成器打印输入的位置。你须要习惯这种用法。
因为咱们须要同这些函数进行交互,可能但愿将异常推送到生成器中。这样这些函数就能够自行处理异常。
看看下面这个示例:
$multiply = function($x, $y) { yield $x * $y; }; print $multiply(5, 6)->current(); // 30
如今让咱们将它封装到另外一个函数中:
$calculate = function ($op, $x, $y) use ($multiply) { if ($op === 'multiply') { $generator = $multiply($x, $y); return $generator->current(); } }; print $calculate("multiply", 5, 6); // 30
这里咱们经过一个普通闭包将乘法生成器封装起来。如今让咱们验证无效参数:
$calculate = function ($op, $x, $y) use ($multiply) { if ($op === "multiply") { $generator = $multiply($x, $y); if (!is_numeric($x) || !is_numeric($y)) { throw new InvalidArgumentException(); } return $generator->current(); } }; print $calculate('multiply', 5, 'foo'); // PHP Fatal error...
若是咱们但愿可以经过生成器处理异常?咱们怎样才能将异常传入生成器呢!
$multiply = function ($x, $y) { try { yield $x * $y; } catch (InvalidArgumentException $exception) { print "ERRORS!"; } }; $calculate = function ($op, $x, $y) use ($multiply) { if ($op === "multiply") { $generator = $multiply($x, $y); if (!is_numeric($x) || !is_numeric($y)) { $generator->throw(new InvalidArgumentException()); } return $generator->current(); } }; print $calculate('multiply', 5, 'foo'); // PHP Fatal error...
棒呆了!咱们不只能够像迭代器同样使用生成器。还能够经过它们发送数据并抛出异常。它们是可中断和可恢复的函数。有些语言把这些函数叫作……
咱们可使用协程(coroutines)来构建异步代码。让咱们来建立一个简单的任务调度程序。首先咱们须要一个 Task 类:
class Task { protected $generator; public function __construct(Generator $generator) { $this->generator = $generator; } public function run() { $this->generator->next(); } public function finished() { return !$this->generator->valid(); } }
Task 是普通生成器的装饰器。咱们将生成器赋值给它的成员变量以供后续使用,而后实现一个简单的 run() 和 finished() 方法。run() 方法用于执行任务,finished() 方法用于让调度程序知道什么时候终止运行。
而后咱们须要一个 Scheduler 类:
class Scheduler { protected $queue; public function __construct() { $this->queue = new SplQueue(); } public function enqueue(Task $task) { $this->queue->enqueue($task); } pulic function run() { while (!$this->queue->isEmpty()) { $task = $this->queue->dequeue(); $task->run(); if (!$task->finished()) { $this->queue->enqueue($task); } } } }
Scheduler 用于维护一个待执行的任务队列。run() 会弹出队列中的全部任务并执行它,直到运行完整个队列任务。若是某个任务没有执行完毕,当这个任务本次运行完成后,咱们将再次入列。
SplQueue 对于这个示例来说再合适不过了。它是一种 FIFO(先进先出:fist in first out) 数据结构,可以确保每一个任务都可以获取足够的处理时间。
咱们能够像这样运行这段代码:
$scheduler = new Scheduler(); $task1 = new Task(call_user_func(function() { for ($i = 0; $i < 3; $i++) { print "task1: " . $i . "\n"; yield; } })); $task2 = new Task(call_user_func(function() { for ($i = 0; $i < 6; $i++) { print "task2: " . $i . "\n"; yield; } })); $scheduler->enqueue($task1); $scheduler->enqueue($task2); $scheduler->run();
运行时,咱们将看到以下执行结果:
task 1: 0 task 1: 1 task 2: 0 task 2: 1 task 1: 2 task 2: 2 task 2: 3 task 2: 4 task 2: 5
这几乎就是咱们想要的执行结果。不过有个问题发生在首次运行每一个任务时,它们都执行了两次。咱们能够对 Task 类稍做修改来修复这个问题:
class Task { protected $generator; protected $run = false; public function __construct(Generator $generator) { $this->generator = $generator; } public function run() { if ($this->run) { $this->generator->next(); } else { $this->generator->current(); } $this->run = true; } public function finished() { return !$this->generator->valid(); } }
咱们须要调整首次 run() 方法调用,从生成器当前有效的指针读取运行。后续调用能够从下一个指针读取运行...
有些人基于这个思路实现了一些超赞的类库。咱们来看看其中的两个...
RecoilPHP 是一套基于协程的类库,它最使人印象深入的是用于 ReactPHP 内核。能够将事件循环在 RecoilPHP 和 RecoilPHP 之间进行交换,而你的程序无需架构上的调整。
咱们来看一下 ReactPHP 异步 DNS 解决方案:
function resolve($domain, $resolver) { $resolver ->resolve($domain) ->then(function ($ip) use ($domain) { print "domain: " . $domain . "\n"; print "ip: " . $ip . "\n"; }, function ($error) { print $error . "\n"; }) } function run() { $loop = React\EventLoop\Factory::create(); $factory = new React\Dns\Resolver\Factory(); $resolver = $factory->create("8.8.8.8", $loop); resolve("silverstripe.org", $resolver); resolve("wordpress.org", $resolver); resolve("wardrobecms.com", $resolver); resolve("pagekit.com", $resolver); $loop->run(); } run();
resolve() 接收域名和 DNS 解析器,并使用 ReactPHP 执行标准的 DNS 查找。不用太过纠结与 resolve() 函数内部。重要的是这个函数不是生成器,而是一个函数!
run() 建立一个 ReactPHP 事件循环,DNS 解析器(这里是个工厂实例)解析若干域名。一样,这个也不是一个生成器。
想知道 RecoilPHP 到底有何不一样?还但愿掌握更多细节!
use Recoil\Recoil; function resolve($domain, $resolver) { try { $ip = (yield $resolver->resolve($domain)); print "domain: " . $domain . "\n"; print "ip: " . $ip . "\n"; } catch (Exception $exception) { print $exception->getMessage() . "\n"; } } function run() { $loop = (yield Recoil::eventLoop()); $factory = new React\Dns\Resolver\Factory(); $resolver = $factory->create("8.8.8.8", $loop); yield [ resolve("silverstripe.org", $resolver), resolve("wordpress.org", $resolver), resolve("wardrobecms.com", $resolver), resolve("pagekit.com", $resolver), ]; } Recoil::run("run");
经过将它集成到 ReactPHP 来完成一些使人称奇的工做。每次运行 resolve() 时,RecoilPHP 会管理由 $resoler->resolve() 返回的 promise 对象,而后将数据发送给生成器。此时咱们就像在编写同步代码同样。与咱们在其余一步模型中使用回调代码不一样,这里只有一个指令列表。
RecoilPHP 知道它应该管理一个有执行 run() 函数时返回的 yield 数组。RoceilPHP 还支持基于协程的数据库(PDO)和日志库。
IcicleIO 为了一全新的方案实现 ReactPHP 同样的目标,而仅仅使用协程功能。相比 ReactPHP 它仅包含极少的组件。可是,核心的异步流、服务器、Socket、事件循环特性一个不落。
让咱们看一个 socket 服务器示例:
use Icicle\Coroutine\Coroutine; use Icicle\Loop\Loop; use Icicle\Socket\Client\ClientInterface; use Icicle\Socket\Server\ServerInterface; use Icicle\Socket\Server\ServerFactory; $factory = new ServerFactory(); $coroutine = Coroutine::call(function (ServerInterface $server) { $clients = new SplObjectStorage(); $handler = Coroutine::async( function (ClientInterface $client) use (&$clients) { $clients->attach($client); $host = $client->getRemoteAddress(); $port = $client->getRemotePort(); $name = $host . ":" . $port; try { foreach ($clients as $stream) { if ($client !== $stream) { $stream->write($name . "connected.\n"); } } yield $client->write("Welcome " . $name . "!\n"); while ($client->isReadable()) { $data = trim(yield $client->read()); if ("/exit" === $data) { yield $client->end("Goodbye!\n"); } else { $message = $name . ":" . $data . "\n"; foreach ($clients as $stream) { if ($client !== $stream) { $stream->write($message); } } } } } catch (Exception $exception) { $client->close($exception); } finally { $clients->detach($client); foreach ($clients as $stream) { $stream->write($name . "disconnected.\n"); } } } ); while ($server->isOpen()) { $handler(yield $server->accept()); } }, $factory->create("127.0.0.1", 6000)); Loop::run();
据我所知,这段代码所作的事情以下:
打开命令行终端输入 nc localhost 6000 查看执行结果!
该示例使用 SplObjectStorage 跟踪 socket 链接。这样咱们就能够向全部 socket 发送消息。
这个话题能够包含不少内容。但愿您能看到生成器是如何建立的,以及它们如何帮助编写迭代程序和异步代码。
若是你有问题,能够随时问我。
感谢 Nikita Popov(还有它的启蒙教程 Cooperative multitasking using coroutines (in PHP!) ), Anthony Ferrara 和 Joe Watkins。这些研究工做泽被苍生,给我以写做此篇文章的灵感。关注他们吧,好么?