原文连接:https://blog.breezelin.cn/scheme-redis-task-queue.htmlphp
一个网关服务器就跟快餐店同样,老是但愿客人来得快、去得也快,这样在相同时间内才能够服务更多的客人。若是快餐店的服务员在一个顾客点餐、等餐和结帐时都全程跟陪的话,那么这个服务员大部分时间都是在空闲的等待。应该有专门的服务员负责点餐,专门的服务员负责送餐,专门的服务员负责结帐,这样才能提升效率。一样道理,网关服务器中也须要分工明确。举个例子:
假设有一个申请发送重置密码邮件的网关接口,须知道发送一封邮件可能会花费上好几秒钟,若是网关服务器直接在线上给用户发送重置密码邮件,高并发的状况下就很容易形成网络拥挤。但实际上,网关服务器并不是必定要等待邮件发送成功后才能响应用户,彻底能够先告知用户邮件会发送的,然后再在线下把邮件发送出去(就像快餐店里点餐的服务员跟顾客说先去找位置坐,饭菜作好后会有人给他送过去)。html
那么是谁来把邮件发送出去呢?laravel
为了网关接口可以尽快响应用户请求,无需即时知道结果的耗时操做能够交由任务队列机制来处理。
任务队列机制中包含两种角色,一个是任务生产者,一个是任务消费者,而任务队列是二者之间的纽带:redis
任务队列的总体运行流程是:任务生产者把当前操做的关键信息(后续能够根据这些信息还原出当前操做)抽象出来,好比发送重置密码的邮件,咱们只须要当前用户邮箱和用户名就能够了;任务生产者把任务放进队列,实际就是把任务的关键信息存储起来,这里会用到MySQL、Redis之类数据存储工具,经常使用的是Redis;而任务消费者就不断地从数据库中取出任务信息,逐一执行。数据库
任务生产者的工做是任务分发,通常由线上的网关服务程序执行;任务消费者的工做是任务调度,通常由线下的程序执行,这样即便任务耗时再多,也不阻塞网关服务。服务器
这里主要讨论的是任务调度(任务消费者)的程序设计。网络
<!--more-->并发
假设咱们用Redis列表List存储任务信息,列表键名是queues:default
,任务发布就是往列表queues:default
后追加数据:函数
<?php // PHP伪代码 Redis::rpush('queues:default', serialize($task));
那么任务调度能够这样简单直接的实现:高并发
<?php // PHP伪代码 Class Worker { public function schedule() { while(1) { $seri = Redis::lpop('queues:default'); if($seri) { $task = unserialize($seri); $this->handle($task); continue; } sleep(1); } } public function handle($task) { // do something time-consuming } } $worker = new Worker; $worker->schedule();
上面代码是直接从queues:default
列表中移出第一个任务(lpop),由于handle($task)
函数是一个耗时的操做,过程当中如果遇到什么意外致使了整个程序退出,这个任务可能还没执行完成,但是任务信息已经彻底丢失了。保险起见,对schedule()
函数进行如下修改:
<?php ... public function schedule() { while(1) { $seri = Redis::lindex('queues:default', 0); if($seri) { $task = unserialize($seri); $this->handle($task); Redis::lpop('queues:default'); continue; } sleep(1); } } ...
即在任务完成后才将任务信息从列表中移除。
queues:default
列表中的任务都是须要即时执行的,可是有些任务是须要间隔一段时间后或者在某个时间点上执行,那么能够引入一个有序集合,命名为queues:default:delayed
,来存放这些任务。任务发布时须要指明执行的时间点$time
:
<?php // PHP伪代码 Redis::zadd('queues:default:delayed', $time, serialize($task));
任务调度时,若是queues:default
列表已经空了,就从queues:default:delayed
集合中取出到达执行时间的任务放入queues:default
列表中:
<?php ... public function schedule() { while(1) { $seri = Redis::lindex('queues:default', 0); if($seri) { $task = unserialize($seri); $this->handle($task); Redis::lpop('queues:default'); continue; } $seri_arr = Redis::zremrangebyscore('queues:default:delayed', 0, time()); if($seri_arr) { Redis::rpush('queues:default', $seri_arr); continue; } sleep(1); } } ...
预估任务正常执行所需的最大时间值,如果任务执行超过了这个时间,多是过程当中遇到一些意外,若是任由它继续卡着,那么后面的任务就会没法被执行了。
首先咱们给任务设定一个时限属性timeout
,而后在执行任务前先给进程自己设置一个闹钟信号,timeout
后收到信号说明任务执行超时,须要退出当前进程(用supervisor守护进程时,进程自身退出,supervisor会自动再拉起)。
注意:pcntl_alarm($timeout)
会覆盖以前闹钟信号,而pcntl_alarm(0)
会取消闹钟信号;任务超时后,当前任务放入queues:default:delayed
集合中延时执行,以避免再次阻塞队列。
<?php ... public function schedule() { while(1) { $seri = Redis::lindex('queues:default', 0); if($seri) { $task = unserialize($seri); $this->timeoutHanle($task); $this->handle($task); Redis::lpop('queues:default'); continue; } $seri_arr = Redis::zremrangebyscore('queues:default:delayed', 0, time()); if($seri_arr) { Redis::rpush('queues:default', $seri_arr); continue; } pcntl_alarm(0); sleep(1); } } public function timeoutHanle($task) { $timeout = (int)$task->timeout; if ($timeout > 0) { pcntl_signal(SIGALRM, function () { $seri = Redis::lpop('queues:default'); Redis::zadd('queues:default:delayed', time()+10), $seri); posix_kill(getmypid(), SIGKILL); }); } pcntl_alarm($timeout); } ...
上面代码,直观上没什么问题,可是在多进程并发执行的时候,有些任务可能会被重复执行,是由于没能及时将当前执行的任务从queues:default
列表中移出,其余进程也能够读取到。为了不重复执行的问题,咱们须要引入一个有序集合SortedSet存放正在执行的任务,命名为queues:default:reserved
。
首先任务是从queues:default
列表中直接移出,而后开始执行任务前先把任务放进queues:default:reserved
集合中,任务完成了再从queues:default:reserved
集合中移出。
再结合任务超时,假设一个任务执行时间不可能超过60*60
秒(能够按需调整),在queues:default
列表为空的时候,queues:default:reserved
集合中有任务已经存放超过了60*60
秒,那么有多是某些进程在执行任务是意外退出了,因此把这些任务放到queues:default:delayed
集合中稍后执行。
<?php ... public function schedule() { while(1) { $seri = Redis::lpop('queues:default', 0); if($seri) { Redis::zadd('queues:default:reserved', time()+10, $seri); $task = unserialize($seri); $this->timeoutHanle($task); $this->handle($task); Redis::zrem('queues:default:reserved', $seri); continue; } $seri_arr = Redis::zremrangebyscore('queues:default:delayed', 0, time()); if($seri_arr) { Redis::rpush('queues:default', $seri_arr); continue; } $seri_arr = Redis::zremrangebyscore('queues:default:reserved', 0, time()-60*60); if($seri_arr) { foreach($seri_arr as $seri) { Redis::zadd('queues:default:delayed', time()+10, $seri); } } sleep(1); } } public function timeoutHanle($task) { $timeout = (int)$task->timeout; if ($timeout > 0) { pcntl_signal(SIGALRM, function () use ($task) { $seri = serialize($task); Redis::zrem('queues:default:reserved', $seri); Redis::zadd('queues:default:delayed', time()+10), $seri); posix_kill(getmypid(), SIGKILL); }); } pcntl_alarm($timeout); } ...
以上代码没有检验任务是否执行成功,应该有任务失败的处理机制:好比给任务设定一个最多重试次数属性retry_times
,任务每执行一次retry_times
,任务执行失败时,如果retry_times
等于0,则将任务放入queues:default:failed
列表中不在执行;不然放入放到queues:default:delayed
集合中稍后执行。
以上代码是进程忙时连续执行,闲时休眠一秒,能够按需调整优化。
如果须要在任务执行成功或失败时进行某些操做,能够给任务设定成功操做方法afterSucceeded()
或失败操做方法afterFailed()
,在相应的时候回调。
以上讲述了一个任务调度程序的逐步演变,设计方案很大程度上参考了Laravel Queue。用工具,知其然,知其因此然。