参考:http://www.open-open.com/lib/view/open1374979211233.html html
Apache Storm 是由Twitter开源的分布式实时计算系统。Storm能够很是容易而且可靠的处理无限的数据流。对比Hadoop的批处理,Storm是一个实时的、分布式的、具有高容错的计算系统。Storm应用可使用不一样的编程语言来进行开发。java
Storm的集群表面上看和hadoop的集群很是像。可是在Hadoop上面你运行的是MapReduce的Job。而在Storm上面你运行的是Topology。它们是很是不同的—一个关键的区别是: 一个MapReduce Job最终会结束,而一个Topology运永远运行(除非你显式的杀掉他)。在Storm的集群里面有两种节点:控制节点(master node)和工做节点(worker node)。控制节点上面运行一个后台程序:Nimbus,它的做用相似Hadoop里面的JobTracker。Nimbus负责在集群里面分布代码,分配工做给机器,而且监控状态。每个工做节点上面运行一个叫作Supervisor的节点(相似 TaskTracker)。Supervisor会监听分配给它那台机器的工做,根据须要 启动/关闭工做进程。每个工做进程执行一个Topology(相似 Job)的一个子集;一个运行的Topology由运行在不少机器上的不少工做进程 Worker(相似 Child)组成。node
Nimbus和Supervisor之间的全部协调工做都是经过一个Zookeeper集群来完成。而且,nimbus进程和supervisor都是快速失败(fail-fast)和无状态的。全部的状态要么在Zookeeper里面,要么在本地磁盘上。这也就意味着你能够用kill -9来杀死nimbus和supervisor进程,而后再重启它们,它们能够继续工做,就好像什么都没有发生过似的。这个设计使得storm难以想象的稳定。数据库
Storm 分布式计算结构称为 topology(拓扑),由stream(数据流),spout(数据流的生成者),bolt(运算)组成。
Storm 的核心数据结构是tuple。tuple是包含了一个或者多个键值对的列表,Stream是由无限制的tuple组成的序列。
spout 表明了一个Storm topology的主要数据入口,充当采集器的角色,链接到数据源,将数据转化为一个个tuple,并将tuple做为数据流进行发射。
bolt 能够理解为计算程序中的运算或者函数,将一个或者多个数据流做为输入,对数据实施运算后,选择性地输出一个或者多个数据流。 bolt能够订阅多个由spout或者其余bolt发射的数据流,这样就能够创建复杂的数据流转换网络。编程
Topologyapi
为了在storm上面作实时计算, 你要去创建一些topologies。一个topology就是一个计算节点所组成的图。Topology里面的每一个处理节点都包含处理逻辑, 而节点之间的链接则表示数据流动的方向。数组
运行一个Topology是很简单的。首先,把你全部的代码以及所依赖的jar打进一个jar包。而后运行相似下面的这个命令。cookie
strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2
这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main函数定义这个topology而且把它提交给Nimbus。storm jar负责链接到nimbus而且上传jar文件。由于topology的定义其实就是一个Thrift结构而且nimbus就是一个Thrift服务, 有能够用任何语言建立而且提交topology。上面的方面是用JVM-based语言提交的最简单的方法。计算任务Topology是由不一样的Spouts和Bolts,经过数据流(Stream)链接起来的图。下面是一个Topology的结构示意图: 网络
其中包含有:数据结构
Spout:Storm中的消息源,用于为Topology生产消息(数据),通常是从外部数据源(如Message Queue、RDBMS、NoSQL、Realtime Log)不间断地读取数据并发送给Topology消息(tuple元组)。Spout能够是可靠的,也能够是不可靠的。若是这个tuple没有被Storm彻底处理,可靠的消息源能够从新发射一个tuple,可是不可靠的消息源一旦发出一个tuple就不能重发了。(可靠性会在下面介绍)
Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回(若是已经没有新的tuple)。要注意的是nextTuple方法不能阻塞,由于storm在同一个线程上面调用全部消息源spout的方法。
另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,不然调用fail。storm只对可靠的spout调用ack和fail。
Bolt:Storm中的消息处理者,用于为Topology进行消息的处理,Bolt能够执行过滤, 聚合, 查询数据库等操做,并且能够一级一级的进行处理。
下图是Topology的提交流程图:
下图是Storm的数据交互图。能够看出两个模块Nimbus和Supervisor之间没有直接交互。状态都是保存在Zookeeper上。Worker之间经过ZeroMQ传送数据。Storm全部的元数据信息保存在Zookeeper中。
Stream
Stream是storm里面的关键抽象。一个stream是一个没有边界的tuple序列。storm提供一些原语来分布式地、可靠地把一个stream传输进一个新的stream。好比: 你能够把一个tweets流传输到热门话题的流。storm提供的最基本的处理stream的原语是spout和bolt。你能够实现Spout和Bolt对应的接口以处理你的应用的逻辑。spout的流的源头。好比一个spout可能从Kestrel队列里面读取消息而且把这些消息发射成一个流。又好比一个spout能够调用twitter的一个api而且把返回的tweets发射成一个流。一般Spout会从外部数据源(队列、数据库等)读取数据,而后封装成Tuple形式,以后发送到Stream中。Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停的调用该函数。
bolt能够接收任意多个输入stream,做一些处理,有些bolt可能还会发射一些新的stream。一些复杂的流转换,好比从一些tweet里面计算出热门话题,须要多个步骤,从而也就须要多个bolt。Bolt能够作任何事情:运行函数,过滤tuple作一些聚合,作一些合并以及访问数据库等等。Bolt处理输入的Stream,并产生新的输出Stream。Bolt能够执行过滤、函数操做、Join、操做数据库等任何操做。Bolt是一个被动的角色,其接口中有一个execute(Tuple input)方法,在接收到消息以后会调用此函数,用户能够在此方法中执行本身的处理逻辑。
spout和bolt所组成一个网络会被打包成topology, topology是storm里面最高一级的抽象(相似 Job), 你能够把topology提交给storm的集群来运行。
topology里面的每个节点都是并行运行的。 在你的topology里面, 你能够指定每一个节点的并行度, storm则会在集群里面分配那么多线程来同时计算。一个topology会一直运行直到你显式中止它。storm自动从新分配一些运行失败的任务, 而且storm保证你不会有数据丢失, 即便在一些机器意外停机而且消息被丢掉的状况下。
数据模型
Storm使用tuple来做为它的数据模型。每一个tuple是一堆值,每一个值有一个名字,而且每一个值能够是任何类型,在个人理解里面一个tuple能够看做一个没有方法的java对象。整体来看,storm支持全部的基本类型、字符串以及字节数组做为tuple的值类型。你也可使用你本身定义的类型来做为值类型,只要你实现对应的序列化器(serializer)。一个Tuple表明数据流中的一个基本的处理单元,例如一条cookie日志,它能够包含多个Field,每一个Field表示一个属性。
Tuple原本应该是一个Key-Value的Map,因为各个组件间传递的tuple的字段名称已经事先定义好了,因此Tuple只须要按序填入各个Value,因此就是一个Value List。
一个没有边界的、源源不断的、连续的Tuple序列就组成了Stream。
topology里面的每一个节点必须定义它要发射的tuple的每一个字段。 好比下面这个bolt定义它所发射的tuple包含两个字段,类型分别是: double和triple。
publicclassDoubleAndTripleBoltimplementsIRichBolt { privateOutputCollectorBase _collector; @Override publicvoidprepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; } @Override publicvoidexecute(Tuple input) { intval = input.getInteger(0); _collector.emit(input,newValues(val*2, val*3)); _collector.ack(input); } @Override publicvoidcleanup() { } @Override publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(newFields("double","triple")); } }
declareOutputFields方法定义要输出的字段 : ["double", "triple"]。这个bolt的其它部分咱们接下来会解释。
流分组策略(Stream grouping)
流分组策略告诉topology如何在两个组件之间发送tuple。要记住,spouts和bolts以不少task的形式在topology里面同步执行。若是从task的粒度来看一个运行的topology,它应该是这样的:
从task角度来看topology
当Bolt A的一个task要发送一个tuple给Bolt B, 它应该发送给Bolt B的哪一个task呢?
stream grouping专门回答这种问题的。在咱们深刻研究不一样的stream grouping以前, 让咱们看一下storm-starter里面的另一个topology。WordCountTopology读取一些句子, 输出句子里面每一个单词出现的次数。
TopologyBuilder builder =newTopologyBuilder(); builder.setSpout(1,newRandomSentenceSpout(),5); builder.setBolt(2,newSplitSentence(),8) .shuffleGrouping(1); builder.setBolt(3,newWordCount(),12) .fieldsGrouping(2,newFields("word"));
SplitSentence对于句子里面的每一个单词发射一个新的tuple, WordCount在内存里面维护一个单词->次数的mapping, WordCount每收到一个单词, 它就更新内存里面的统计状态。
fields grouping是stream合并,stream聚合以及不少其它场景的基础。在背后呢, fields grouping使用的一致性哈希来分配tuple的。
可靠的消息处理
Storm容许用户在Spout中发射一个新的源Tuple时为其指定一个MessageId,这个MessageId能够是任意的Object对象。多个源Tuple能够共用同一个MessageId,表示这多个源Tuple对用户来讲是同一个消息单元。Storm的可靠性是指Storm会告知用户每个消息单元是否在一个指定的时间内被彻底处理。彻底处理的意思是该MessageId绑定的源Tuple以及由该源Tuple衍生的全部Tuple都通过了Topology中每个应该到达的Bolt的处理。
在Spout中由message 1绑定的tuple1和tuple2分别通过bolt1和bolt2的处理,而后生成了两个新的Tuple,并最终流向了bolt3。当bolt3处理完以后,称message 1被彻底处理了。
Storm中的每个Topology中都包含有一个Acker组件。Acker组件的任务就是跟踪从Spout中流出的每个messageId所绑定的Tuple树中的全部Tuple的处理状况。若是在用户设置的最大超时时间内这些Tuple没有被彻底处理,那么Acker会告诉Spout该消息处理失败,相反则会告知Spout该消息处理成功。
那么Acker是如何记录Tuple的处理结果呢??
A xor A = 0.
A xor B…xor B xor A = 0,其中每个操做数出现且仅出现两次。
在Spout中,Storm系统会为用户指定的MessageId生成一个对应的64位的整数,做为整个Tuple Tree的RootId。RootId会被传递给Acker以及后续的Bolt来做为该消息单元的惟一标识。同时,不管Spout仍是Bolt每次新生成一个Tuple时,都会赋予该Tuple一个惟一的64位整数的Id。
当Spout发射完某个MessageId对应的源Tuple以后,它会告诉Acker本身发射的RootId以及生成的那些源Tuple的Id。而当Bolt处理完一个输入Tuple并产生出新的Tuple时,也会告知Acker本身处理的输入Tuple的Id以及新生成的那些Tuple的Id。Acker只须要对这些Id进行异或运算,就能判断出该RootId对应的消息单元是否成功处理完成了。
实例:
咱们看一下storm-starter里面的ExclamationTopology:
TopologyBuilder builder =newTopologyBuilder(); builder.setSpout(1,newTestWordSpout(),10); builder.setBolt(2,newExclamationBolt(),3) .shuffleGrouping(1); builder.setBolt(3,newExclamationBolt(),2) .shuffleGrouping(2);
这个Topology包含一个Spout和两个Bolt。Spout发射单词, 每一个bolt在每一个单词后面加个”!!!”。这三个节点被排成一条线: spout发射单词给第一个bolt, 第一个bolt而后把处理好的单词发射给第二个bolt。若是spout发射的单词是["bob"]和["john"], 那么第二个bolt会发射["bolt!!!!!!"]和["john!!!!!!"]出来。咱们使用setSpout和setBolt来定义Topology里面的节点。这些方法接收咱们指定的一个id, 一个包含处理逻辑的对象(spout或者bolt), 以及你所须要的并行度。这个包含处理的对象若是是spout那么要实现IRichSpout的接口, 若是是bolt,那么就要实现IRichBolt接口.最后一个指定并行度的参数是可选的。它表示集群里面须要多少个thread来一块儿执行这个节点。若是你忽略它那么storm会分配一个线程来执行这个节点。
setBolt方法返回一个InputDeclarer对象, 这个对象是用来定义Bolt的输入。 这里第一个Bolt声明它要读取spout所发射的全部的tuple — 使用shuffle grouping。而第二个bolt声明它读取第一个bolt所发射的tuple。shuffle grouping表示全部的tuple会被随机的分发给bolt的全部task。给task分发tuple的策略有不少种。
若是你想第二个bolt读取spout和第一个bolt所发射的全部的tuple, 那么你应该这样定义第二个bolt:
builder.setBolt(3,newExclamationBolt(),5) .shuffleGrouping(1) .shuffleGrouping(2);
让咱们深刻地看一下这个topology里面的spout和bolt是怎么实现的。Spout负责发射新的tuple到这个topology里面来。TestWordSpout从["nathan", "mike", "jackson", "golda", "bertels"]里面随机选择一个单词发射出来。TestWordSpout里面的nextTuple()方法是这样定义的:
publicvoidnextTuple() { Utils.sleep(100); finalString[] words =newString[] {"nathan","mike", "jackson","golda","bertels"}; finalRandom rand =newRandom(); finalString word = words[rand.nextInt(words.length)]; _collector.emit(newValues(word)); }
ExclamationBolt把”!!!”拼接到输入tuple后面。咱们来看下ExclamationBolt的完整实现。
publicstaticclassExclamationBoltimplementsIRichBolt { OutputCollector _collector; publicvoidprepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } publicvoidexecute(Tuple tuple) { _collector.emit(tuple,newValues(tuple.getString(0) +"!!!")); _collector.ack(tuple); } publicvoidcleanup() { } publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(newFields("word")); } }
Config conf =newConfig(); conf.setDebug(true); conf.setNumWorkers(2); LocalCluster cluster =newLocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown();