php在异步编程上的短板是周所周知的,为了保持语言的简单、处理逻辑的清晰,php使用了进程阻塞模型。尽管异步难以实现,需求中仍是常常会用到异步任务处理机制,好比比较耗时的邮件发送,订单生成;还有一些须要延时处理的任务;为了加快响应速度,将主干逻辑与其余不相干逻辑解耦等等。Laravel/Lumen框架已经实现了异步机制,咱们结合源码学习一下Lumen是怎样实现异步任务处理的。笔者简单总结了一下lumen实现异步队列任务的架构图:php
Lumen实现异步任务是在两个进程中进行的,一个是产生任务的生产者,另一个是单独处理任务的消费者。一般,生产者通常是咱们处理业务逻辑所在的fast-cgi进程,它将任务封装成payload,push到队列,消费者则是另外单独编写的守护进程,不停的取出队列中的任务,解析payload,消费执行。队列是Lumen实现异步处理不可缺乏的中间媒介,Lumen自己就支持Redis/Sqs/Database/Beanstalkd多种队列中间件,其中Redis使用最普遍,咱们就以Redis为例,学习Lumen使用Redis的zset、list数据结构实现失败任务重试和延时任务处理。不论是生产者仍是消费者,都使用了Lumen框架容器所提供的众多服务:任务的分发处理(BusServiceProvider)、事件的订阅/发布(EventServiceProvider)、任务队列化的实现(QueueServiceProvider)等等。mysql
咱们将结合架构图从Lumen框架的队列服务注册与启动、Bus服务注册与启动、 生产者分发任务到队列、守护进程消费任务四个阶段来解读源码,帮助读者清晰了解Lumen框架实现异步队列任务每一个阶段的工做原理。laravel
Lumen框架服务容器启动之后,经过服务提供者向容器中注册服务(服务提供者继承ServiceProvider抽象类,须要自行实现register方法)。队列的服务提供者是QueueServiceProvider类(vendor/illuminate/queue/QueueServiceProvider.php),它注册了队列用到的不少服务:git
class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
{
/**
* Register the service provider.
*
* @return void
*/
public function register()
{
$this->registerManager();
$this->registerConnection();
$this->registerWorker();
$this->registerListener();
$this->registerFailedJobServices();
$this->registerOpisSecurityKey();
}
......
}
复制代码
其中registerManager注册了队列管理的门面(Facade),QueueManager类底层使用了队列的链接,其中能够注册['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs']任意的队列中间件链接类,咱们以Redis为例子:github
protected function registerManager()
{
$this->app->singleton('queue', function ($app) {
return tap(new QueueManager($app), function ($manager) {
$this->registerConnectors($manager);
});
});
}
......
public function registerConnectors($manager)
{
foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) {
$this->{"register{$connector}Connector"}($manager);
}
}
......
protected function registerRedisConnector($manager)
{
$manager->addConnector('redis', function () {
return new RedisConnector($this->app['redis']);
});
}
复制代码
QueueManager 是队列服务的总门面,提供一切与队列相关的操做接口(可使用Queue:: + 方法名来调用队列的方法)。QueueManager 中有一个成员变量 $connectors,存储着各类驱动的 connector,例如 RedisConnector、SqsConnector、DatabaseConnector、BeanstalkdConnector。 registerConnection 底层队列链接服务:redis
protected function registerConnection()
{
$this->app->singleton('queue.connection', function ($app) {
return $app['queue']->connection();
});
}
复制代码
队列链接的时候会读取默认的配置信息,咱们查看QueueManager($app['queue']就是从容器中取出服务)类(/vendor/illuminate/queue/QueueManager.php)中的相关代码:sql
public function connection($name = null)
{
$name = $name ?: $this->getDefaultDriver();
if (! isset($this->connections[$name])) {
$this->connections[$name] = $this->resolve($name);
$this->connections[$name]->setContainer($this->app);
}
return $this->connections[$name];
}
...
protected function resolve($name)
{
$config = $this->getConfig($name);
return $this->getConnector($config['driver'])
->connect($config)
->setConnectionName($name);
}
...
protected function getConnector($driver)
{
if (! isset($this->connectors[$driver])) {
throw new InvalidArgumentException("No connector for [$driver]");
}
return call_user_func($this->connectors[$driver]);
}
...
protected function getConfig($name)
{
if (! is_null($name) && $name !== 'null') {
return $this->app['config']["queue.connections.{$name}"];
}
return ['driver' => 'null'];
}
...
public function getDefaultDriver()
{
return $this->app['config']['queue.default'];
}
复制代码
由此咱们能够,队列首先经过getDefaultDriver方法得到驱动的链接并将其保存到驱动链接池数组中,生产者使用队列的使用能够根据驱动名称选择不一样的链接服务,例如使用sqs链接:编程
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
复制代码
咱们使用的队列驱动是Redis,config/queue.php要作相关配置:json
<?php
return [
'default' => env('QUEUE_DRIVER', 'redis'),
'connections' => [
......
'redis' => [
'driver' => 'redis',
'connection' => env('QUEUE_REDIS_CONNECTION', 'queue'),
'queue' => 'default',
'retry_after' => 60,
]
],
//失败的队列任务先不配置到队列中
/*'failed' => [
'database' => env('DB_CONNECTION', 'mysql'),
'table' => env('QUEUE_FAILED_TABLE', 'failed_jobs'),
],*/
];
复制代码
registerWorker注册消费者服务,程序会返回Illuminate\Queue\Worker类,咱们在第四部分讲解消费者的时候会详细了解它。下边的registerListener、registerFailedJobServices、registerOpisSecurityKey请你们自行阅读,其中registerListener使用到了订阅/发布模式,使用的是Lumen框架的事件Event系统,又是一个比较大的板块,还比较重要,和生产者/消费者相似,能够为队列注册不一样的监听者,当队列执行到这个状态时,就会通知监听者,例如能够在AppServiceProvider(/app/Providers/AppServiceProvider.php)启动时注册队列监听者:api
class AppServiceProvider extends ServiceProvider
{
public function boot()
{
//任务运行前
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
//任务运行后
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
//任务循环前
Queue::looping(function () {
});
//任务失败后
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
复制代码
这样就能够在任务执行的各个阶段对任务进行监控了;项目中注册监听者颇有用,举一个例子,咱们想要记录项目api业务逻辑中对DB的全部sql语句并搜集慢查询相关的信息,一样能够在AppServiceProvider中使用:
\DB::listen(function ($query) {
$sql = str_replace("?", "'%s'", $query->sql);
$sql = vsprintf($sql, $query->bindings) . " | {$query->time}";
Log::channel('sql-daily')->info($sql);
if ($query->time > 100) {
Log::warning('SLOOOOOW-SQL: ' . $sql);
}
});
复制代码
registerFailedJobServices在异步任务处理中也是颇有必要的,咱们经常也会将重试以后失败的任务保存到DB中,方便未来定位问题或再次重试。
Bus服务在Lumen系统中就是任务分发总线,就像公共汽车把乘客载到不一样的目的地,dispatch函数就是Bus服务。咱们来看一下BusServiceProvider(/vendor/illuminate/bus/BusServiceProvider.php)的注册函数:
class BusServiceProvider extends ServiceProvider implements DeferrableProvider
{
public function register()
{
$this->app->singleton(Dispatcher::class, function ($app) {
return new Dispatcher($app, function ($connection = null) use ($app) {
return $app[QueueFactoryContract::class]->connection($connection);
});
});
$this->app->alias(
Dispatcher::class, DispatcherContract::class
);
$this->app->alias(
Dispatcher::class, QueueingDispatcherContract::class
);
}
复制代码
由此可知Bus服务就是Dispatcher类实现的,咱们结合Dispatcher类看一下生产者如何将任务给push到队列上的。
咱们在项目逻辑中每每是这样分发任务到队列的:
$job = (new ExampleJob($joblist));
dispatch($job);
复制代码
跟进dispatch是在helper.php中定义的,其中dispatch函数传入的是一个任务实例(这很重要):
if (! function_exists('dispatch')) {
function dispatch($job)
{
return new PendingDispatch($job);
}
}
复制代码
咱们继续跟进PendingDispatch类实例:
class PendingDispatch
{
protected $job;
public function __construct($job)
{
$this->job = $job;
}
...
public function __destruct()
{
app(Dispatcher::class)->dispatch($this->job);
}
复制代码
在析构函数中咱们得知,程序从Lumen服务容器中解析了Dispatcher类,调用了它的dispatch处理任务。咱们接下来看Dispatcher类(/vendor/illuminate/bus/Dispatcher.php)是如何实现的:
class Dispatcher implements QueueingDispatcher
{
......
public function __construct(Container $container, Closure $queueResolver = null)
{
$this->container = $container;
$this->queueResolver = $queueResolver;
$this->pipeline = new Pipeline($container);
}
public function dispatch($command)
{
if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
return $this->dispatchToQueue($command);
}
return $this->dispatchNow($command);
}
public function dispatchNow($command, $handler = null)
{
if ($handler || $handler = $this->getCommandHandler($command)) {
$callback = function ($command) use ($handler) {
return $handler->handle($command);
};
} else {
$callback = function ($command) {
return $this->container->call([$command, 'handle']);
};
}
return $this->pipeline->send($command)->through($this->pipes)->then($callback);
}
...
protected function commandShouldBeQueued($command)
{
return $command instanceof ShouldQueue;
}
复制代码
这里的$command就是上边提到的job实例类,程序经过判断job有没有继承实现ShouldQueue接口,若是没有实现,则直接经过dispatchNow函数,经过Pipeline的send/Through/then来同步处理相关任务。咱们主要来看将任务推送到队列的状况:
public function dispatchToQueue($command)
{
$connection = $command->connection ?? null;
$queue = call_user_func($this->queueResolver, $connection);
if (! $queue instanceof Queue) {
throw new RuntimeException('Queue resolver did not return a Queue implementation.');
}
if (method_exists($command, 'queue')) {
return $command->queue($queue, $command);
}
return $this->pushCommandToQueue($queue, $command);
}
protected function pushCommandToQueue($queue, $command)
{
if (isset($command->queue, $command->delay)) {
return $queue->laterOn($command->queue, $command->delay, $command);
}
if (isset($command->queue)) {
return $queue->pushOn($command->queue, $command);
}
if (isset($command->delay)) {
return $queue->later($command->delay, $command);
}
return $queue->push($command);
}
复制代码
dispatchToQueue方法会首先判断 command->queue是查看程序是否设置了将任务推送到指定队列,$command->delay是查看程序是否将任务设置为超时任务;不一样的设置选项会调用队列驱动的不一样方法,走的是不一样的逻辑。咱们以Redis为例,代码中的queue就是RedisQueue(/vendor/illuminate/queue/RedisQueue.php),咱们来进一步查看它的处理逻辑:
class RedisQueue extends Queue implements QueueContract
{
......
public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60, $blockFor = null)
{
$this->redis = $redis;
$this->default = $default;
$this->blockFor = $blockFor;
$this->connection = $connection;
$this->retryAfter = $retryAfter;
}
......
public function later($delay, $job, $data = '', $queue = null)
{
return $this->laterRaw($delay, $this->createPayload($job, $this->getQueue($queue), $data), $queue);
}
......
protected function laterRaw($delay, $payload, $queue = null)
{
$this->getConnection()->zadd(
$this->getQueue($queue).':delayed', $this->availableAt($delay), $payload
);
return json_decode($payload, true)['id'] ?? null;
}
复制代码
咱们这里看到的是延时队列的later方法,调用的是laterRaw方法,在传入参数的时候调用createPayload方法将job给封装成payload,这个过程很重要,由于消费者也是经过获取解析payload实现任务消费的,咱们来看一下封装payload的过程:
protected function createPayload($job, $queue, $data = '')
{
$payload = json_encode($this->createPayloadArray($job, $queue, $data));
if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidPayloadException(
'Unable to JSON encode payload. Error code: '.json_last_error()
);
}
return $payload;
}
protected function createPayloadArray($job, $queue, $data = '')
{
return is_object($job)
? $this->createObjectPayload($job, $queue)
: $this->createStringPayload($job, $queue, $data);
}
protected function createObjectPayload($job, $queue)
{
$payload = $this->withCreatePayloadHooks($queue, [
'displayName' => $this->getDisplayName($job),
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries' => $job->tries ?? null,
'delay' => $this->getJobRetryDelay($job),
'timeout' => $job->timeout ?? null,
'timeoutAt' => $this->getJobExpiration($job),
'data' => [
'commandName' => $job,
'command' => $job,
],
]);
return array_merge($payload, [
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
]);
}
复制代码
能够看到,封装的payload信息中包含有不少信息,其中重试次数的控制maxTries、超时的设置timeout都在payload数组中设置,另外payload中的data还将任务的名称和序列化好的任务类serialize(clone $job)一同封装了进去。
另外咱们知道laterRaw中将延时任务经过zadd默认添加到了queue:delayed的zset中去了,其中的score添加的是 delay),咱们查看其实现:
protected function availableAt($delay = 0)
{
$delay = $this->parseDateInterval($delay);
return $delay instanceof DateTimeInterface
? $delay->getTimestamp()
: Carbon::now()->addRealSeconds($delay)->getTimestamp();
}
复制代码
发现score设置的正是任务执行时间的时间戳,设置可谓真是巧妙,消费者经过判断queue:delayed中大于当前时间的任务进行执行就能够实现延时任务的执行了;这种时间滑动窗口的设置在应用开发中很是常见。
咱们再来看非延时任务的执行就相对简单了不少(/vendor/illuminate/queue/RedisQueue.php):
......
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
}
......
public function pushRaw($payload, $queue = null, array $options = [])
{
$this->getConnection()->eval(
LuaScripts::push(), 2, $this->getQueue($queue),
$this->getQueue($queue).':notify', $payload
);
return json_decode($payload, true)['id'] ?? null;
}
复制代码
redis使用lua脚本,经过rpush将任务默认推送到了queue:default队列
public static function push()
{
return <<<'LUA'
-- Push the job onto the queue...
redis.call('rpush', KEYS[1], ARGV[1])
-- Push a notification onto the "notify" queue...
redis.call('rpush', KEYS[2], 1)
LUA;
}
复制代码
使用lua脚本是为了保证redis操做命令的原子性,尤为在分布式任务中,不少服务去争抢任务的时候都须要使用lua脚本,在消费者中咱们还会看到lua脚本的使用,并且操做比这里还要复杂不少。
Lumen包含一个队列处理器,当新任务被推到队列中时它能处理这些任务。你能够经过 queue:work 命令来运行处理器。生产环境中咱们经常使用supervisor来管理这些消费任务,咱们将他们称为守护进程消费者。咱们首先来看看消费者可使用怎样的方式来启动吧:
//处理给定链接的队列
php artisan queue:work redis --queue=emails
//仅对队列中的单一任务处理
php artisan queue:work --once
//若是一个任务失败了,会被放入延时队列中取,--delay 选项能够设置失败任务的延时时间
php artisan queue:work --delay=2
//若是想要限制一个任务的内存,可使用 --memory
php artisan queue:work --memory=128
//能够指定 Lumen 队列处理器最多执行多长时间后就应该被关闭掉
php artisan queue:work --timeout=60
//能够指定 Lumen 队列处理器失败任务重试的次数
php artisan queue:work --tries=60
复制代码
咱们使用cli程序启动消费者的时候,命令行模式会调用 Illuminate\Queue\Console\WorkCommand,这个类在初始化的时候依赖注入了 Illuminate\Queue\Worker:
class WorkCommand extends Command
{
protected $signature = 'queue:work {connection? : The name of the queue connection to work} {--queue= : The names of the queues to work} {--daemon : Run the worker in daemon mode (Deprecated)} {--once : Only process the next job on the queue} {--stop-when-empty : Stop when the queue is empty} {--delay=0 : The number of seconds to delay failed jobs} {--force : Force the worker to run even in maintenance mode} {--memory=128 : The memory limit in megabytes} {--sleep=3 : Number of seconds to sleep when no job is available} {--timeout=60 : The number of seconds a child process can run} {--tries=0 : Number of times to attempt a job before logging it failed}';
protected $description = 'Start processing jobs on the queue as a daemon';
protected $worker;
public function __construct(Worker $worker)
{
parent::__construct();
$this->worker = $worker;
}
public function handle()
{
if ($this->downForMaintenance() && $this->option('once')) {
return $this->worker->sleep($this->option('sleep'));
}
$this->listenForEvents();
$connection = $this->argument('connection')
?: $this->laravel['config']['queue.default'];
$queue = $this->getQueue($connection);
$this->runWorker(
$connection, $queue
);
}
protected function runWorker($connection, $queue)
{
$this->worker->setCache($this->laravel['cache']->driver());
return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
$connection, $queue, $this->gatherWorkerOptions()
);
}
protected function gatherWorkerOptions()
{
return new WorkerOptions(
$this->option('delay'), $this->option('memory'),
$this->option('timeout'), $this->option('sleep'),
$this->option('tries'), $this->option('force'),
$this->option('stop-when-empty')
);
}
protected function listenForEvents()
{
$this->laravel['events']->listen(JobProcessing::class, function ($event) {
$this->writeOutput($event->job, 'starting');
});
$this->laravel['events']->listen(JobProcessed::class, function ($event) {
$this->writeOutput($event->job, 'success');
});
$this->laravel['events']->listen(JobFailed::class, function ($event) {
$this->writeOutput($event->job, 'failed');
$this->logFailedJob($event);
});
}
protected function writeOutput(Job $job, $status)
{
switch ($status) {
case 'starting':
return $this->writeStatus($job, 'Processing', 'comment');
case 'success':
return $this->writeStatus($job, 'Processed', 'info');
case 'failed':
return $this->writeStatus($job, 'Failed', 'error');
}
}
protected function writeStatus(Job $job, $status, $type)
{
$this->output->writeln(sprintf(
"<{$type}>[%s][%s] %s</{$type}> %s",
Carbon::now()->format('Y-m-d H:i:s'),
$job->getJobId(),
str_pad("{$status}:", 11), $job->resolveName()
));
}
protected function logFailedJob(JobFailed $event)
{
$this->laravel['queue.failer']->log(
$event->connectionName, $event->job->getQueue(),
$event->job->getRawBody(), $event->exception
);
}
protected function getQueue($connection)
{
return $this->option('queue') ?: $this->laravel['config']->get(
"queue.connections.{$connection}.queue", 'default'
);
}
protected function downForMaintenance()
{
return $this->option('force') ? false : $this->laravel->isDownForMaintenance();
}
}
复制代码
任务启动时会运行handle函数,执行任务以前,首先经过listenForEvents注册监听事件,监放任务的完成与失败状况。接下来启动runWorker方法,该函数默认会调用 Illuminate\Queue\Worker 的 daemon 函数,只有在命令中强制 --once 参数的时候,才会执行 runNestJob 函数。咱们主要看Worker类daemon函数,上边提到的超时控制、失败重试、内存限制都是在Worker中实现的:
public function daemon($connectionName, $queue, WorkerOptions $options)
{
if ($this->supportsAsyncSignals()) {
$this->listenForSignals();
}
$lastRestart = $this->getTimestampOfLastQueueRestart();
while (true) {
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$this->pauseWorker($options, $lastRestart);
continue;
}
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
if ($this->supportsAsyncSignals()) {
$this->registerTimeoutHandler($job, $options);
}
if ($job) {
$this->runJob($job, $connectionName, $options);
} else {
$this->sleep($options->sleep);
}
$this->stopIfNecessary($options, $lastRestart, $job);
}
}
复制代码
daemon函数首先经过supportsAsyncSignals判断程序是否支持装载信号,若是支持装载信号:
...
protected function supportsAsyncSignals()
{
return extension_loaded('pcntl');
}
...
protected function listenForSignals()
{
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () {
$this->shouldQuit = true;
});
pcntl_signal(SIGUSR2, function () {
$this->paused = true;
});
pcntl_signal(SIGCONT, function () {
$this->paused = false;
});
}
...
复制代码
信号处理是进程间通讯的一种经常使用方式,这里主要用于接收用户在控制台发送的命令和由 Process Monitor(如 Supervisor)发送并与咱们的脚本进行通讯的异步通知。假如咱们正在执行一个很是重要可是耗时又很是长的任务,这个时候守护进程又收到了程序退出的信号,怎样使程序优雅的退出(执行完任务以后再退出),这里向你们推荐一篇文章供你们探索:supervisor在PHP项目中的使用
在真正运行任务以前,程序从 cache 中取了一次最后一次重启的时间,while(true)启动一个长时间运行的进程,使用daemonShouldRun判断当前脚本是应该处理任务,仍是应该暂停,仍是应该退出:
......
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache) {
return $this->cache->get('illuminate:queue:restart');
}
}
......
protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
{
return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
$this->paused ||
$this->events->until(new Events\Looping($connectionName, $queue)) === false);
}
......
复制代码
如下几种状况,循环将不会处理任务:
protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
$this->sleep($options->sleep > 0 ? $options->sleep : 1);
$this->stopIfNecessary($options, $lastRestart);
}
复制代码
脚本在 sleep 一段时间以后,就要从新判断当前脚本是否须要 stop:
......
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null)
{
if ($this->shouldQuit) {
$this->stop();
} elseif ($this->memoryExceeded($options->memory)) {
$this->stop(12);
} elseif ($this->queueShouldRestart($lastRestart)) {
$this->stop();
} elseif ($options->stopWhenEmpty && is_null($job)) {
$this->stop();
}
}
......
public function memoryExceeded($memoryLimit)
{
return (memory_get_usage(true) / 1024 / 1024) >= $memoryLimit;
}
......
protected function queueShouldRestart($lastRestart)
{
return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}
......
复制代码
如下状况脚本将会被 stop:
......
public function kill($status = 0)
{
$this->events->dispatch(new Events\WorkerStopping($status));
if (extension_loaded('posix')) {
posix_kill(getmypid(), SIGKILL);
}
exit($status);
}
......
public function stop($status = 0)
{
$this->events->dispatch(new Events\WorkerStopping($status));
exit($status);
}
复制代码
脚本被重启,当前的进程须要退出而且从新加载。
接下来程序获取下一个任务,命令行能够用 , 链接多个队列的名字,位于前面的队列优先级更高:
protected function getNextJob($connection, $queue)
{
try {
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
}
}
} catch (Exception $e) {
$this->exceptions->report($e);
$this->stopWorkerIfLostConnection($e);
$this->sleep(1);
} catch (Throwable $e) {
$this->exceptions->report($e = new FatalThrowableError($e));
$this->stopWorkerIfLostConnection($e);
$this->sleep(1);
}
}
复制代码
$connection 是具体的驱动,咱们这里是 Illuminate\Queue\RedisQueue:
public function pop($queue = null)
{
$this->migrate($prefixed = $this->getQueue($queue));
if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
return;
}
[$job, $reserved] = $nextJob;
if ($reserved) {
return new RedisJob(
$this->container, $this, $job,
$reserved, $this->connectionName, $queue ?: $this->default
);
}
}
复制代码
在从队列中取出任务以前,须要先将 delay 队列和 reserved 队列中已经到时间的任务放到主队列中:
protected function migrate($queue)
{
$this->migrateExpiredJobs($queue.':delayed', $queue);
if (! is_null($this->retryAfter)) {
$this->migrateExpiredJobs($queue.':reserved', $queue);
}
}
public function migrateExpiredJobs($from, $to)
{
return $this->getConnection()->eval(
LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime()
);
}
复制代码
这里一样使用了lua脚本,而且这里的lua脚本更加复杂
public static function migrateExpiredJobs()
{
return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
-- Push a notification for every job that was migrated...
for j = i, math.min(i+99, #val) do
redis.call('rpush', KEYS[3], 1)
end
end
end
return val
LUA;
}
复制代码
脚本的大概意思是将delay中的score大于当前事件戳的任务取出,push到主队列中去,而后将任务删除。这里使用lua脚本保证原子性。 接下来,就要从主队列中获取下一个任务,在取出下一个任务以后,还要将任务放入 reserved 队列中,当任务执行失败后,该任务会进行重试。
protected function retrieveNextJob($queue, $block = true)
{
$nextJob = $this->getConnection()->eval(
LuaScripts::pop(), 3, $queue, $queue.':reserved', $queue.':notify',
$this->availableAt($this->retryAfter)
);
if (empty($nextJob)) {
return [null, null];
}
[$job, $reserved] = $nextJob;
if (! $job && ! is_null($this->blockFor) && $block &&
$this->getConnection()->blpop([$queue.':notify'], $this->blockFor)) {
return $this->retrieveNextJob($queue, false);
}
return [$job, $reserved];
}
......
public static function pop()
{
return <<<'LUA'
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
-- Increment the attempt count and place job on the reserved queue...
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], ARGV[1], reserved)
redis.call('lpop', KEYS[3])
end
return {job, reserved}
LUA;
}
......
复制代码
从 redis 中获取到 job 以后,就会将其包装成 RedisJob 类. 若是一个脚本超时, pcntl_alarm 将会启动并杀死当前的 work 进程。杀死进程后, work 进程将会被守护进程重启,继续进行下一个任务,若是任务注册有fail函数还会执行失败任务处理的相关逻辑。
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
pcntl_signal(SIGALRM, function () use ($job, $options) {
if ($job) {
$this->markJobAsFailedIfWillExceedMaxAttempts(
$job->getConnectionName(), $job, (int) $options->maxTries, $this->maxAttemptsExceededException($job)
);
}
$this->kill(1);
});
pcntl_alarm(
max($this->timeoutForJob($job, $options), 0)
);
}
......
protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
{
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) {
$this->failJob($job, $e);
}
if ($maxTries > 0 && $job->attempts() >= $maxTries) {
$this->failJob($job, $e);
}
}
......
protected function failJob($job, $e)
{
return $job->fail($e);
}
......
复制代码
接下来就是执行任务了,runJob逻辑和以前描述差很少:
protected function runJob($job, $connectionName, WorkerOptions $options)
{
try {
return $this->process($connectionName, $job, $options);
} catch (Exception $e) {
$this->exceptions->report($e);
$this->stopWorkerIfLostConnection($e);
} catch (Throwable $e) {
$this->exceptions->report($e = new FatalThrowableError($e));
$this->stopWorkerIfLostConnection($e);
}
}
......
public function process($connectionName, $job, WorkerOptions $options)
{
try {
$this->raiseBeforeJobEvent($connectionName, $job);
$this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
$connectionName, $job, (int) $options->maxTries
);
if ($job->isDeleted()) {
return $this->raiseAfterJobEvent($connectionName, $job);
}
$job->fire();
$this->raiseAfterJobEvent($connectionName, $job);
} catch (Exception $e) {
$this->handleJobException($connectionName, $job, $options, $e);
} catch (Throwable $e) {
$this->handleJobException(
$connectionName, $job, $options, new FatalThrowableError($e)
);
}
}
......
复制代码
raiseBeforeJobEvent 函数用于触发任务处理前的事件,raiseAfterJobEvent 函数用于触发任务处理后的事件,这里再也不多说。 接下来咱们再来看一下RedisJob(/vendor/illuminate/queue/Jobs/Job.php)中的fire()函数如何处理从队列中取到的payload的:
public function fire()
{
$payload = $this->payload();
[$class, $method] = JobName::parse($payload['job']);
($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}
......
public static function parse($job)
{
return Str::parseCallback($job, 'fire');
}
public static function resolve($name, $payload)
{
if (! empty($payload['displayName'])) {
return $payload['displayName'];
}
return $name;
}
......
public static function parseCallback($callback, $default = null)
{
return static::contains($callback, '@') ? explode('@', $callback, 2) : [$callback, $default];
}
复制代码
分析代码可知,RedisJob从payload中解析出要执行的Job类,使用队列执行器Illuminate\Queue\CallQueuedHandler@call执行调用dispatchNow执行Job类的方法完成了消费:
public function call(Job $job, array $data)
{
try {
$command = $this->setJobInstanceIfNecessary(
$job, unserialize($data['command'])
);
} catch (ModelNotFoundException $e) {
return $this->handleModelNotFound($job, $e);
}
$this->dispatcher->dispatchNow(
$command, $this->resolveHandler($job, $command)
);
if (! $job->hasFailed() && ! $job->isReleased()) {
$this->ensureNextJobInChainIsDispatched($command);
}
if (! $job->isDeletedOrReleased()) {
$job->delete();
}
}
复制代码
到这里消费者从队列中取出任务到消费的整个流程咱们就走完了。咱们作一个简单的回顾总结。
Lumen框架启动时为异步队列任务提供了基础的队列服务和Bus任务分发服务。咱们程序中的生成者经过dispatch函数将任务push到队列,能够指定底层驱动,还能够设置延时任务等。dispatch函数经过Bus服务将Job类包装成payload添加到默认队列,若是是延时任务会添加到Redis的Zset结构中。消费者在处理任务的时候会装载信号,实现进程重启、退出的同时保证任务不中断;经过memory_get_usage(true)函数判断任务是否内存超限;经过payload中的maxTries判断任务是否须要重试;经过pcntl装载计时器判断是不是否执行超时;经过向任务类中添加fail函数来记录失败的任务;经过zset结构任务的中score和当前时间戳对比造成滑动窗口来执行延时任务。