和一样是计算框架的Mapreduce相比,Mapreduce集群上运行的是Job,而Storm集群上运行的是Topology。可是Job在运行结束以后会自行结束,Topology却只能被手动的kill掉,不然会一直运行下去。html
Storm集群中有两种节点,一种是控制节点(Nimbus节点),另外一种是工做节点(Supervisor节点)。全部Topology任务的提交必须在Storm客户端节点上进行(须要配置~/.storm/storm.yaml文件),由Nimbus节点分配给其余Supervisor节点进行处理。Nimbus节点首先将提交的Topology进行分片,分红一个个的Task,并将Task和Supervisor相关的信息提交到zookeeper集群上,Supervisor会去zookeeper集群上认领本身的Task,通知本身的Worker进程进行Task的处理。整体的Topology处理流程图为:git
每一个Topology都由Spout和Bolt组成,在Spout和Bolt传递信息的基本单位叫作Tuple,由Spout发出的接二连三的Tuple及其在相应Bolt上处理的子Tuple连起来称为一个Steam,每一个Stream的命名是在其首个Tuple被Spout发出的时候,此时Storm会利用内部的Ackor机制保证每一个Tuple可靠的被处理。github
而Tuple能够理解成键值对,其中,键就是在定义在declareStream方法中的Fields字段,而值就是在emit方法中发送的Values字段。web
在运行Topology以前,能够经过一些参数的配置来调节运行时的状态,参数的配置是经过Storm框架部署目录下的conf/storm.yaml文件来完成的。在次文件中能够配置运行时的Storm本地目录路径、运行时Worker的数目等。数据库
在代码中,也能够设置Config的一些参数,可是优先级是不一样的,不一样位置配置Config参数的优先级顺序为:数组
default.yaml<storm.yaml<topology内部的configuration<内部组件的special configuration<外部组件的special configuration服务器
在storm.yaml中经常使用的几个选项为:并发
配置选项名称app |
配置选项做用框架 |
topology.max.task.parallelism |
每一个Topology运行时最大的executor数目 |
topology.workers |
每一个Topology运行时的worker的默认数目,若在代码中设置,则此选项值被覆盖 |
storm.zookeeper.servers |
zookeeper集群的节点列表 |
storm.local.dir |
Storm用于存储jar包和临时文件的本地存储目录 |
storm.zookeeper.root |
Storm在zookeeper集群中的根目录,默认是“/” |
ui.port |
Storm集群的UI地址端口号,默认是8080 |
nimbus.host: |
Nimbus节点的host |
supervisor.slots.ports |
Supervisor节点的worker占位槽,集群中的全部Topology公用这些槽位数,即便提交时设置了较大数值的槽位数,系统也会按照当前集群中实际剩余的槽位数来进行分配,当全部的槽位数都分配完时,新提交的Topology只能等待,系统会一直监测是否有空余的槽位空出来,若是有,就再次给新提交的Topology分配 |
supervisor.worker.timeout.secs |
Worker的超时时间,单位为秒,超时后,Storm认为当前worker进程死掉,会从新分配其运行着的task任务 |
drpc.servers |
在使用drpc服务时,drpc server的服务器列表 |
drpc.port |
在使用drpc服务时,drpc server的服务端口 |
Spout是Stream的消息产生源, Spout组件的实现能够经过继承BaseRichSpout类或者其余*Spout类来完成,也能够经过实现IRichSpout接口来实现。
须要根据状况实现Spout类中重要的几个方法有:
当一个Task被初始化的时候会调用此open方法。通常都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化。
示例以下:
1 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 2 3 _collector = collector; 4 5 }
此方法用于声明当前Spout的Tuple发送流。Stream流的定义是经过OutputFieldsDeclare.declareStream方法完成的,其中的参数包括了发送的域Fields。
示例以下:
1 public void declareOutputFields(OutputFieldsDeclarer declarer) { 2 3 declarer.declare(new Fields("word")); 4 5 }
此方法用于声明针对当前组件的特殊的Configuration配置。
示例以下:
1 public Map<String, Object> getComponentConfiguration() { 2 3 if(!_isDistributed) { 4 5 Map<String, Object> ret = new HashMap<String, Object>(); 6 7 ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3); 8 9 return ret; 10 11 } else { 12 13 return null; 14 15 } 16 17 }
这里即是设置了Topology中当前Component的线程数量上限。
这是Spout类中最重要的一个方法。发射一个Tuple到Topology都是经过这个方法来实现的。
示例以下:
1 public void nextTuple() { 2 3 Utils.sleep(100); 4 5 final String[] words = new String[] {"twitter","facebook","google"}; 6 7 final Random rand = new Random(); 8 9 final String word = words[rand.nextInt(words.length)]; 10 11 _collector.emit(new Values(word)); 12 13 }
这里即是从一个数组中随机选取一个单词做为Tuple,而后经过_collector发送到Topology。
另外,除了上述几个方法以外,还有ack、fail和close方法等。Storm在监测到一个Tuple被成功处理以后会调用ack方法,处理失败会调用fail方法,这两个方法在BaseRichSpout类中已经被隐式的实现了。
Bolt类接收由Spout或者其余上游Bolt类发来的Tuple,对其进行处理。Bolt组件的实现能够经过继承BasicRichBolt类或者IRichBolt接口来完成。
Bolt类须要实现的主要方法有:
此方法和Spout中的open方法相似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。Bolt中Tuple的发送能够在prepare方法中、execute方法中、cleanup等方法中进行,通常都是些在execute中。
示例以下:
1 public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 2 3 _collector = collector; 4 5 }
用于声明当前Bolt发送的Tuple中包含的字段,和Spout中相似。
示例以下:
1 public void declareOutputFields(OutputFieldsDeclarer declarer) { 2 3 declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds")); 4 5 }
此例说明当前Bolt类发送的Tuple包含了三个字段:"obj", "count", "actualWindowLengthInSeconds"。
和Spout类同样,在Bolt中也能够有getComponentConfiguration方法。
示例以下:
1 public Map<String, Object> getComponentConfiguration() { 2 3 Map<String, Object> conf = new HashMap<String, Object>(); 4 5 conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); 6 7 return conf; 8 9 }
此例定义了从系统组件“_system”的“_tick”流中发送Tuple到当前Bolt的频率,当系统须要每隔一段时间执行特定的处理时,就能够利用这个系统的组件的特性来完成。
这是Bolt中最关键的一个方法,对于Tuple的处理均可以放到此方法中进行。具体的发送也是经过emit方法来完成的。此时,有两种状况,一种是emit方法中有两个参数,另外一个种是有一个参数。
(1)emit有一个参数:此惟一的参数是发送到下游Bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple再也不属于同一棵Tuple树。新的Tuple另起一个新的Tuple树。
(2)emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,新的Tuple和旧的Tuple是仍然属于同一棵Tuple树,即,若是下游的Bolt处理Tuple失败,则会向上传递到当前Bolt,当前Bolt根据旧的Tuple流继续往上游传递,申请重发失败的Tuple。保证Tuple处理的可靠性。
这两种状况要根据本身的场景来肯定。
示例以下:
1 public void execute(Tuple tuple) { 2 3 _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); 4 5 _collector.ack(tuple); 6 7 } 8 9 public void execute(Tuple tuple) { 10 11 _collector.emit(new Values(tuple.getString(0) + "!!!")); 12 13 }
此外还有ack方法、fail方法、cleanup方法等。其中cleanup方法和Spout中的close方法相似,都是在当前Component关闭时调用,可是针对实时计算来讲,除非一些特殊的场景要求之外,这两个方法通常都不多用到。
上文中介绍了Topology的基本组件Spout和Bolt,在Topology中,数据流Tuple的处理就是不断的经过调用不一样的Spout和Bolt来完成的。不一样的Bolt和Spout的上下游关系是经过在入口类中定义的。示例以下:
1 builder = new TopologyBuilder(); 2 3 builder.setSpout(spoutId, new TestWordSpout(), 5); 4 5 builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word")); 6 7 builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj")); 8 builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); 9
此例中的builder是TopologyBuilder对象,经过它的createTopology方法能够建立一个Topology对象,同时此builder还要定义当前Topology中用到的Spout和Bolt对象,分别经过setSpout方法和setBolt方法来完成。
setSpout方法和setBolt方法中的第一个参数是当前的Component组件的Stream流ID号;第二个参数是具体的Component实现类的构造;第三个参数是当前Component的并行执行的线程数目,Storm会根据这个数字的累加和来肯定Topology的Task数目。最后的小尾巴*Grouping是指的一个Stream应如何分配数据给Bolt上面的Task。目前Storm的Stream Grouping有以下几种:
(1)ShuffleGrouping:随机分组,随机分发Stream中的tuple,保证每一个Bolt的Task接收Tuple数量大体一致;
(2)FieldsGrouping:按照字段分组,保证相同字段的Tuple分配到同一个Task中;
(3)AllGrouping:广播发送,每个Task都会受到全部的Tuple;
(4)GlobalGrouping:全局分组,全部的Tuple都发送到同一个Task中,此时通常将当前Component的并发数目设置为1;
(5)NonGrouping:不分组,和ShuffleGrouping相似,当前Task的执行会和它的被订阅者在同一个线程中执行;
(6)DirectGrouping:直接分组,直接指定由某个Task来执行Tuple的处理,并且,此时必须有emitDirect方法来发送;
(7) localOrShuffleGrouping:和ShuffleGrouping相似,若Bolt有多个Task在同一个进程中,Tuple会随机发给这些Task。
不一样的的Grouping,须要根据不一样的场景来具体设定,不一而论。
Topology的运行能够分为本地模式和分布式模式,模式的设置能够在配置文件中设定,也能够在代码中设置。
(1)本地运行的提交方式:
1 LocalCluster cluster = new LocalCluster(); 2 3 cluster.submitTopology(topologyName, conf, topology); 4 5 cluster.killTopology(topologyName); 6 7 cluster.shutdown();
(2)分布式提交方式:
StormSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());
须要注意的是,在Storm代码编写完成以后,须要打包成jar包放到Nimbus中运行,打包的时候,不须要把依赖的jar都打进去,不然若是把依赖的storm.jar包打进去的话,运行时会出现重复的配置文件错误致使Topology没法运行。由于Topology运行以前,会加载本地的storm.yaml配置文件。
在Nimbus运行的命令以下:
storm jar StormTopology.jar maincalss args
有几点须要说明的地方:
(1)Storm提交后,会把代码首先存放到Nimbus节点的inbox目录下,以后,会把当前Storm运行的配置生成一个stormconf.ser文件放到Nimbus节点的stormdist目录中,在此目录中同时还有序列化以后的Topology代码文件;
(2)在设定Topology所关联的Spouts和Bolts时,能够同时设置当前Spout和Bolt的executor数目和task数目,默认状况下,一个Topology的task的总和是和executor的总和一致的。以后,系统根据worker的数目,尽可能平均的分配这些task的执行。worker在哪一个supervisor节点上运行是由storm自己决定的;
(3)任务分配好以后,Nimbes节点会将任务的信息提交到zookeeper集群,同时在zookeeper集群中会有workerbeats节点,这里存储了当前Topology的全部worker进程的心跳信息;
(4)Supervisor节点会不断的轮询zookeeper集群,在zookeeper的assignments节点中保存了全部Topology的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor经过轮询此节点的内容,来领取本身的任务,启动worker进程运行;
(5)一个Topology运行以后,就会不断的经过Spouts来发送Stream流,经过Bolts来不断的处理接收到的Stream流,Stream流是无界的。
最后一步会不间断的执行,除非手动结束Topology。
Topology中的Stream处理时的方法调用过程以下:
有几点须要说明的地方:
(1)每一个组件(Spout或者Bolt)的构造方法和declareOutputFields方法都只被调用一次。
(2)open方法、prepare方法的调用是屡次的。入口函数中设定的setSpout或者setBolt里的并行度参数指的是executor的数目,是负责运行组件中的task的线程 的数目,此数目是多少,上述的两个方法就会被调用多少次,在每一个executor运行的时候调用一次。至关于一个线程的构造方法。
(3)nextTuple方法、execute方法是一直被运行的,nextTuple方法不断的发射Tuple,Bolt的execute不断的接收Tuple进行处理。只有这样不断地运行,才会产 生无界的Tuple流,体现实时性。至关于线程的run方法。
(4)在提交了一个topology以后,Storm就会建立spout/bolt实例并进行序列化。以后,将序列化的component发送给全部的任务所在的机器(即Supervisor节 点),在每个任务上反序列化component。
(5)Spout和Bolt之间、Bolt和Bolt之间的通讯,是经过zeroMQ的消息队列实现的。
(6)上图没有列出ack方法和fail方法,在一个Tuple被成功处理以后,须要调用ack方法来标记成功,不然调用fail方法标记失败,从新处理这个Tuple。
在Topology的执行单元里,有几个和并行度相关的概念。
(1)worker:每一个worker都属于一个特定的Topology,每一个Supervisor节点的worker能够有多个,每一个worker使用一个单独的端口,它对Topology中的每一个component运行一个或者多个executor线程来提供task的运行服务。
(2)executor:executor是产生于worker进程内部的线程,会执行同一个component的一个或者多个task。
(3)task:实际的数据处理由task完成,在Topology的生命周期中,每一个组件的task数目是不会发生变化的,而executor的数目却不必定。executor数目小于等于task的数目,默认状况下,两者是相等的。
在运行一个Topology时,能够根据具体的状况来设置不一样数量的worker、task、executor,而设置的位置也能够在多个地方。
(1)worker设置:
(1.1)能够经过设置yaml中的topology.workers属性
(1.2)在代码中经过Config的setNumWorkers方法设定
(2)executor设置:
经过在Topology的入口类中setBolt、setSpout方法的最后一个参数指定,不指定的话,默认为1;
(3)task设置:
(3.1) 默认状况下,和executor数目一致;
(3.2)在代码中经过TopologyBuilder的setNumTasks方法设定具体某个组件的task数目;
经过在Nimbus节点利用以下命令来终止一个Topology的运行:
storm kill topologyName
kill以后,能够经过UI界面查看topology状态,会首先变成KILLED状态,在清理完本地目录和zookeeper集群中的和当前Topology相关的信息以后,此Topology就会完全消失了。
Topology提交后,能够在Nimbus节点的web界面查看,默认的地址是http://NimbusIp:8080。
上面给出了如何编写Storm框架任务Topology的方法,那么在哪些场景下可以使用Storm框架呢?下面介绍Storm框架的几个典型的应用场景。
(1)利用Storm框架的DRPC进行大量的函数并行调用,即实现分布式的RPC;
(2)利用Storm框架的Transaction Topology,能够进行实时性的批量更新或者查询数据库操做或者应用须要同一批内的消息以及批与批之间的消息并行处理这样的场景,此时Topology中只能有一个TrasactionalSpout;
(3)利用滑动窗口的逻辑结合Storm框架来计算得出某段时间内的售出量最多的产品、购买者最多的TopN地区等;
(4)精确的广告推送,在用户浏览产品的时候,将浏览记录实时性的搜集,发送到Bolt,由Bolt来根据用户的帐户信息(若是有的话)完成产品的分类统计,产品的相关性查询等逻辑计算以后,将计算结果推送给用户;
(5)实时日志的处理,Storm能够和一个分布式存储结合起来,实时性的从多个数据源发送数据处处理逻辑Bolts,Bolts完成一些逻辑处理以后,交给分布式存储框架进行存储,此时,Spout能够是多个;
(6)实时性的监控舆论热点,好比针对某个关键词,在用户查询的时候,产生数据源Spout,结合语义分析等,由Bolt来完成查询关键词的统计分析,汇总当前的舆论热点;
(7)数据流的实时聚合操做。
http://xumingming.sinaapp.com/138/twitter-storm%E5%85%A5%E9%97%A8/
https://github.com/nathanmarz/storm/wiki
http://nathanmarz.github.io/storm/doc/index-all.html
有理解不到位的地方,欢迎批评指正,一块儿交流~