storm spout和bolt java api

Componentjava

Storm中,Spout和Bolt都是其Component。因此,Storm定义了一个名叫IComponent的总接口web

图谱以下:dom

绿色部分是咱们最经常使用、比较简单的部分。红色部分是与事务相关的,在之后的文章会具体讲解。ide

BaseComponent 是Storm提供的“偷懒”的类。为何这么说呢,它及其子类,都或多或少实现了其接口定义的部分方法。这样咱们在用的时候,能够直接继承该类,而不是本身每次都写全部的方法。但值得一提的是,BaseXXX这种定义的类,它所实现的方法,都是空的,直接返回null。ui

Spoutthis

在前面基本例子中,咱们实现了一个RandomSpout,来看看其类图spa

  • Spout的最顶层抽象是ISpout接口。code

open方法是初始化动做。容许你在该spout初始化时作一些动做,传入了上下文,方便取上下文的一些数据。orm

close方法在该spout关闭前执行,可是并不能获得保证其必定被执行。spout是做为task运行在worker内,在cluster模式下,supervisor会直接kill -9 woker的进程,这样它就没法执行了。而在本地模式下,只要不是kill -9, 若是是发送中止命令,是能够保证close的执行的。继承

activate和deactivate :一个spout能够被暂时激活和关闭,这两个方法分别在对应的时刻被调用。

nextTuple 用来发射数据。

ack(Object)

传入的Object实际上是一个id,惟一表示一个tuple。该方法是这个id所对应的tuple被成功处理后执行。

fail(Object)

同ack,只不过是tuple处理失败时执行。

咱们的RandomSpout 因为继承了BaseRichSpout,因此不用实现close、activate、deactivate、ack、fail和getComponentConfiguration方法,只关心最基本核心的部分。

结论:
一般状况下(Shell和事务型的除外),实现一个Spout,能够直接实现接口IRichSpout,若是不想写多余的代码,能够直接继承BaseRichSpout。

Bolt

 ExclaimBasicBolt的类图: 

这里能够看到一个奇怪的问题:
为何IBasicBolt并无继承IBolt?
咱们带着问题往下看。

IBolt定义了三个方法:

IBolt继承了java.io.Serializable,咱们在nimbus上提交了topology之后,建立出来的bolt会序列化后发送到具体执行的worker上去。worker在执行该Bolt时,会先调用prepare方法传入当前执行的上下文
execute接受一个tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果
cleanup 同ISpout的close方法,在关闭前调用。一样不保证其必定执行。
红色部分是Bolt实现时必定要注意的地方。而Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中提供反馈结果了,Storm内部会自动反馈成功。
若是你确实要反馈失败,能够抛出FailedException。

咱们来再写一个Bolt继承BaseRichBolt替代ExclaimBasicBolt。代码以下:

public class ExclaimRichBolt extends BaseRichBolt {
 
     private OutputCollector collector;
     
     @Override
     public void prepare(Map stormConf, TopologyContext context,
             OutputCollector collector) {
         this .collector = collector;
     }
 
     @Override
     public void execute(Tuple tuple) {
         this .collector.emit(tuple, new Values(tuple.getString( 0 )+ "!" ));
         this .collector.ack(tuple);
     }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare( new Fields( "after_excl" ));
     }
 
}

修改topology

//builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");
builder.setBolt( "exclaim" , new ExclaimRichBolt(), 2 ).shuffleGrouping( "spout" );

运行下,结果一致。

结论:

一般状况下,实现一个Bolt,能够实现IRichBolt接口或继承BaseRichBolt,若是不想本身处理结果反馈,能够实现IBasicBolt接口或继承BaseBasicBolt,它实际上至关于自动作掉了prepare方法和collector.emit.ack(inputTuple);

相关文章
相关标签/搜索