剖析Laravel队列系统--推送做业到队列

原文连接https://divinglaravel.com/queue-system/pushing-jobs-to-queuephp

There are several ways to push jobs into the queue:
有几种方法能够将做业推送到队列中:laravel

Queue::push(new InvoiceEmail($order));

Bus::dispatch(new InvoiceEmail($order));

dispatch(new InvoiceEmail($order));

(new InvoiceEmail($order))->dispatch();

As explained in a previous dive, calls on the Queue facade are calls on the queue driver your app uses, calling the push method for example is a call to the push method of the Queue\DatabaseQueue class in case you're using the database queue driver.数据库

There are several useful methods you can use:闭包

调用Queue facade是对应用程序使用的队列驱动的调用,若是你使用数据库队列驱动,调用push方法是调用Queue\DatabaseQueue类的push方法。app

有几种有用的方法能够使用:ide

// 将做业推送到特定的队列
Queue::pushOn('emails', new InvoiceEmail($order));

// 在给定的秒数以后推送做业
Queue::later(60, new InvoiceEmail($order));

// 延迟后将做业推送到特定的队列
Queue::laterOn('emails', 60, new InvoiceEmail($order));

// 推送多个做业
Queue::bulk([
    new InvoiceEmail($order),
    new ThankYouEmail($order)
]);

// 推送特定队列上的多个做业
Queue::bulk([
    new InvoiceEmail($order),
    new ThankYouEmail($order)
], null, 'emails');

After calling any of these methods, the selected queue driver will store the given information in a storage space for workers to pick up on demand.函数

调用这些方法以后,所选择的队列驱动会将给定的信息存储在存储空间中,供workers按需获取。ui

Using the command bus

使用命令总线

Dispatching jobs to queue using the command bus gives you extra control; you can set the selected connection, queue, and delay from within your job class, decide if the command should be queued or run instantly, send the job through a pipeline before running it, actually you can even handle the whole queueing process from within your job class.this

The Bus facade proxies to the Contracts\Bus\Dispatcher container alias, this alias is resolved into an instance of Bus\Dispatcher inside Bus\BusServiceProvider:spa

使用命令总线调度做业进行排队能够给你额外控制权; 您能够从做业类中设置选定的connection, queue, and delay 来决定命令是否应该排队或当即运行,在运行以前经过管道发送做业,实际上你甚至能够从你的做业类中处理整个队列过程。

Bug facade代理到 Contracts\Bus\Dispatcher 容器别名,此别名解析为Bus\Dispatcher内的Bus\BusServiceProvider的一个实例:

$this->app->singleton(Dispatcher::class, function ($app) {
    return new Dispatcher($app, function ($connection = null) use ($app) {
        return $app[QueueFactoryContract::class]->connection($connection);
    });
});

因此Bus::dispatch() 调用的 dispatch() 方法是 Bus\Dispatcher 类的:

public function dispatch($command)
{
    if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
        return $this->dispatchToQueue($command);
    } else {
        return $this->dispatchNow($command);
    }
}

This method decides if the given job should be dispatched to queue or run instantly, the commandShouldBeQueued() method checks if the job class is an instance of Contracts\Queue\ShouldQueue, so using this method your job will only be queued in case you implement the ShouldQueue interface.

We're not going to look into dispatchNow in this dive, it'll be discussed in detail when we dive into how workers run jobs. For now let's look into how the Bus dispatches your job to queue:

该方法决定是否将给定的做业分派到队列或当即运行,commandShouldBeQueued() 方法检查做业类是不是 Contracts\Queue\ShouldQueue, 的实例,所以使用此方法,您的做业只有继承了ShouldQueue接口才会被放到队列中。

咱们不会在这篇中深刻dispatchNow,咱们将在深刻worker如何执行做业中详细讨论。 如今咱们来看看总线如何调度你的工做队列:

public function dispatchToQueue($command)
{
    $connection = isset($command->connection) ? $command->connection : null;

    $queue = call_user_func($this->queueResolver, $connection);

    if (! $queue instanceof Queue) {
        throw new RuntimeException('Queue resolver did not return a Queue implementation.');
    }

    if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    } else {
        return $this->pushCommandToQueue($queue, $command);
    }
}

First Laravel checks if a connection property is defined in your job class, using the property you can set which connection Laravel should send the queued job to, if no property was defined null will be used and in such case Laravel will use the default connection.

Using the connection value, Laravel uses a queueResolver closure to build the instance of the queue driver that should be used, this closure is set inside Bus\BusServiceProvider while registering the Dispatcher instance:

首先 Laravel会检查您的做业类中是否认义了connection 属性,使用这个属性能够设置Laravel应该将排队做业发送到哪一个链接,若是未定义任何属性,将使用null属性,在这种状况下Laravel将使用默认链接。

经过设置的链接,Laravel使用一个queueResolver闭包来构建应该使用哪一个队列驱动的实例,当注册调度器实例的时候这个闭包在Bus\BusServiceProvider 中被设置:

function ($connection = null) use ($app) {
    return $app[Contracts\Queue\Factory::class]->connection($connection);
}

Contracts\Queue\Factory is an alias for Queue\QueueManager, so in other words this closure returns an instance of QueueManager and sets the desired connection for the manager to know which driver to use.

Finally the dispatchToQueue method checks if the job class has a queue method, if that's the case the dispatcher will just call this method giving you full control over how the job should be queued, you can select the queue, assign delay, set maximum retries, timeout, etc...

In case no queue method was found, a call to pushCommandToQueue() calls the proper pushmethod on the selected queue driver:

