Beanstalk,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想经过后台异步执行耗时的任务来下降高容量Web应用系统的页面访问延迟,支持过有9.5 million用户的Facebook Causes应用。后来开源,如今有PostRank大规模部署和使用,天天处理百万级任务。Beanstalkd是典型的类Memcached设计,协议和使用方式都是一样的风格,因此使用过memcached的用户会以为Beanstalkd似曾相识。php
高性能离不开异步,异步离不开队列,而其内部都是Producer-Consumer模式的原理。服务器
Beanstalkd设计里面的核心概念:composer
◆ job异步
一个须要异步处理的任务,是Beanstalkd中的基本单元,须要放在一个tube中。分布式
◆ tubememcached
一个有名的任务队列,用来存储统一类型的job,是producer和consumer操做的对象。性能
◆ producer测试
Job的生产者,经过put命令来将一个job放到一个tube中。ui
◆ consumerspa
Job的消费者,经过reserve/release/bury/delete命令来获取job或改变job的状态。
Beanstalkd中一个job的生命周期。一个job有READY, RESERVED, DELAYED, BURIED四种状态。当producer直接put一个job时,job就处于READY状态,等待consumer来处理,若是选择延迟put,job就先到DELAYED状态,等待时间事后才迁移到READY状态。consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其余的consumer就不能再操做该job。当consumer完成该job后,能够选择delete, release或者bury操做;delete以后,job从系统消亡,以后不能再获取;release操做能够从新把该job状态迁移回READY(也能够延迟该状态迁移操做),使其余的consumer能够继续获取和执行该job;有意思的是bury操做,能够把该job休眠,等到须要的时候,再将休眠的job kick回READY状态,也能够delete BURIED状态的job。正是有这些有趣的操做和状态,才能够基于此作出不少意思的应用,好比要实现一个循环队列,就能够将RESERVED状态的job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。
Beanstalkd基于的源码安装和使用很简单,在此略过。这里重点介绍一下其几个很nice的特性。
◆ 优先级
支持0到2**32的优先级,值越小,优先级越高,默认优先级为1024。
◆ 持久化
能够经过binlog将job及其状态记录到文件里面,在Beanstalkd下次启动时能够经过读取binlog来恢复以前的job及状态。
◆ 分布式容错
分布式设计和Memcached相似,beanstalkd各个server之间并不知道彼此的存在,都是经过client来实现分布式以及根据tube名称去特定server获取job。
◆ 超时控制
为了防止某个consumer长时间占用任务但不能处理的状况,Beanstalkd为reserve操做设置了timeout时间,若是该consumer不能在指定时间内完成job,job将被迁移回READY状态,供其余consumer执行。
Beanstalkd不足之处:在使用中发现一个Beanstalkd尚无提供删除一个tube的操做,只能将tube的job依次删除,并让Beanstalkd来自动删除空tube。还有就是Beanstalkd不支持客户端认证机制(开发者将应用场景定位在局域网)。
下面,咱们经过php类Pheanstalk来进行简单入门。
1. composer安装
切换到项目目录,使用composer进行加载安装:composer require pda/pheanstalk
(如出现报错,请更新composer在下载:composer update)
2. 链接Beanstalk
--- 生产者 ---
关于如何安装Beanstalk,在这里就不进行说明了,操做很简单,请自行在执行系统安装好,才能使用。
做者是使用 4.0 版本的类库
链接服务
注意的是,在4.0以前的版本,是直接使用 new Pheanstalk($host, $port, ...) 方式链接的,4.0版本使用 create 静态方法进行链接。
链接成功,咱们来完成一个简单测试,“像管道demo中,put进值”,如
查看demo管道信息
这样,生产者就完成任务放进管道的操做。
3. 如何处理管道任务
--- 消费者 ---
咱们建立一个消费者的方法,经过while来处理管道每次产生的任务(这是模拟,实际生产中,下一篇文章会详细说明),如:
如下的代码是监听管道demo,并将任务取出来处理
当咱们在终端执行时,我看到每次处理任务的数据,如:
整个生产者到消费者的流程就是这样,至于具体业务,须要根据自身的状况进行处理。
入门篇(一)完!
如下是整理了一些经常使用方法,仅供参考(来源于网上,感谢网友的提供)
//----------------------------------------维护类----------------------------------
//1.查看目前pheanStalkd状态信息
//print_r($ph->stats());
//2.显示目前存在的管道
//print_r($ph->listTubes());
//3.查看NewUsers管道的信息
//$ph->useTube('NewUsers')->put('test');
//$ph->useTube('NewUsers')->put('up'); //4.向NewUsers管道添加一个up任务
//print_r($ph->statsTube('NewUsers'));//3.查看NewUsers管道的信息
//6.查看指定管道中某一个任务的状况
//$job = $ph->watch('NewUsers')->reserve(); //5.从管道中取出任务(消费)
//print_r($ph->statsJob($job)); //6.查看指定管道中某一个任务的状况
//7.查看任务id为1的任务详情
//$job = $ph->peek(1);7.直接取出任务id为1的任务 [注:beanstalkd中全部任务的id都具备惟一性]
//print_r($ph->statsJob($job));//查看任务id为1的任务详情
//----------------------------------------生产类--------------------------------------
////第一种 put()
//$tube = $ph->useTube('NewUsers');//链接NewUsers管道
//print_r($tube->put('four'));//向NewUsers管道添加任务four,并返回结果
//注: put()方法还有3个可选参数(依次为: 优先级priority,延迟时间delay,任务超时重发ttr)
////第二种 putInTube() [注: putInTube()就是对useTube()和put()的封装]
//$res = $ph->putInTube('NewUsers','three');//向NewUsers管道添加任务three
////注: putInTube()方法还有3个可选参数(依次为: 优先级priority,延迟时间delay,任务超时重发ttr)
//print_r($res);//返回任务id
//print_r($ph->statsTube('NewUsers'));//查看NewUsers管道的详细状况
//---------------------------------------消费类--------------------------------------
// 1.watch 监听NewUsers管道 [ 注: watch()一样能够监听多个管道 ]
//$tube = $ph->watch('NewUsers');
//print_r($ph->listTubesWatched());//打印已经监听的管道
// 2.watch 监听多个管道
//$tube = $ph->watch('NewUsers')
// ->watch('default');
//print_r($ph->listTubesWatched());//打印已经监听的管道
// 3.ignore 监听NewUsers管道,忽略default管道
//$tube = $ph->watch('NewUsers')
// ->ignore('default');
//print_r($ph->listTubesWatched());//打印已经监听的管道
// 4.reserve 监听NewUsers管道,而且取出任务
//$job = $ph->watch('NewUsers')
// ->reserve();
//
////注reserve()有1个参数,阻塞的时间,过了阻塞时间,无论有没有东西,直接返回
//
//var_dump($job);//打印已经取出的任务
//$ph->delete($job);//删除已经取出的任务
// 5.putInTube/put 向NewUsers管道写入任务 [ 注:此为生产者方法,放到此处是为了方便理解 ]
//$ph->putInTube('NewUsers','number_1',5);
//$ph->putInTube('NewUsers','number_2',3);
//$ph->putInTube('NewUsers','number_3',0);
//$ph->putInTube('NewUsers','number_4',4);
//print_r($ph->statsTube('NewUsers'));//5.查看NewUsers管道详细信息
// 6.release 将取出的任务放回ready状态,还有2个参数(优先级和延迟)
//$job = $ph->watch('NewUsers')->reserve();//6.监听NewUsers管道,并取出任务
//if (true) {
// sleep(30);
// $ph->release($job);//6.将任务取出以后,停留30秒,而后将任务状态从新变为ready
//} else {
// $ph->delete($job);
//}
// 7.bury (预留) 将任务取出以后,发现后面执行的逻辑不成熟(好比发邮件,忽然发现邮件服务器挂掉了),
//或者说还不能执行后面的逻辑,须要把任务先封存起来,等待时机成熟了,再拿出这个任务进行消费
//$job = $ph->watch('NewUsers')->reserve();//取出任务
//$ph->bury($job);//取出任务后,将任务放到一边(预留)
// 8.peekBuried() 将处在bury状态的任务读取出来
//$job = $ph->peekBuried('NewUsers');//将NewUsers管道中处在bury状态的任务读取出来
//var_dump($ph->statsJob($job));//打印任务状态(此时任务状态应该是bury)
// 9.kickJob() 将处在bury任务状态的任务转化为ready状态
//$job = $ph->peekBuried('NewUsers');//将NewUsers管道中处在bury状态的任务读取出来
//$ph->kickJob($job);
// 10.kick() 将处在bury任务状态的任务转化为ready状态,有第二个参数int, 批量将任务id小于此数值的任务转化为ready
//$ph->useTube('NewUsers')->kick(65);//把NewUsers管道中任务id小于65,而且任务状态处于bury的任务所有转化为ready
// 11.peekReady() 将管道中处于ready状态的任务读出来
//$job = $ph->peekReady('NewUser');//将NewUser管道中处于ready状态的任务读取出来
//var_dump($job);
//$ph->delete($job);
// 12.peekDelay() 将管道中全部处于delay状态的任务读取出来
//$job = $ph->peekDelayed('NewUser');
//var_dump($job);
//$ph->delete($job);
// 13.pauseTube() 对整个管道进行延迟设置,让管道处于延迟状态
//$ph->pauseTube('NewUser',10);//设置管道NewUser延迟时间为10s
//$job = $ph->watch('NewUser')->reserve();//监听NewUser管道,并取出任务
//var_dump($job);
// 14.resumeTube() 恢复管道,让管道处于不延迟状态,当即被消费
//$ph->resumeTube('NewUser');//取消管道NewUser的延迟状态,变为当即读取
//$job = $ph->watch('NewUser')->reserve();//监听NewUser管道,并取出任务
//var_dump($job);
// 15.touch() 让任务从新计算任务超时重发ttr时间,至关于给任务延长寿命