为何使用队列
使用队列的目的通常是:php
- 异步执行
- 出错重试
解释一下:mysql
异步执行: 部分代码执行很耗时, 为了提升响应速度及避免占用过多链接资源, 能够将这部分代码放到队列中异步执行.laravel
Eg. 网站新用户注册后, 须要发送欢迎的邮件, 涉及到网络IO没法控制耗时的这一类就很适合放到队列中来执行.
出错重试: 为了保证一些任务的正常执行, 能够将任务放到队列中执行, 若执行出错则能够延迟一段时间后重试, 直到任务处理成功或出错超过N次后取消执行.面试
Eg. 用户须要绑定手机号, 此时发送短信的接口是依赖第三方, 一个是不肯定耗时, 一个是不肯定调用的成功, 为了保证调用成功, 必然须要在出错后重试
Laravel 中的队列
如下分析默认使用的队列及其配置以下redis
- 默认队列引擎:
redis
经过在redis-cli
中使用monitor
命令查看具体执行的命令语句
- 默认队列名:
default
分发任务
此处以分发 异步通知(class XxxNotification implement ShouldQueue
)为例.sql
在Laravel中发起异步通知时, Laravel 会往redis中的任务队列添加一条新任务shell
redis 执行语句json
redis> RPUSH queues:default { "displayName": "App\\Listeners\\RebateEventListener", "job": "Illuminate\\Queue\\CallQueuedHandler@call", "maxTries": null, "timeout": null, "timeoutAt": null, "data": { "commandName": "Illuminate\\Events\\CallQueuedListener", "command": "O:36:\"Illuminate\\Events\\CallQueuedListener\":7:{s:5:\"class\";s:33:\"App\\Listeners\\RebateEventListener\";s:6:\"method\";s:15:\"onRebateCreated\";s:4:\"data\";a:1:{i:0;O:29:\"App\\Events\\RebateCreatedEvent\":4:{s:11:\"\u0000*\u0000tbkOrder\";O:45:\"Illuminate\\Contracts\\Database\\ModelIdentifier\":3:{s:5:\"class\";s:19:\"App\\Models\\TbkOrder\";s:2:\"id\";i:416;s:10:\"connection\";s:5:\"mysql\";}s:15:\"\u0000*\u0000notifyAdmins\";b:1;s:13:\"\u0000*\u0000manualBind\";b:0;s:6:\"socket\";N;}}s:5:\"tries\";N;s:9:\"timeoutAt\";N;s:7:\"timeout\";N;s:6:\"\u0000*\u0000job\";N;}" }, "id": "iTqpbeDqqFb3VoED2WP3pgmDbLAUQcMB", "attempts": 0 }
上面的redis语句是将任务信息(json格式) rpush
到 redis 队列 queues:default
的尾部.服务器
任务队列 Worker
Laravel 处理任务队列的进程开启方式: php artisan queue:work
, 为了更好的观察, 这里使用 --once
选项来指定队列中的单一任务进行处理, 具体的更多参数请自行参考文档网络
php artisan queue:work --once --delay=1 --tries=3
上述执行语句参数含义:
--once
仅执行一次任务, 默认是常驻进程一直执行--tries=3
任务出错最多重试3次, 默认是无限制重试--delay=1
任务出错后, 每次延迟1秒后再次执行, 默认是延迟0秒
当 Worker 启动时, 它依次执行以下步骤:
此处仍以默认队列
default
为例讲解, 且
只讲解redis的相关操做
- 从
queues:default:delayed
有序集合中获取能够处理的 "延迟任务", 并rpush
到queue:default
队列的尾部
具体的执行语句:
redis> eval "Lua脚本" 2 queues:default:delayed queues:default 当前时间戳
Lua 脚本内容以下:
-- Get all of the jobs with an expired \"score\"...localval = redis.call('zrangebyscore', KEYS[1],'-inf', ARGV[1])-- If we have values in the array, we will remove them from the first queue-- and add them onto thedestination queue in chunks of 100, which moves-- all of the appropriate jobs onto the destination queue very safely.if(next(val) ~=nil)thenredis.call('zremrangebyrank', KEYS[1],0, #val -1)fori =1, #val,100doredis.call('rpush', KEYS[2],unpack(val, i,math.min(i+99, #val)))endendreturnval
从 queue:default:reserved
有序集合中获取已过时的 "reserved 任务", 并 rpush
到 queue:default
队列的尾部
具体的执行语句:
redis> eval "Lua脚本" 2 queues:default:reserved queues:default 当前时间戳
使用的Lua脚本同步骤 1
从 queue:default
队列中获取(lpop
)一个任务, 增长其 attempts
次数, 并将该任务保存到queu:default:reserved
有序集合中, 该任务的 score
值为 当前时间 + 90(任务执行超时时间)
具体的执行语句:
redis> eval “Lua脚本” 2 queues:default queues:default:reserved 任务超时时间戳
Lua脚本
- Pop the first job off of the queue... local job = redis.call('lpop', KEYS[1]) local reserved = false if(job ~= false) then -- Increment the attempt count and place job on the reserved queue... reserved = cjson.decode(job) reserved['attempts'] = reserved['attempts'] + 1 reserved = cjson.encode(reserved) redis.call('zadd', KEYS[2], ARGV[1], reserved) end return {job, reserved}
- 这里的 90 是根据配置而定:
config('queue.connections.redis.retry_after')
若预计任务耗时太久, 则应增长该数值, 防止任务还在执行时就被重置 - 在成功执行上面获取的任务后, 就将该任务从
queues:default:reserved
队列中移除掉
具体执行语句:ZREM queues:default:reserved "具体任务"
- 若是执行任务失败, 此时分为2种状况:
任务失败次数未达到指定的重试次数阀值
将该任务从 queues:default:reserved
中移除, 并将该任务添加到 queue:default:delayed
有序集合中, score
为该任务下一次执行的时间戳
执行语句:
redis> EVAL "Lua脚本" 2 queues:default:delayed queues:default:reserved "失败的任务" 任务延迟执行的时间戳
Lua脚本
-- Remove the job from the current queue... redis.call('zrem', KEYS[2], ARGV[1]) -- Add the job onto the \"delayed\" queue... redis.call('zadd', KEYS[1], ARGV[2], ARGV[1]) return true
若是任务失败次数超过指定的重试阀值
将该任务从 queue:default:reserved
中移除
执行语句:
redis> ZREM queue:default:reserved
注意, 上述使用 Lua 脚本的目的在于操做的原子性, Redis 是单进程单线程模式, 以Lua脚本形式执行命令时能够确保执行脚本的原子性, 而不会有并发问题.
以上内容但愿帮助到你们, 不少PHPer在进阶的时候总会遇到一些问题和瓶颈,业务代码写多了没有方向感,不知道该从那里入手去提高,对此我整理了一些资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、服务器性能调优、TP6,laravel,Redis,Swoole、Swoft、Kafka、Mysql优化、shell脚本、Docker、微服务、Nginx等多个知识点高级进阶干货须要的能够免费分享给你们 ,须要戳这里 PHP进阶架构师>>>实战视频、大厂面试文档免费获取