beanstalked
是一个简单的快速的通用的轻量级内存队列,能够实现百万级任务处理。php
任务和管道git
任务的状态转换github
ready:任务已经准备好了,能够给消费者使用。
delay:任务放入管道的时候设置了延迟时间。
reserve:消费者把任务读取出来
buried:任务先放在一边,之后还会用
delete:任务从队列删除数据库
地址:https://github.com/kr/beanstalkd服务器
能够经过源代码安装或者包管理工具(brew,apt-get)安装,安装完成后输入beanstalkd会由相应的命令.异步
image.png工具
首先使用下面的命令绑定本地地址和端口号ui
beanstalkd -l 127.0.0.1 -p 11300 &
可使用pheantalk
php类操做beanstalkd
,pheantalk
是对beanstalkd
命令的封装。能够经过该类实现对任务的操做。spa
https://github.com/pda/pheanstalk命令行
require 'vendor/autoload.php'; use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); //查看beanstalk的当前信息命令行的封装 print_r($pheanstalk->stats());
beanstalkd当前状态.png
好比查看某个任务的信息
print_r($pheanstalk->putInTube('mytube','job',1000)); $job = $pheanstalk->watch('mytube')->reserve(); print_r( $pheanstalk->statsJob($job));
job的信息.png
还有其余的操做以下
require 'vendor/autoload.php'; use Pheanstalk\Pheanstalk; //如今命令行输入 beanstalkd -l 127.0.0.1 -p 11300 & 开启 $pheanstalk = new Pheanstalk('127.0.0.1',11300); //查看beanstalk的当前信息 // 能够经过命令行对管道进行管理,本php类对管道的管理就是对beanstalkd命令行的封装 print_r($pheanstalk->stats()); // 当前的管道 print_r($pheanstalk->listTubes()); // 查看默认的管道 print_r($pheanstalk->statsTube('default')); //在mytube 里面放入任务 one,使用默认的优先级,延迟重发时间,超时重发时间, $pheanstalk->useTube('mytube')->put('one'); //从管道取出任务 $job = $pheanstalk->watch('mytube')->reserve(); //根据job的编号获取信息 $job = $pheanstalk->peek(1); $pheanstalk->delete($job);//删除任务 $pheanstalk->watch('mytube')->delete($job); //会把任务从新放入管道,并把任务设置为ready状态 $pheanstalk->release($job); //把任务放在一边暂时不处理,条件成熟了在读取出来 $pheanstalk->bury($job); $pheanstalk->peekBuried('mytube');// 获取处理bury状态的任务 $pheanstalk->kickJob($job);//会把job转变成ready状态 $pheanstalk->kick(200);//把job编号小于200的变成ready状态 //--- 获取某个状态的任务 $pheanstalk->peekDelayed(); $pheanstalk->peekReady(); $pheanstalk->peekBuried(); $pheanstalk->pauseTube('mytube',10);//阻塞整个管道时间
生产者将任务放入队列
require 'vendor/autoload.php'; use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk("127.0.0.1",11300); $pheanstalk->useTube('mytube')->put('one',1024,10,3); $pheanstalk->useTube('mytube')->put('two',1023); $pheanstalk->useTube('mytube')->put('three',1025); print_r($pheanstalk->stats());
消费者处理任务
require 'vendor/autoload.php'; use Pheanstalk\Pheanstalk; //终端等待表示没有生产者产生数据 $pheanstalk = new Pheanstalk("127.0.0.1",11300); //reserve能够设置阻塞时间,不设置会一直等待,watch 能够同时监听两个管道, $job = $pheanstalk->watch('mytube')->watch('default')->reserve(); // reserver是获取ready的job $pheanstalk->ignore('default');//忽略管道 //处理代码 if ($job->getData() == 'one') { sleep(2);// 超时重回队列3秒,睡眠两秒,delete以前的代码还能执行1秒,若是想在加 $pheanstalk->watch('mytube')->touch($job);//续命 }
具体对队列和任务的操做能够结合实际逻辑和设置任务和管道的状态。
做者:鸿雁长飞光不度 连接:https://www.jianshu.com/p/82c4c6fee450 来源:简书 简书著做权归做者全部,任何形式的转载都请联系做者得到受权并注明出处。