PHP Laravel 队列技巧:Fail、Retry 或者 Delay

当建立队列jobs、监听器或订阅服务器以推送到队列中时,您可能会开始认为,一旦分派,队列工做器决定如何处理您的逻辑就彻底由您本身决定了。php

嗯……并非说你不能从做业内部与队列工做器交互,可是一般状况下,哪怕你作了,也是不必的。laravel

这个神奇的骚操做的出现是由于“InteractsWithQueue”这个trait。.当排队做业正在从队列中拉出, 这个 [CallQueuedListener](https://github.com/laravel/framework/blob/5.8/src/Illuminate/Events/CallQueuedListener.php#L90-L104) 会检查它是否在使用 InteractsWithQueue trait, 若是是的话,框架会将底层的“队列jobs”实例注入到内部。git

这个 “任务” 实例相似于一个包装了真正的 Job 类的驱动,其中包含队列链接和尝试等信息。github

背景
我将以一个转码 Job 为例。 这是一个将广播音频文件转换成192kbps MP3格式的任务。由于这是在自由转码队列中设置的,因此它的做用有限。服务器

 

检查尝试次数
attempts()是被调用的第一个方法, 顾名思义,它返回尝试次数,一个队列 job老是伴随着一个attempt启动。框架

此方法旨在与其余方法一块儿使用 ..., 相似 fail() 或者 release() (delay). 为了便于说明,咱们将通知用户第几回重试: 每次咱们尝试在空闲队列中转换(转换代码)时,咱们都会通知用户咱们正在第几回重试,让他能够选择取消未来的转换(转换代码)。this

 1 <?php
 2 namespace App\Jobs;
 3 use App\Podcast;
 4 use Transcoder\Transcoder;
 5 use Illuminate\Bus\Queueable;
 6 use Illuminate\Queue\SerializesModels;
 7 use App\Notifications\PodcastTranscoded;
 8 use Illuminate\Queue\InteractsWithQueue;
 9 use Illuminate\Foundation\Bus\Dispatchable;
10 use App\Notifications\RetyingPodcastTranscode;
11 class TranscodePodcast
12 {
13 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
14 /**
15 * Transcoder Instance
16 *
17 * @var \App\Podcast
18 */
19 protected $podcast;
20 /**
21 * 建立一个新的转码podcast实例。
22 *
23 * @param \App\Podcast $podcast
24 * @return void
25 */
26 public function __construct(Podcast $podcast)
27 {
28 $this->podcast = $podcast;
29 }
30 /**
31 * 执行队列job.
32 *
33 * @param \Transcoder\Transcoder $podcast
34 * @return void
35 */
36 public function handle(Transcoder $transcoder)
37 {
38 // 告诉用户咱们第几回重试
39 if ($this->attempts() > 1) {
40 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
41 }
42 $transcoded = $this->transcoder->setFile($event->podcast)
43 ->format('mp3')
44 ->bitrate(192)
45 ->start();
46 
47 
48 // 将转码podcast与原始podcast关联
49 $this->podcast->transcode()->associate($transcoded);
50 
51 // 通知podcast的发布者他的podcast已经准备好了
52 
53 $this->publisher->notify(new PodcastTranscoded($this->podcast));
54 }
55 }

 


告诉用户咱们将在第几回时重试某些内容,这在逻辑预先失败时颇有用,让用户(或开发人员)检查出了什么问题,但固然您能够作更多的事情。spa

就我我的而言,我喜欢在“Job做业”失败后再这样作,若是还有重试时间,告诉他咱们稍后会重试固然,这个例子只是为了举例说明。code

删除做业队列 Job
第二个方法就是 delete(). 跟你猜测的同样,您能够从队列中删除当前的“队列 Job”。 当队列 Job或侦听器因为多种缘由排队后不该处理时,这将会很方便,例如,考虑一下这个场景:在转码发生以前,上传podcast的发布者因为任何缘由(好比TOS冲突)被停用,咱们应该不处理podcast。orm

咱们将在前面的示例中添加该代码:

 1 <?php
 2 namespace App\Jobs;
 3 use App\Podcast;
 4 use Transcoder\Transcoder;
 5 use Illuminate\Bus\Queueable;
 6 use Illuminate\Queue\SerializesModels;
 7 use App\Notifications\PodcastTranscoded;
 8 use Illuminate\Queue\InteractsWithQueue;
 9 use Illuminate\Foundation\Bus\Dispatchable;
10 use App\Notifications\RetyingPodcastTranscode;
11 class TranscodePodcast
12 {
13 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
14 /**
15 * Transcoder Instance
16 *
17 * @var \App\Podcast
18 */
19 protected $podcast;
20 /**
21 * 建立一个新的转码podcast实例。
22 *
23 * @param \App\Podcast $podcast
24 * @return void
25 */
26 public function __construct(Podcast $podcast)
27 {
28 $this->podcast = $podcast;
29 }
30 /**
31 * 执行队列 job.
32 *
33 * @param \Transcoder\Transcoder $podcast
34 * @return void
35 */
36 public function handle(Transcoder $transcoder)
37 {
38 // 若是发布服务器已被停用,请删除此队列job
39 if ($this->podcast->publisher->isDeactivated()) {
40 $this->delete();
41 }
42 // 告诉用户咱们第几回重试
43 if ($this->attempts() > 1) {
44 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
45 }
46 $transcoded = $this->transcoder->setFile($event->podcast)
47 ->format('mp3')
48 ->bitrate(192)
49 ->start();
50 
51 // 将转码podcast与原始podcast关联
52 $this->podcast->transcode()->associate($transcoded);
53 
54 // 通知podcast的发布者他的podcast已经准备好了
55 $this->publisher->notify(new PodcastTranscoded($this->podcast));
56 }
57 }

 


若是须要删除可能已删除的模型上的做业,则可能须要 设置 [$deleteWhenMissingModels](https://laravel.com/docs/5.8/queues#ignoring-missing-models) 为真 t避免处理不存在的东西。

失败的队列job
当您须要控制人为破坏逻辑时,这很是很是方便, 由于使用空的“return”语句会将“Job”标记为已成功完成。您能够强制使排队的做业失败,但愿出现异常,容许处理程序在可能的状况下稍后重试。

这使您在做业失败时能够更好地控制在任何状况下,也可使用“failed()”方法, 它容许你 失败后执行任何清洁操纵, 好比通知用户或者删除一些东西。

在此示例中,若是因为任何缘由(如 CDN 关闭时)没法从存储中检索podcast ,则做业将失败,并引起自定义异常。

 1 <?php
 2 namespace App\Jobs;
 3 use App\Podcast;
 4 use Transcoder\Transcoder;
 5 use Illuminate\Bus\Queueable;
 6 use Illuminate\Queue\SerializesModels;
 7 use App\Exceptions\PodcastUnretrievable;
 8 use App\Notifications\PodcastTranscoded;
 9 use Illuminate\Queue\InteractsWithQueue;
10 use Illuminate\Foundation\Bus\Dispatchable;
11 use App\Notifications\RetyingPodcastTranscode;
12 class TranscodePodcast
13 {
14 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
15 /**
16 * 转码器实例
17 *
18 * @var \App\Podcast
19 */
20 protected $podcast;
21 /**
22 * 建立一个新的转码Podcast实例。
23 *
24 * @param \App\Podcast $podcast
25 * @return void
26 */
27 public function __construct(Podcast $podcast)
28 {
29 $this->podcast = $podcast;
30 }
31 /**
32 * 执行队列 job.
33 *
34 * @param \Transcoder\Transcoder $podcast
35 * @return void
36 */
37 public function handle(Transcoder $transcoder)
38 {
39 // 若是发布服务器已被停用,请删除此队列job
40 if ($this->podcast->publisher->isDeactivated()) {
41 $this->delete();
42 }
43 //若是podcast不能从storage存储中检索,咱们就会失败。
44 if ($this->podcast->fileDoesntExists()) {
45 $this->fail(new PodcastUnretrievable($this->podcast));
46 }
47 // 告诉用户咱们第几回重试
48 if ($this->attempts() > 1) {
49 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
50 }
51 
52 $transcoded = $this->transcoder->setFile($event->podcast)
53 ->format('mp3')
54 ->bitrate(192)
55 ->start();
56 
57 // 将转码podcast与原始podcast关联
58 $this->podcast->transcode()->associate($transcoded);
59 
60 // 通知podcast的发布者他的podcast已经准备好了
61 $this->publisher->notify(new PodcastTranscoded($this->podcast));
62 }
63 }

 


如今,进入最后的方法。

释放(延迟)队列job
这多是trait性状的最有用的方法, 由于它可让你在将来进一步推进这项队列job. 此方法用于 队列job速率限制.

除了速率限制以外,您还能够在某些不可用但但愿在不久的未来使用它的状况下使用它同时,避免先发制人地失败。

在最后一个示例中,咱们将延迟转码以备稍后使用:若是转码器正在大量使用,咱们将延迟转码5分钟直到负载下降。

 1 <?php
 2 namespace App\Jobs;
 3 use App\Podcast;
 4 use Transcoder\Transcoder;
 5 use Illuminate\Bus\Queueable;
 6 use Illuminate\Queue\SerializesModels;
 7 use App\Exceptions\PodcastUnretrievable;
 8 use App\Notifications\PodcastTranscoded;
 9 use Illuminate\Queue\InteractsWithQueue;
10 use App\Notifications\TranscoderHighUsage;
11 use Illuminate\Foundation\Bus\Dispatchable;
12 use App\Notifications\RetyingPodcastTranscode;
13 class TranscodePodcast
14 {
15 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
16 /**
17 * Transcoder Instance
18 *
19 * @var \App\Podcast
20 */
21 protected $podcast;
22 /**
23 * 建立一个新的转码podcast实例。
24 *
25 * @param \App\Podcast $podcast
26 * @return void
27 */
28 public function __construct(Podcast $podcast)
29 {
30 $this->podcast = $podcast;
31 }
32 /**
33 * 执行队列job.
34 *
35 * @param \Transcoder\Transcoder $podcast
36 * @return void
37 */
38 public function handle(Transcoder $transcoder)
39 {
40 // 若是发布服务器已被停用,请删除此队列job
41 if ($this->podcast->publisher->isDeactivated()) {
42 $this->delete();
43 }
44 // 若是podcast不能从storage存储中检索,咱们就会失败。
45 if ($this->podcast->fileDoesntExists()) {
46 $this->fail(new PodcastUnretrievable($this->podcast));
47 }
48 
49 // 若是转码器使用率很高,咱们将
50 // t延迟转码5分钟. 不然咱们可能会有拖延转码器进程的危险 
51 // 它会把全部的转码子进程都记录下来。
52 if ($transcoder->getLoad()->isHigh()) {
53 $delay = 60 * 5;
54 $this->podcast->publisher->notify(new TranscoderHighUsage($this->podcast, $delay));
55 $this->release($delay);
56 }
57 // 告诉用户咱们第几回重试
58 if ($this->attempts() > 1) {
59 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
60 }
61 
62 $transcoded = $this->transcoder->setFile($event->podcast)
63 ->format('mp3')
64 ->bitrate(192)
65 ->start();
66 
67 // 将转码podcast与原始podcast关联
68 $this->podcast->transcode()->associate($transcoded);
69 
70 // 通知podcast的发布者他的podcast已经准备好了
71 $this->publisher->notify(new PodcastTranscoded($this->podcast));
72 }
73 }

 

咱们可使用一些特殊方法,例如,得到分配给转码器的一些时隙,若是转码器时隙已满,则延迟做业。 在排队的工做中,你能作的就只有这些了。排队愉快。

相关文章
相关标签/搜索