以前在微博上调查过你们正在使用的分布式内存队列系统,反馈有Memcacheq,Fqueue, RabbitMQ, Beanstalkd以及linkedin的kafka。RabbitMQ使用比较普遍,Beanstalkd是后起之秀。Beanstalkd之于RabbitMQ,就比如Nginx之于Apache,Varnish之于Squid。后面在项目中使用Beanstalkd的过程当中,更发现其简单、轻量级、高性能、易使用等特色,以及优先级、多队列、持久化、分布式容错、超时控制等特性。下面就简单介绍一下Beanstalkd,不足之处请你们指正。php
设计思想html
高性能离不开异步,异步离不开队列,而其内部都是Producer-Comsumer模式的原理。git
图1 Producer-Comsumer模式github
应用ubuntu
Beanstalkd,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想经过后台异步执行耗时的任务来下降高容量Web应用系统的页面访问延迟,支持过有9.5 million用户的Facebook Causes应用。后来开源,如今有PostRank大规模部署和使用,天天处理百万级任务。Beanstalkd是典型的类Memcached设计,协议和使用方式都是一样的风格,因此使用过memcached的用户会以为Beanstalkd似曾相识。centos
核心概念异步
Beanstalkd设计里面的核心概念:分布式
◆ jobmemcached
一个须要异步处理的任务,是Beanstalkd中的基本单元,须要放在一个tube中。性能
◆ tube
一个有名的任务队列,用来存储统一类型的job,是producer和consumer操做的对象。
◆ producer
Job的生产者,经过put命令来将一个job放到一个tube中。
◆ consumer
Job的消费者,经过reserve/release/bury/delete命令来获取job或改变job的状态。
Beanstalkd中一个job的生命周期如图2所示。一个job有READY, RESERVED, DELAYED, BURIED四种状态。当producer直接put一个job时,job就处于READY状态,等待consumer来处理,若是选择延迟put,job就先到DELAYED状态,等待时间事后才迁移到READY状态。consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其余的consumer就不能再操做该job。当consumer完成该job后,能够选择delete, release或者bury操做;delete以后,job从系统消亡,以后不能再获取;release操做能够从新把该job状态迁移回READY(也能够延迟该状态迁移操做),使其余的consumer能够继续获取和执行该job;有意思的是bury操做,能够把该job休眠,等到须要的时候,再将休眠的job kick回READY状态,也能够delete BURIED状态的job。正是有这些有趣的操做和状态,才能够基于此作出不少意思的应用,好比要实现一个循环队列,就能够将RESERVED状态的job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。
图2 Beanstalkd中job的生命周期
特性
Beanstalkd基于的源码安装和使用很简单,在此略过。这里重点介绍一下其几个很nice的特性。
◆ 优先级
支持0到2**32的优先级,值越小,优先级越高,默认优先级为1024。
◆ 持久化
能够经过binlog将job及其状态记录到文件里面,在Beanstalkd下次启动时能够经过读取binlog来恢复以前的job及状态。
◆ 分布式容错
分布式设计和Memcached相似,beanstalkd各个server之间并不知道彼此的存在,都是经过client来实现分布式以及根据tube名称去特定server获取job。
◆ 超时控制
为了防止某个consumer长时间占用任务但不能处理的状况,Beanstalkd为reserve操做设置了timeout时间,若是该consumer不能在指定时间内完成job,job将被迁移回READY状态,供其余consumer执行。
不足
在使用中发现一个Beanstalkd尚无提供删除一个tube的操做,只能将tube的job依次删除,并让Beanstalkd来自动删除空tube。还有就是Beanstalkd不支持客户端认证机制(开发者将应用场景定位在局域网)。
后续工做
1.介绍Beanstalkd的命令和使用
2. 翻译Beanstalkd协议
3. 分析Beanstalkd源码
原文:http://rdc.taobao.com/blog/cs/?p=1201
最近在作一个项目,须要用户在提交相关信息后,分析信息内容,而后将分析结果推送到相关的用户的信息模块中,用到了beanstalk这个队列系统。
beanstalkd介绍:
Beanstalkd,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想经过后台异步执行耗时的任务来下降高容量Web应用系统的页面访问延迟,支持过有9.5 million用户的Facebook Causes应用。后来开源,如今有PostRank大规模部署和使用,天天处理百万级任务。Beanstalkd是典型的类Memcached设计,协议和使用方式都是一样的风格,因此使用过memcached的用户会以为Beanstalkd似曾相识。
Beanstalkd中一个job的生命周期如图所示。一个job有READY, RESERVED, DELAYED, BURIED四种状态。当producer直接put一个job时,job就处于READY状态,等待consumer来处理,若是选择延迟put,job就先到DELAYED状态,等待时间事后才迁移到READY状态。consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其余的consumer就不能再操做该job。当consumer完成该job后,能够选择delete, release或者bury操做;delete以后,job从系统消亡,以后不能再获取;release操做能够从新把该job状态迁移回READY(也能够延迟该状态迁移操做),使其余的consumer能够继续获取和执行该job;有意思的是bury操做,能够把该job休眠,等到须要的时候,再将休眠的job kick回READY状态,也能够delete BURIED状态的job。正是有这些有趣的操做和状态,才能够基于此作出不少意思的应用,好比要实现一个循环队列,就能够将RESERVED状态的job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。
特性:
为了防止某个consumer长时间占用任务但不能处理的状况,Beanstalkd为reserve操做设置了timeout时间,若是该consumer不能在指定时间内完成job,job将被迁移回READY状态,供其余consumer执行。
下载:
服务端:http://kr.github.io/beanstalkd/download.html
客户端:https://github.com/kr/beanstalkd/wiki/client-libraries
安装:
ubuntu
sudo apt-get install beanstalkd
centos
yum install beanstalkd
源码安装
tar -zxvf /usr/bin/beanstalkd/beanstalkd-1.9.tar.gz cd beanstalkd make install PERFIX=/usr/bin/beanstalkd
后台启动:
beanstalkd -l 地址 -p 端口号 -z 最大的任务大小(byte) -c &
若是是外部客户端链接,ip地址要写外网地址,这样才能链接上
启动选项
-b DIR wal directory
-f MS fsync at most once every MS milliseconds (use -f0 for “always fsync”)
-F never fsync (default)
-l ADDR listen on address (default is 0.0.0.0)
-p PORT listen on port (default is 11300)
-u USER become user and group
-z BYTES set the maximum job size in bytes (default is 65535)
-s BYTES set the size of each wal file (default is 10485760)
(will be rounded up to a multiple of 512 bytes)
-c compact the binlog (default)
-n do not compact the binlog
-v show version information
-V increase verbosity
-h show this help
php客户端的使用:我使用的是这个简易的类 https://github.com/davidpersson/beanstalk
发送任务:
<?php //发送任务 require_once 'src/Socket/Beanstalk.php'; //实例化beanstalk $beanstalk = new Socket_Beanstalk(array( 'persistent' => false, //是否长链接 'host' => 'ip地址', 'port' => 11600, //端口号默认11300 'timeout' => 3 //链接超时时间 )); if (!$beanstalk->connect()) { exit(current($beanstalk->errors())); } //选择使用的tube $beanstalk->useTube('test'); //往tube中增长数据 $put = $beanstalk->put( 23, // 任务的优先级. 0, // 不等待直接放到ready队列中. 60, // 处理任务的时间. 'hello, beanstalk' // 任务内容 ); if (!$put) { exit('commit job fail'); } $beanstalk->disconnect();
处理任务:
<?php require_once 'src/Socket/Beanstalk.php'; //实例化beanstalk $beanstalk = new Socket_Beanstalk(array( 'persistent' => false, //是否长链接 'host' => 'ip地址', 'port' => 11600, //端口号默认11300 'timeout' => 3 //链接超时时间 )); if (!$beanstalk->connect()) { exit(current($beanstalk->errors())); } //查看beanstalkd状态 //var_dump($beanstalk->stats()); //查看有多少个tube //var_dump($beanstalk->listTubes()); $beanstalk->useTube('test'); //设置要监听的tube $beanstalk->watch('test'); //取消对默认tube的监听,能够省略 $beanstalk->ignore('default'); //查看监听的tube列表 //var_dump($beanstalk->listTubesWatched()); //查看test的tube当前的状态 //var_dump($beanstalk->statsTube('test')); while (true) { //获取任务,此为阻塞获取,直到获取有用的任务为止 $job = $beanstalk->reserve(); //返回格式array('id' => 123, 'body' => 'hello, beanstalk') //处理任务 $result = doJob($job['body']); if ($result) { //删除任务 $beanstalk->delete($job['id']); } else { //休眠任务 $beanstalk->bury($job['id']); } //跳出无限循环 if (file_exists('shutdown')) { file_put_contents('shutdown', 'beanstalkd在'.date('Y-m-d H:i:s').'关闭'); break; } } $beanstalk->disconnect();
原文:http://blog.chedushi.com/archives/8026