正如你已经看到的,bolts是一个Storm集群中的关键组件。你将在这一章学到bolt生命周期,一些bolt设计策略,以及几个有关这些内容的例子。java
Bolt是这样一种组件,它把元组做为输入,而后产生新的元组做为输出。实现一个bolt时,一般须要实现IRichBolt接口。Bolts对象由客户端机器建立,序列化为拓扑,并提交给集群中的主机。而后集群启动工人进程反序列化bolt,调用prepare,最后开始处理元组。
NOTE:要建立一个bolt对象,它经过构造器参数初始化成员属性,bolt被提交到集群时,这些属性值会随着一块儿序列化。web
Bolts拥有以下方法:ui
declareOutputFields(OutputFieldsDeclarer declarer) { 为bolt声明输出模式 } prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector) { 仅在bolt开始处理元组以前调用 } execute(Tuple input){ 处理输入的单个元组 } cleanup(){ 在bolt即将关闭时调用 }
下面看一个例子,在这个例子中bolt把一句话分割成单词列表:this
class SplitSentence implements IRichBolt { private OutputCollector collector; publlic void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(new Values(word)); } } public void cleanup(){} public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }}
正如你所看到的,这是一个很简单的bolt。值得一提的是在这个例子里,没有消息担保。这就意味着,若是bolt由于某些缘由丢弃了一些消息——不管是由于bolt挂了,仍是由于程序故意丢弃的——生成这条消息的spout不会收到任何通知,任何其它的spouts和bolts也不会收到。spa
然而在许多状况下,你想确保消息在整个拓扑范围内都被处理过了。设计
正如前面所说的,Storm保证经过spout发送的每条消息会获得全部bolt的全面处理。基于设计上的考虑,这意味着你要本身决定你的bolts是否保证这一点。code
拓扑是一个树型结构,消息(元组)穿过其中一条或多条分支。树上的每一个节点都会调用ack(tuple)或fail(tuple),Storm所以知道一条消息是否失败了,并通知那个/那些制造了这些消息的spout(s)。既然一个Storm拓扑运行在高度并行化的环境里,跟踪始发spout实例的最好方法就是在消息元组内包含一个始发spout引用。这一技巧称作锚定(译者注:原文为Anchoring)。修改一下刚刚讲过的SplitSentence,使它可以确保消息都被处理了。orm
class SplitSentence implenents IRichBolt { private OutputCollector collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(tuple, new Values(word)); } collector.ack(tuple); } public void cleanup(){} public void declareOutputFields(OutputFieldsDeclarer declarer){ declar.declare(new Fields("word")); }}
锚定发生在调用collector.emit()时。正如前面提到的,Storm能够沿着元组追踪到始发spout。collector.ack(tuple)和collector.fail(tuple)会告知spout每条消息都发生了什么。当树上的每条消息都已被处理了,Storm就认为来自spout的元组被全面的处理了。若是一个元组没有在设置的超时时间内完成对消息树的处理,就认为这个元组处理失败。默认超时时间为30秒。对象
NOTE:你能够经过修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓扑的超时时间。接口
固然了spout须要考虑消息的失败状况,并相应的重试或丢弃消息。
NOTE:你处理的每条消息要么是确认的(译者注:collector.ack())要么是失败的(译者注:collector.fail())。Storm使用内存跟踪每一个元组,因此若是你不调用这两个方法,该任务最终将耗尽内存。
一个bolt可使用emit(streamId, tuple)把元组分发到多个流,其中参数streamId是一个用来标识流的字符串。而后,你能够在TopologyBuilder决定由哪一个流订阅它。
为了用bolt链接或聚合数据流,你须要借助内存缓冲元组。为了在这一场景下确保消息完成,你不得不把流锚定到多个元组上。能够向emit方法传入一个元组列表来达成目的。
... List anchors = new ArrayList(); anchors.add(tuple1); anchors.add(tuple2); collector.emit(anchors, values); ...
经过这种方式,bolt在任意时刻调用ack或fail方法,都会通知消息树,并且因为流锚定了多个元组,全部相关的spout都会收到通知。
你可能已经注意到了,在许多状况下都须要消息确认。简单起见,Storm提供了另外一个用来实现bolt的接口,IBasicBolt。对于该接口的实现类的对象,会在执行execute方法以后自动调用ack方法。
class SplitSentence extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(new Values(word)); }} public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
NOTE:分发消息的BasicOutputCollector自动锚定到做为参数传入的元组。