Contracts\Queue\FactoryQueue\QueueManager的别名,换句话说,该闭包返回一个QueueManager实例,并为manager设置所使用的队列驱动须要的链接。

最后,dispatchToQueue方法检查做业类是否具备queue方法,若是调度器调用此方法,能够彻底控制做业排队的方式,您能够选择队列,分配延迟,设置最大重试次数, 超时等

若是没有找到 queue 方法,对 pushCommandToQueue() 的调用将调用所选队列驱动上的push方法:

protected function pushCommandToQueue($queue, $command)
{
    if (isset($command->queue, $command->delay)) {
        return $queue->laterOn($command->queue, $command->delay, $command);
    }

    if (isset($command->queue)) {
        return $queue->pushOn($command->queue, $command);
    }

    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }

    return $queue->push($command);
}

The dispatcher checks for queue and delay properties in your Job class & picks the appropriate queue method based on that.

调度器检查Job类中的 queuedelay ,并根据此选择适当的队列方法。

So I can set the queue, delay, and connection inside the job class?

因此我能够设置工做类中的队列,延迟和链接?

Yes, you can also set a tries and timeout properties and the queue driver will use these values as well, here's how your job class might look like:

是的,您还能够设置一个triestimeout 属性,队列驱动也将使用这些值,如下工做类示例:

class SendInvoiceEmail{
    public $connection = 'default';

    public $queue = 'emails';

    public $delay = 60;

    public $tries = 3;

    public $timeout = 20;
}

Setting job configuration on the fly

即时设置做业配置

Using the dispatch() global helper you can do something like this:

使用 dispatch() 全局帮助方法,您能够执行如下操做:

dispatch(new InvoiceEmail($order))
        ->onConnection('default')
        ->onQueue('emails')
        ->delay(60);

This only works if you use the Bus\Queueable trait in your job class, this trait contains several methods that you may use to set some properties on the job class before dispatching it, for example:

这只有在您在做业类中使用 Bus\Queueable trait时才有效,此trait包含几种方法,您能够在分发做业类以前在做业类上设置一些属性,例如:

public function onQueue($queue)
{
    $this->queue = $queue;

    return $this;
}

But in your example we call the methods on the return of dispatch()!

可是在你的例子中,咱们调用dispatch()的返回方法!

Here's the trick:

这是诀窍:

function dispatch($job)
{
    return new PendingDispatch($job);
}

This is the definition of the dispatch() helper in Foundation/helpers.php, it returns an instance of Bus\PendingDispatch and inside this class we have methods like this:

这是在Foundation/helpers.php中的dispatch()帮助方法的定义,它返回一个Bus\PendingDispatch 的实例,而且在这个类中,咱们有这样的方法:

public function onQueue($queue)
{
    $this->job->onQueue($queue);

    return $this;
}

So when we do dispatch(new JobClass())->onQueue('default'), the onQueue method of PendingDispatch will call the onQueue method on the job class, as we mentioned earlier job classes need to use the Queueable trait for all this to work.

因此当咱们执行 dispatch(new JobClass())->onQueue('default'), 时,PendingDispatchonQueue 方法将调用job类上的 onQueue 方法,如前所述,做业类须要使用全部这些的 Queueable trait来工做。

Then where's the part where the Dispatcher::dispatch method is called?

那么调用Dispatcher::dispatch方法的那部分是哪里?

Once you do dispatch(new JobClass())->onQueue('default') you'll have the job instance ready for dispatching, the actual work happens inside PendingDispatch::__destruct():

一旦你执行了 dispatch(new JobClass())->onQueue('default'),你将让做业实例准备好进行调度,实际的工做发生在 PendingDispatch::__destruct()中:

public function __destruct()
{
    app(Dispatcher::class)->dispatch($this->job);
}

This method, when called, will resolve an instance of Dispatcher from the container and call the dispatch() method on it. A __destruct() is a PHP magic method that's called when all references to the object no longer exist or when the script terminates, and since we don't store a reference to the PendingDispatch instance anywhere the __destruct method will be called immediately:

调用此方法时,将从容器中解析 Dispatcher 的一个实例,而后调用它的dispatch()方法。 destruct()是一种PHP魔术方法,当对对象的全部引用再也不存在或脚本终止时,都会调用,由于咱们不会当即在 __destruct方法中存储对PendingDispatch 实例的引用,

// Here the destructor will be called rightaway
dispatch(new JobClass())->onQueue('default');

// 若是咱们调用unset($temporaryVariable),那么析构函数将被调用
// 或脚本完成执行时。
$temporaryVariable = dispatch(new JobClass())->onQueue('default');

Using the Dispatchable trait

使用可调度的特征

You can use the Bus\Dispatchable trait on your job class to be able to dispatch your jobs like this:

您能够使用工做类上的 Bus\Dispatchable trait来调度您的工做,以下所示:

(new InvoiceEmail($order))->dispatch();

Here's how the dispatch method of Dispatchable looks like:

调度方法 Dispatchable看起来像这样:

public static function dispatch()
{
    return new PendingDispatch(new static(...func_get_args()));
}

As you can see it uses an instance of PendingDispatch, that means we can do something like:

正如你能够看到它使用一个 PendingDispatch的实例,这意味着咱们能够作一些像这样的事:

(new InvoiceEmail($order))->dispatch()->onQueue('emails');

转载请注明: 转载自Ryan是菜鸟 | LNMP技术栈笔记

若是以为本篇文章对您十分有益,何不 打赏一下

谢谢打赏

本文连接地址: 剖析Laravel队列系统--推送做业到队列

相关文章
相关标签/搜索