不得不说storm是一个特别棒的实时计算框架。为了对后文理解的方便,先说几个storm中的术语:数据库
Topology:拓扑图或者拓扑结构。在storm中它经过消息分组的分式链接Spout和Bolt节点定义了运算处理的拓扑结构。以下图:框架
那什么是Spout呢?ide
在计算任务须要的数据其实就是由Spout提供的,因此它能够说是Storm中的消息源,通常是从外部数据源(日志文件、数据库、消息队列等等)不间断地读取数据而后发送给tuple元组的。spa
那它是经过谁发送的呢?又是如何发送的呢?日志
这里咱们先回答第一个问题,第二个问题之后解答。code
好了上面说了那么多就是为了引出今天的任务:阅读SpoutOutputCollector源码。orm
在阅读以前,咱们先明确一下SpoutOutputCollector究竟是什么?其实从类名就能说出大概(不得不说老外写的代码的可读性真是好的无法说。这里啰嗦一句,blog
我的以为这也是他们分享精神的体现,时刻记住方便给别人看。),它就是Spout输出收集器。接口
那它到底能干些啥呢?请看代码:队列
1.ISpoutOutputCollector:是SpoutOutputCollector的接口
1 public interface ISpoutOutputCollector { 2 /** 3 发送tuple消息,并返回起发送任务的task的序列号集合 4 */ 5 List<Integer> emit(String streamId, List<Object> tuple, Object messageId); 6 /** 7 *与上述发送方法相似,只不过emitDirect方法是要指定接收端的task,让接收端特定的task接收消息。 8 */ 9 void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId); 10 /** 11 *处理异常 12 */ 13 void reportError(Throwable error); 14 }
从上述接口ISpoutOutputCollector源码能够看出ISpoutOutputCollector中声明了3个方法,两个属于发送tuple元组的方法,他们之间的差别在上述注释中已说的很清楚,还有一个处理异常的方法。
2.SpoutOutputCollector:它实现了接口ISpoutOutputCollector
1 public class SpoutOutputCollector implements ISpoutOutputCollector { 2 ISpoutOutputCollector _delegate; 3 4 public SpoutOutputCollector(ISpoutOutputCollector delegate) { 5 _delegate = delegate; 6 } 7 8 /** 9 * 指定一个streamid和message发射tuple消息并返回起发送消息的task的序号。当tuple消息彻底处理了,就会回调ack方法,不然会回调fail方法。 10 */ 11 public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { 12 return _delegate.emit(streamId, tuple, messageId); 13 } 14 15 /** 16 * emit(String streamId, List<Object> tuple, Object messageId)的重载方法,这没有指定streamid,故采用默认的streamid 17 */ 18 public List<Integer> emit(List<Object> tuple, Object messageId) { 19 return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId); 20 } 21 22 /** 23 * emit(String streamId, List<Object> tuple, Object messageId) 24 *的重载方法,这没有指定streamid,故采用默认的streamid,由于没有messageid,故ack方法和fail方法不会被调用 25 */ 26 public List<Integer> emit(List<Object> tuple) { 27 return emit(tuple, null); 28 } 29 30 /** 31 * emit(String streamId, List<Object> tuple, Object messageId)的重载方法,由于没有messageid,故ack方法和fail方法不会被调用 32 */ 33 public List<Integer> emit(String streamId, List<Object> tuple) { 34 return emit(streamId, tuple, null); 35 } 36 37 /** 38 * 发射tuple消息,不过须要指定接收端的task来接收,而且输出必须声明为直接流,同时指定用来接收消息的task必须采用直接分组的方式来接收消息. 39 * 40 */ 41 public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { 42 _delegate.emitDirect(taskId, streamId, tuple, messageId); 43 } 44 45 /** 46 * emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)的重载方法,采用默认的streamid 47 */ 48 public void emitDirect(int taskId, List<Object> tuple, Object messageId) { 49 emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId); 50 } 51 52 /** 53 * emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)的重载方法,由于没有指定的消息id,因此ack和fail方法就不会调用. 54 */ 55 public void emitDirect(int taskId, String streamId, List<Object> tuple) { 56 emitDirect(taskId, streamId, tuple, null); 57 } 58 59 /** 60 * 该类提供的重载方法,由于没有指定的消息id,因此ack和fail方法就不会调用. 61 */ 62 public void emitDirect(int taskId, List<Object> tuple) { 63 emitDirect(taskId, tuple, null); 64 } 65 /** 66 * 接口ISpoutOutputCollector中reportError的实现. 67 */ 68 @Override 69 public void reportError(Throwable error) { 70 _delegate.reportError(error); 71 } 72 }
在SpoutOutputCollector类中,实现了消息发射的方法,而且还提供了多个重载方法方便用户使用。