Beanstalkd 是一个高性能的消息队列中间件,本博文宅鸟将介绍一下这个东东的使用。php
1、先经过概念让你们了解Beanstalkd的特性和工做场景。html
Beanstalkd 是一个轻量级消息中间件,它最大特色是将本身定位为基于管道 (tube) 和任务 (job) 的工做队列 (work-queue):java
Beanstalkd 支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried), 可以很好的支持分布式的后台任务和定时任务处理。python
它的内部实现采用 libevent, 服务器-客户端之间用相似 memcached 的轻量级通信协议,具备有很高的性能。c++
尽管是内存队列, beanstalkd 提供了 binlog 机制, 当重启 beanstalkd 时,当前任务状态可以从纪录的本地 binlog 中恢复。git
管道 (tube):github
管道相似于消息主题 (topic), 在一个 Beanstalkd 中能够支持多个管道, 每一个管道都有本身的发布者 (producer) 和消费者 (consumer). 管道之间互相不影响。 数据库
任务 (job):json
Beanstalkd 用任务 (job) 代替消息 (message) 的概念。与消息不一样,任务有一系列状态:vim
READY- 须要当即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;
DELAYED- 延迟执行的任务, 当消费者处理任务后, 能够用将消息再次放回 DELAYED 队列延迟执行;
RESERVED- 已经被消费者获取, 正在执行的任务。Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;
BURIED- 保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;
DELETED- 消息被完全删除。Beanstalkd 再也不维持这些消息。
任务优先级 (priority):
任务 (job) 能够有 0~2^32 个优先级, 0 表明最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任什么时候刻调用 reserve 命令的消费者老是能拿到当前优先级最高的任务, 时间复杂度为 O(logn).
延时任务 (delay):
有两种方式能够延时执行任务 (job): 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE with <delay>)。这种机制能够实现分布式的 java.util.Timer,这种分布式定时任务的优点是:若是某个消费者节点故障,任务超时重发 (time-to-run) 可以保证任务转移到另外的节点执行。
任务超时重发 (time-to-run):
Beanstalkd 把任务返回给消费者之后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;不然 Beanstalkd 会认为消息处理失败,而后把任务交给另外的消费者节点执行。若是消费者预计在 TTR (time-to-run) 时间内没法完成任务, 也能够发送 touch 命令, 它的做用是让 Beanstalkd 从系统时间从新计算 TTR (time-to-run).
任务预留 (buried):
若是任务由于某些缘由没法执行, 消费者能够把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员能够经过 peek buried 命令查询被保留的任务,而且进行人工干预。简单的, kick <n> 可以一次性把 n 条被保留的任务踢回队列。
Beanstalkd 协议:
Beanstalkd 采用类 memcached 协议, 客户端经过文本命令与服务器交互。这些命令能够简单的分红三组:
生产类 - use <tube> / put <priority> <delay> <ttr> [bytes]:
生产者用 use 选择一个管道 (tube), 而后用 put 命令向管道发布任务 (job).
消费类 - watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury <id> / touch <id>
消费者用 watch 选择多个管道 (tube), 而后用 reserve 命令获取待执行的任务,这个命令是阻塞的。客户端直到有任务可执行才返回。当任务处理完毕后, 消费者能够完全删除任务 (DELETE), 释听任务让别人处理 (RELEASE), 或者保留 (BURY) 任务。
维护类 - peek job / peek delayed / peek ready / peek buried / kick <n>
用于维护管道内的任务状态, 在不改变任务状态的条件下获取任务。能够用消费类命令改变这些任务的状态。
被保留 (buried) 的任务能够用 kick 命令 "踢" 回队列。
协议文档: https://raw.github.com/kr/beanstalkd/master/doc/protocol.txt
Beanstalkd 不足:
Beanstalkd 没有提供主备同步 + 故障切换机制, 在应用中有成为单点的风险。实际应用中,能够用数据库为任务 (job) 提供持久化存储。
另外, 和 memcached 相似, Beanstalkd 依赖 libevent 的单线程事件分发机制, 不能有效利用多核 cpu 的性能。这一点能够经过单机部署多个实例克服。
2、部署安装:
Beanstalkd 的安装很是简单:
在Ubuntu和debian下使用下面命令:
sudo apt-get install beanstalkd
安装后编辑配置文件:
vim /etc/default/beanstalkd
把START=NO改成:START=yes便可
更多关于安装能够参考官网
经过命令能够启动、中止Beanstalk
/etc/init.d/beanstalkd start lsof -i:11300 /etc/init.d/beanstalkd stop
启动后,就能够经过客户端进行调用了:
Beanstalk支持多种客户端语言:
php,java,perl,c,c++,lua,python,go,ruby等等(了解更多能够来官网)。
咱们将经过php给你们介绍在生产环境下面的使用。
就拿录视频制程序使用到的Beanstalk来给你们介绍:
先介绍一下程序结构:
视频录制程序分为两个方面,一个是产生录制任务的脚本(生产者),还有一个处理录制任务脚本(消费者)。
首先把php的客户端下载后,加入到项目中。下面把代码贴出来:
生产者:
#!/usr/bin/php <?php require_once 'Configuration.php'; require_once 'Record.class.php'; require_once 'BeanStalk.class.php'; $now=time(); $model = new RecordModel (); $records=$model->checkStartRecord($now); //print_r($records); //exit(); $beanstalk = BeanStalk::open ( array ( 'servers' => array ( Configuration::$record_config['beanStak'] ), 'select' => 'random peek' ) ); $beanstalk->use_tube ( 'records' ); foreach ( $records as $record ) { $beanstalk->put ( 0, 0, 10, json_encode ( $record ) ); } ?>
消费者:
<?php require_once('config.php'); require_once('func.php'); require_once('BeanStalk.class.php'); $beanstalk = BeanStalk::open(array( 'servers' => array( $config['beanStak'] ), 'select' => 'random peek' )); $beanstalk->watch('records'); while(true){ //$beanstalk->watch('records'); $job = $beanstalk->reserve_with_timeout(); if(is_object($job)){ $data=$job->get(); $json=json_decode($data,true); print_r($json); if(!empty($json["live_name"])&&!empty($json["start_time"])&&!empty($json["end_time"])&&!empty($json["vod_id"])){ //print_r($json); if(!empty($json["afterplay"])&&$json["afterplay"]==1) $cmd="{$config['afterplaycmd']} {$json["live_name"]} {$json["vod_id"]} {$json['start_time']} {$json['end_time']}"; else $cmd="{$config['recordcmd']} {$json["live_name"]} {$json["vod_id"]} {$json['start_time']} {$json['end_time']}"; echo $cmd; $chkcmd="ps -ef |grep '".$cmd."' |grep -v 'grep'|wc -l"; //$chkcmd="ps -ef |wc -l"; //echo $chkcmd; $count=system($chkcmd); //echo $count; if($count==0) { //system($cmd); exec($cmd,$res,$rc); //print_r($res); //print_r($rc); } Beanstalk::delete($job); // Delete the job. $info=array(); $info["vod_id"]=$json['vod_id']; $info["record_msg"]="startjob"; $data=array(); $data["type"]="reciveRecords"; $data["message"]=$info; $url=$config['recordStatus']; $httpcode = 200; $result = test_api($httpcode,$url,"post",json_encode($data)); print_r($data); } //$beanstalk->watch('records'); } sleep(1); } ?>
下面咱们介绍一个能够管理Beanstalk的php工具,地址以下
https://github.com/jimbojsb/bstools
把该工具安装后,就能够查看Beanstalk的各类状况了
到此结束,不足之处欢迎拍砖