Beanstalkd
是一个轻量级的内存型队列,利用了和 Memcache
相似的协议。依赖 libevent
单线程事件分发机制, 能够部署多个实例,可是高并发支持仍是不太友好;php
管道html
即有名称的任务队列,一个服务器有一个或者多个管道,用来储存统一类型的 job
。每一个管道由一个就绪队列与延迟队列组成。每一个job全部的状态迁移在一个管道中完成。消费者能够监控感兴趣的管道,经过发送 watch
指令。消费者也能够取消监控 tube
,经过发送 ignore
命令。经过 list
命令返回全部监控的管道,当客户端预订一个job
,此 job
可能来自任何一个它监控的管道。git
当一个客户端链接上服务器时,客户端监控的tube
默认为 default
,若是客户端提交 job
时,没有使用 use
命令,那么这些 job
就存于名为 default
的 tube
中。github
管道按需求建立,不管他们在地方被引用到。若是一个管道变为空和没有任何客户端引用,它将会被自动删除。redis
Job服务器
任务在队里之中被称做 Job
. 一个 Job
在 Beanstalkd
中有如下的生命周期:并发
put
将一个任务放置进 tube
中deayed
这个任务如今再等待中,须要若干秒才能准备完毕【延迟队列】ready
这个任务已经准备好了,能够消费了。全部的消费都是要从取 ready
状态的 job
reserved
这个任务已经被消费者消费release
这个 job
执行失败了,把它放进 ready
状态队列中。让其余队列执行bury
这个 job
执行失败了,但不但愿其余队列执行,先把它埋起来在 Centos7
上经过命令 yum -y install beanstalkd --enablerepo=epel
; 负载均衡
其余系统的安装在 官网 上查看dom
beanstalkd -v
常见启动以下:异步
beanstalkd -l 0.0.0.0 -p 11300 -b /home/software/binstalkd/binlogs
启动后对 beanstalkd
的操做可使用 telnet
,好比 telnet 127.0.0.1 11300
。而后即可以执行 beanstalkd
的各命令,如 stats
查看信息,use
, put
, watch
等等。
如下示例代码是基于 pda/pheanstalk
这个第三方扩展写的;
<?php require './vendor/autoload.php'; use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1'); # 查看 beanstalkd 当前的状态信息 var_dump($pheanstalk->stats());
输出的信息为如下 :
'current-jobs-urgent' => '0', // 优先级小于1024状态为ready的job数量 'current-jobs-ready' => '0', // 状态为ready的job数量 'current-jobs-reserved' => '0', // 状态为reserved的job数量 'current-jobs-delayed' => '0', // 状态为delayed的job数量 'current-jobs-buried' => '0', // 状态为buried的job数量 'cmd-put' => '0', // 总共执行put指令的次数 'cmd-peek' => '0', // 总共执行peek指令的次数 'cmd-peek-ready' => '0', // 总共执行peek-ready指令的次数 'cmd-peek-delayed' => '0', // 总共执行peek-delayed指令的次数 'cmd-peek-buried' => '0', // 总共执行peek-buried指令的次数 'cmd-reserve' => '0', // 总共执行reserve指令的次数 'cmd-reserve-with-timeout' => '0', 'cmd-delete' => '0', 'cmd-release' => '0', 'cmd-use' => '0', // 总共执行use指令的次数 'cmd-watch' => '0', // 总共执行watch指令的次数 'cmd-ignore' => '0', 'cmd-bury' => '0', 'cmd-kick' => '0', 'cmd-touch' => '0', 'cmd-stats' => '2', 'cmd-stats-job' => '0', 'cmd-stats-tube' => '0', 'cmd-list-tubes' => '0', 'cmd-list-tube-used' => '0', 'cmd-list-tubes-watched' => '0', 'cmd-pause-tube' => '0', 'job-timeouts' => '0', // 全部超时的job的总共数量 'total-jobs' => '0', // 建立的全部job数量 'max-job-size' => '65535', // job的数据部分最大长度 'current-tubes' => '1', // 当前存在的tube数量 'current-connections' => '1', // 当前打开的链接数 'current-producers' => '0', // 当前全部的打开的链接中至少执行一次put指令的链接数量 'current-workers' => '0', // 当前全部的打开的链接中至少执行一次reserve指令的链接数量 'current-waiting' => '0', // 当前全部的打开的链接中执行reserve指令可是未响应的链接数量 'total-connections' => '2', // 总共处理的链接数 'pid' => '3609', // 服务器进程的id 'version' => '1.10', // 服务器版本号 'rusage-utime' => '0.000000', // 进程总共占用的用户CPU时间 'rusage-stime' => '0.001478', // 进程总共占用的系统CPU时间 'uptime' => '12031', // 服务器进程运行的秒数 'binlog-oldest-index' => '2', // 开始储存jobs的binlog索引号 'binlog-current-index' => '2', // 当前储存jobs的binlog索引号 'binlog-records-migrated' => '0', 'binlog-records-written' => '0', // 累积写入的记录数 'binlog-max-size' => '10485760', // binlog的最大容量 'id' => '37604ac4305d3b16', // 一个随机字符串,在beanstalkd进程启动时产生 'hostname' => 'localhost.localdomain',
var_export($pheanstalk->statsJob($job_4));
执行结果以下:
'id' => '1', // job id 'tube' => 'test', // job 所在的管道 'state' => 'reserved', // job 当前的状态 'pri' => '1024', // job 的优先级 'age' => '5222', // 自 job 建立时间为止 单位:秒 'delay' => '0', 'ttr' => '60', // time to run 'time-left' => '58', // 仅在job状态为reserved或者delayed时有意义,当job状态为reserved时表示剩余的超时时间 'file' => '2', // 表示包含此job的binlog序号,若是没有开启它将为0 'reserves' => '10', // 表示job被reserved的次数 'timeouts' => '0', // 表示job处理的超时时间 'releases' => '1', // 表示job被released的次数 'buries' => '0', // 表示job被buried的次数 'kicks' => '0', // 表示job被kiced的次数
// 查看有多少个tube //var_export($pheanstalk->listTubes()); // 在 put 以前预申明要使用的管道,若是管道不存在,即建立 //$pheanstalk->useTube('test'); //设置要监听的tube $pheanstalk->watch('test'); //取消对默认tube的监听,能够省略 $pheanstalk->ignore('default'); //查看监听的tube列表 var_export($pheanstalk->listTubesWatched()); //查看test的tube当前的状态 var_export($pheanstalk->statsTube('test'));
// put 任务 方式一; 返回新 job 的任务标识,整型值; $pheanstalk->useTube('test')->put( 'hello, beanstalk, i am job 1', // 任务内容 23, // 任务的优先级, 默认为 1024 0, // 不等待直接放到ready队列中. 60 // 处理任务的时间(单位为秒) ); // put 任务 方式二; 返回新 job 的任务标识,整型值; $pheanstalk->putInTube( 'test', // 管道名称 'hello, beanstalk, i am job 2', // 任务内容 23, // 任务的优先级, 默认为 1024 0, // 不等待直接放到ready队列中. 如值为 60 表示 60秒; 60 // 处理任务的时间(单位为秒) ); // 给管道里全部新任务设置延迟 $pheanstalk->pauseTube('test', 30); // 取消管道延迟 $pheanstalk->resumeTube('test');
此处介绍几个概念:
job
) 能够有 0~2^32 个优先级, 0 表明最高优先级。 beanstalkd
采用最大最小堆 (Min-max heap
) 处理任务优先级排序, 任什么时候刻调用 reserve
命令的消费者老是能拿到当前优先级最高的任务, 时间复杂度为 O(logn)
.ttr
(time to run
, 预设的执行时间)TTR
(time-to-run
) 时间内发送 delete
/ release
/ bury
改变任务状态;不然 Beanstalkd
会认为消息处理失败,状态改成 ready
,而后把任务交给另外的消费者节点执行。若是消费者预计在 TTR (time-to-run)
时间内没法完成任务, 也能够发送 touch
命令, 它的做用是让 Beanstalkd
重置该任务的 time-left
剩余执行时间.正常的获取和执行 Job
流程
// 获取 test 管道的 job $job = $pheanstalk->watch('test')->ignore('default')->reserve(); $job_2 = $pheanstalk->reserveFromTube('test'); $job_3 = $pheanstalk->peekReady('test'); // 若是知道 job 的 id, 也能够 $job_4 = $pheanstalk->peek($id); // var_export($pheanstalk->statsJob($job_4)); // 获取下一个延迟时间最短 的 job $job_5 = $pheanstalk->peekDelayed('test'); // do job .... 这里省略异常的考虑 // 释听任务 让别人执行 $pheanstalk->release($job); // 或成功执行完,则删除任务 //$pheanstalk->delete($job); // 将任务埋起来,预留 //$pheanstalk->bury($job);
处理 buried
状态的 Job
// 获取下一个被埋藏的 job $job = $pheanstalk->peekBuried('test'); // 将任务状态从 buried 改成 ready //$pheanstalk->kickJob($job); // 批量将指定数目的任务从 buried 改成 ready $pheanstalk->kick(10);
若是是有优先级/延时任务的需求的话, beanstalkd
是个不错选择。若是做为常规的先进先出队列来讲,以性能和稳定来讲 kafka/redis
会是更好的选择,redis
自己也是全内存,队列操做 O(1)
, 而 benastalkd
是 log(n)
。redis 也更加成熟和稳定,同时支持本地持久化和主从。
另外有一个加分项是 beanstalkd
做者自己比较活跃,以前提了一个 pr`, 当天就获得回馈,这也是做为开源项目选择一个很重要的因素。