Job
的定义<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class TestJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function __construct()
{
echo '开始构造Job';
}
public function handle()
{
echo '开始处理Job';
}
}
复制代码
新建的 TestJob
类,这个类实现了序列化模型,队列功能等等都是经过trait
类来补充的。 这些特性咱们经过使用来分解。php
dispatch(new TestJob());
复制代码
这里就是执行一个 TestJob
的任务,接下去看看 dispatch()
这个方法redis
function dispatch($job)
{
if ($job instanceof Closure) {
$job = new CallQueuedClosure(new SerializableClosure($job));
}
return new PendingDispatch($job);
}
复制代码
这里会返回一个 Illuminate\Foundation\Bus\PendingDispatch
对象json
TestJob
这个对象里面经过 use Queueable
引入的几个成员属性。 目前为止咱们看到只不过是实例化了一个对象,同时将 TestJob
传给 PendingDispatch
bash
咱们来解读 PendingDispatch
这个类闭包
<?php
namespace Illuminate\Foundation\Bus;
use Illuminate\Contracts\Bus\Dispatcher;
class PendingDispatch
{
protected $job;
public function __construct($job)
{
// "接收传入的 job 对象"
$this->job = $job;
}
public function onConnection($connection)
{
// "设置任务指定链接"
$this->job->onConnection($connection);
return $this;
}
public function onQueue($queue)
{
// "设置任务队列名"
$this->job->onQueue($queue);
return $this;
}
public function allOnConnection($connection)
{
// "设置工做链全部须要的链接"
$this->job->allOnConnection($connection);
return $this;
}
public function allOnQueue($queue)
{
// "设置工做链的队列"
$this->job->allOnQueue($queue);
return $this;
}
public function delay($delay)
{
// "设置延迟时间"
$this->job->delay($delay);
return $this;
}
public function chain($chain)
{
// "设置工做链任务"
$this->job->chain($chain);
return $this;
}
public function __destruct()
{
// "经过析构函数来转发job"
app(Dispatcher::class)->dispatch($this->job);
}
}
复制代码
分解完这个类,其实大部分都是设置参数的过程,也是经过这些参数来控制任务的执行状态,好比延迟,工做链模式运行等等。app
重点在析构函数,当运行完
return new PendingDispatch($job);
以后对象若是没有被任何变量接收,那么对象的内存空间会被回收,从而触发析构函数执行,也是触发job
继续执行的方式!dom
public function __destruct()
{
// "经过析构函数来转发job"
app(Dispatcher::class)->dispatch($this->job);
}
复制代码
app(Dispatcher::class)
传入的参数是 Illuminate\Bus\Dispatcher
, 这个契约对应的绑定类是经过配置文件 app.providers.Illuminate\Bus\BusServiceProvider::class
来加载的 关于 provider
的启动在第九章中有讲,咱们直接看启动方法ide
public function register()
{
$this->app->singleton(Dispatcher::class, function ($app) {
return new Dispatcher($app, function ($connection = null) use ($app) {
return $app[QueueFactoryContract::class]->connection($connection);
});
});
$this->app->alias(
Dispatcher::class, DispatcherContract::class
);
$this->app->alias(
Dispatcher::class, QueueingDispatcherContract::class
);
}
复制代码
app(Dispatcher::class)
的实质就是这个闭包的返回函数
function ($app) {
return new Dispatcher($app, function ($connection = null) use ($app) {
return $app[QueueFactoryContract::class]->connection($connection);
});
}
复制代码
看看 Dispatcher
构造函数ui
public function __construct(Container $container, Closure $queueResolver = null)
{
$this->container = $container;
$this->queueResolver = $queueResolver;
$this->pipeline = new Pipeline($container);
}
复制代码
接受两个参数,第一个是容器,第二个就是闭包因此 $this->queueResolver
就是
function ($connection = null) use ($app) {
return $app[QueueFactoryContract::class]->connection($connection);
}
复制代码
我管这个 $this->queueResolver
叫解析器,做用是接收一个 $connection
而后从容器中解析出队列的驱动并进行链接。
QueueFactoryContract::class
是经过 provider
加载的
位于 app.providers.Illuminate\Queue\QueueServiceProvider::class,
返回的对象是 Illuminate\Queue\QueueManager
因为 'default' => env('QUEUE_CONNECTION', 'sync'),
中配置的 redis
因此最后返回的对象是 Illuminate\Queue\RedisQueue
public function dispatch($command)
{
// "$this->queueResolver 这个队列解析器是在构造的时候注入的"
if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
return $this->dispatchToQueue($command);
}
return $this->dispatchNow($command);
}
复制代码
上面的方法明确了任务是该经过队列仍是同步执行。
这里咱们看,传入的 $command
就是开始的 TestJob
对象。
还记得 Laravel
文档说的若是要经过队列实现须要实现一个指定的接口吗
implements ShouldQueue
,这段代码就是解释了缘由。
protected function commandShouldBeQueued($command)
{
return $command instanceof ShouldQueue;
}
复制代码
继续下去,经过上面的判断以后咱们进入 dispatchToQueue($command)
这里
public function dispatchToQueue($command)
{
$connection = $command->connection ?? null;
$queue = call_user_func($this->queueResolver, $connection);
if (! $queue instanceof Queue) {
throw new RuntimeException('Queue resolver did not return a Queue implementation.');
}
if (method_exists($command, 'queue')) {
return $command->queue($queue, $command);
}
return $this->pushCommandToQueue($queue, $command);
}
复制代码
上面解析过了 $queue
就是 Illuminate\Queue\RedisQueue
这个对象
// "返回 false"
if (method_exists($command, 'queue')) {
return $command->queue($queue, $command);
}
复制代码
全部最后执行了 return $this->pushCommandToQueue($queue, $command);
protected function pushCommandToQueue($queue, $command)
{
// "若是存在指定的队列和延迟,则推入指定队列+延迟"
if (isset($command->queue, $command->delay)) {
return $queue->laterOn($command->queue, $command->delay, $command);
}
// "若是存在指定的队列则push到指定的队列"
if (isset($command->queue)) {
return $queue->pushOn($command->queue, $command);
}
// "只存在延迟设置,推入延迟"
if (isset($command->delay)) {
return $queue->later($command->delay, $command);
}
// "默认"
return $queue->push($command);
}
复制代码
上面已经到了最终的调用,那么接下来的事情就是构造一个什么样格式的数据存入
redis
追踪 $queue->push($command)
// "这里的 $job 就是最开始传入的 TestJob 对象!"
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
}
复制代码
构造 payload
protected function createPayload($job, $queue, $data = '')
{
$payload = json_encode($this->createPayloadArray($job, $queue, $data));
if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidPayloadException(
'Unable to JSON encode payload. Error code: '.json_last_error()
);
}
return $payload;
}
复制代码
这里的 createPayloadArray() 先调用Illuminate\Queue\RedisQueue
对象的
protected function createPayloadArray($job, $queue, $data = '')
{
return array_merge(parent::createPayloadArray($job, $queue, $data), [
'id' => $this->getRandomId(),
'attempts' => 0,
]);
}
复制代码
追踪父类Illuminate\Queue\Queue
方法
protected function createPayloadArray($job, $queue, $data = '')
{
return is_object($job)
? $this->createObjectPayload($job, $queue)
: $this->createStringPayload($job, $queue, $data);
}
// "$job 是对象的时候格式化方式"
protected function createObjectPayload($job, $queue)
{
$payload = $this->withCreatePayloadHooks($queue, [
'displayName' => $this->getDisplayName($job),
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries' => $job->tries ?? null, // "这是任务设置的重试次数"
'timeout' => $job->timeout ?? null, // "这是超时时间"
'timeoutAt' => $this->getJobExpiration($job), // "获取处理过时时间"
'data' => [
'commandName' => $job,
'command' => $job,
],
]);
return array_merge($payload, [
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
// "序列化,这里的序列化会调用 SerializesModels 特质类的__sleep()方法 在开头的时候全部的 Job 类都有use"
],
]);
}
// "$job 是字符串的时候格式化方式"
protected function createStringPayload($job, $queue, $data)
{
return $this->withCreatePayloadHooks($queue, [
'displayName' => is_string($job) ? explode('@', $job)[0] : null,
'job' => $job,
'maxTries' => null,
'timeout' => null,
'data' => $data,
]);
}
复制代码
将获取的最后 json
字符串 rpush
到 redis
中。
public function pushRaw($payload, $queue = null, array $options = [])
{
$this->getConnection()->rpush($this->getQueue($queue), $payload);
return json_decode($payload, true)['id'] ?? null;
}
复制代码
return $queue->later($command->delay, $command);
, 逻辑基本上同样,只不过最后存入的队列是名不同
到这里位置关于任务和队列的应用写入端口已经完成,最终是把指定的格式的数据存入配置的存储驱动中的过程。