对于spout,有ISpout,IRichSpout,BaseRichSpout数据库
对于bolt,有IBolt,IRichBolt,BaseRichBolt,IBasicBolt,BaseBasicBoltapi
IBasicBolt,BaseBasicBolt不用每次execute完成都写ack/fail,由于已经帮你实现好了。函数
做为storm的使用者,有两件事情要作以更好的利用storm的可靠性特征。 首先,在你生成一个新的tuple的时候要通知storm; 其次,完成处理一个tuple以后要通知storm。 这样storm就能够检测整个tuple树有没有完成处理,而且通知源spout处理结果。storm提供了一些简洁的api来作这些事情。
由一个tuple产生一个新的tuple称为: anchoring。你发射一个新tuple的同时也就完成了一次anchoring。看下面这个例子: 这个bolt把一个包含一个句子的tuple分割成每一个单词一个tuple。
ui
public class SplitSentence implements IRichBolt { orm
Output Collector _collector; blog
public void prepare(Map conf, 内存
TopologyContext context, get
OutputCollector collector) { it
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple,newValues(word));
}
_collector.ack(tuple);
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(newFields("word"));
}
}
咱们经过anchoring来构造这个tuple树,最后一件要作的事情是在你处理完当个tuple的时候告诉storm, 经过OutputCollector类的ack和fail方法来作,若是你回过头来看看SplitSentence的例子, 你能够看到“句子tuple”在全部“单词tuple”被发出以后调用了ack。
你能够调用OutputCollector 的fail方法去当即将从消息源头发出的那个tuple标记为fail, 好比你查询了数据库,发现一个错误,你能够立刻fail那个输入tuple, 这样可让这个tuple被快速的从新处理, 由于你不须要等那个timeout时间来让它自动fail。
每一个你处理的tuple, 必须被ack或者fail。由于storm追踪每一个tuple要占用内存。因此若是你不ack/fail每个tuple, 那么最终你会看到OutOfMemory错误。
大多数Bolt遵循这样的规律:读取一个tuple;发射一些新的tuple;在execute的结束的时候ack这个tuple。这些Bolt每每是一些过滤器或者简单函数。Storm为这类规律封装了一个BasicBolt类。若是用BasicBolt来作, 上面那个SplitSentence能够改写成这样:
public class SplitSentence implements IBasicBolt {
public void prepare(Map conf,
TopologyContext context) {
}
public void execute(Tuple tuple,
BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(newValues(word));
}
}
public void cleanup() {
}
public void declareOutputFields(
OutputFieldsDeclarer declarer) {
declarer.declare(newFields("word"));
}
}
这个实现比以前的实现简单多了, 可是功能上是同样的。
发送到BasicOutputCollector的tuple会自动和输入tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack的。
咱们编写的时候使用IBasicBolt最方便了。或者 extends BaseBasicBolt类