1、Storm基本概念html
在运行一个Storm任务以前,须要了解一些概念:java
Storm集群和Hadoop集群表面上看很相似。可是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这二者之间是很是不同的。一个关键的区别是: 一个MapReduce job最终会结束, 而一个topology永远会运行(除非你手动kill掉)。node
在Storm的集群里面有两种节点: 控制节点(master node)和工做节点(worker node)。控制节点上面运行一个叫Nimbus后台程序,它的做用相似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器, 而且监控状态。数据库
每个工做节点上面运行一个叫作Supervisor的节点。Supervisor会监听分配给它那台机器的工做,根据须要启动/关闭工做进程。每个工做进程执行一个topology的一个子集;一个运行的topology由运行在不少机器上的不少工做进程组成。 windows
Nimbus和Supervisor之间的全部协调工做都是经过Zookeeper集群完成。另外,Nimbus进程和Supervisor进程都是快速失败(fail-fast)和无状态的。全部的状态要么在zookeeper里面, 要么在本地磁盘上。这也就意味着你能够用kill -9来杀死Nimbus和Supervisor进程, 而后再重启它们,就好像什么都没有发生过。这个设计使得Storm异常的稳定。服务器
一、Topologies架构
一个topology是spouts和bolts组成的图, 经过stream groupings将图中的spouts和bolts链接起来,以下图: 并发
一个topology会一直运行直到你手动kill掉,Storm自动从新分配执行失败的任务, 而且Storm能够保证你不会有数据丢失(若是开启了高可靠性的话)。若是一些机器意外停机它上面的全部任务会被转移到其余机器上。app
运行一个topology很简单。首先,把你全部的代码以及所依赖的jar打进一个jar包。而后运行相似下面的这个命令:dom
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main函数定义这个topology而且把它提交给Nimbus。storm jar负责链接到Nimbus而且上传jar包。
Topology的定义是一个Thrift结构,而且Nimbus就是一个Thrift服务, 你能够提交由任何语言建立的topology。上面的方面是用JVM-based语言提交的最简单的方法。
二、Streams
消息流stream是storm里的关键抽象。一个消息流是一个没有边界的tuple序列, 而这些tuple序列会以一种分布式的方式并行地建立和处理。经过对stream中tuple序列中每一个字段命名来定义stream。在默认的状况下,tuple的字段类型能够是:integer,long,short, byte,string,double,float,boolean和byte array。你也能够自定义类型(只要实现相应的序列化器)。
每一个消息流在定义的时候会被分配给一个id,由于单向消息流使用的至关广泛, OutputFieldsDeclarer定义了一些方法让你能够定义一个stream而不用指定这个id。在这种状况下这个stream会分配个值为‘default’默认的id 。
Storm提供的最基本的处理stream的原语是spout和bolt。你能够实现spout和bolt提供的接口来处理你的业务逻辑。
三、Spouts
消息源spout是Storm里面一个topology里面的消息生产者。通常来讲消息源会从一个外部源读取数据而且向topology里面发出消息:tuple。Spout能够是可靠的也能够是不可靠的。若是这个tuple没有被storm成功处理,可靠的消息源spouts能够从新发射一个tuple, 可是不可靠的消息源spouts一旦发出一个tuple就不能重发了。
消息源能够发射多条消息流stream。使用OutputFieldsDeclarer.declareStream来定义多个stream,而后使用SpoutOutputCollector来发射指定的stream。
Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回若是已经没有新的tuple。要注意的是nextTuple方法不能阻塞,由于storm在同一个线程上面调用全部消息源spout的方法。
另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,不然调用fail。storm只对可靠的spout调用ack和fail。
四、Bolts
全部的消息处理逻辑被封装在bolts里面。Bolts能够作不少事情:过滤,聚合,查询数据库等等。
Bolts能够简单的作消息流的传递。复杂的消息流处理每每须要不少步骤,从而也就须要通过不少bolts。好比算出一堆图片里面被转发最多的图片就至少须要两步:第一步算出每一个图片的转发数量。第二步找出转发最多的前10个图片。(若是要把这个过程作得更具备扩展性那么可能须要更多的步骤)。
Bolts能够发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。
Bolts的主要方法是execute, 它以一个tuple做为输入,bolts使用OutputCollector来发射tuple,bolts必需要为它处理的每个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。 通常的流程是: bolts处理一个输入tuple, 发射0个或者多个tuple, 而后调用ack通知storm本身已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。
五、Stream groupings
定义一个topology的其中一步是定义每一个bolt接收什么样的流做为输入。stream grouping就是用来定义一个stream应该若是分配数据给bolts上面的多个tasks。
Storm里面有7种类型的stream grouping
六、Reliability
Storm保证每一个tuple会被topology完整的执行。Storm会追踪由每一个spout tuple所产生的tuple树(一个bolt处理一个tuple以后可能会发射别的tuple从而造成树状结构),而且跟踪这棵tuple树何时成功处理完。每一个topology都有一个消息超时的设置,若是storm在这个超时的时间内检测不到某个tuple树到底有没有执行成功, 那么topology会把这个tuple标记为执行失败,而且过一下子从新发射这个tuple。
为了利用Storm的可靠性特性,在你发出一个新的tuple以及你完成处理一个tuple的时候你必需要通知storm。这一切是由OutputCollector来完成的。经过emit方法来通知一个新的tuple产生了,经过ack方法通知一个tuple处理完成了。
Storm的可靠性咱们在第四章会深刻介绍。
七、Tasks
每个spout和bolt会被看成不少task在整个集群里执行。每个executor对应到一个线程,在这个线程上运行多个task,而stream grouping则是定义怎么从一堆task发射tuple到另一堆task。你能够调用TopologyBuilder类的setSpout和setBolt来设置并行度(也就是有多少个task)。
八、Workers
一个topology可能会在一个或者多个worker(工做进程)里面执行,每一个worker是一个物理JVM而且执行整个topology的一部分。好比,对于并行度是300的topology来讲,若是咱们使用50个工做进程来执行,那么每一个工做进程会处理其中的6个tasks。Storm会尽可能均匀的工做分配给全部的worker。
九、Configuration
Storm里面有一堆参数能够配置来调整Nimbus, Supervisor以及正在运行的topology的行为,一些配置是系统级别的,一些配置是topology级别的。default.yaml里面有全部的默认配置。你能够经过定义个storm.yaml在你的classpath里来覆盖这些默认配置。而且你也能够在代码里面设置一些topology相关的配置信息(使用StormSubmitter)。
2、构建Topology
1. 实现的目标:
咱们将设计一个topology,来实现对一个句子里面的单词出现的频率进行统计。这是一个简单的例子,目的是让你们对于topology快速上手,有一个初步的理解。
2. 设计Topology结构:
在开始开发Storm项目的第一步,就是要设计topology。肯定好你的数据处理逻辑,咱们今天将的这个简单的例子,topology也很是简单。整个topology以下:
整个topology分为三个部分:
3. 设计数据流
这个topology从kestrel queue读取句子,并把句子划分红单词,而后汇总每一个单词出现的次数,一个tuple负责读取句子,每个tuple分别对应计算每个单词出现的次数,大概样子以下所示:
4. 代码实现:
1) 构建maven环境:
为了开发storm topology, 你须要把storm相关的jar包添加到classpath里面去: 要么手动添加全部相关的jar包, 要么使用maven来管理全部的依赖。storm的jar包发布在Clojars(一个maven库), 若是你使用maven的话,把下面的配置添加在你项目的pom.xml里面。
2) 定义topology:
这种topology的spout从句子队列中读取句子,在kestrel.backtype.com位于一个Kestrel的服务器端口22133。
Spout用setSpout方法插入一个独特的id到topology。 Topology中的每一个节点必须给予一个id,id是由其余bolts用于订阅该节点的输出流。 KestrelSpout在topology中id为1。
setBolt是用于在Topology中插入bolts。 在topology中定义的第一个bolts 是切割句子的bolts。 这个bolts 将句子流转成成单词流。
让咱们看看SplitSentence实施:
关键的方法是 execute方法。 正如你能够看到,它将句子拆分红单词,并发出每一个单词做为一个新的元组。 另外一个重要的方法是declareOutputFields,其中宣布bolts输出元组的架构。 在这里宣布,它发出一个域为word的元组。
setBolt的最后一个参数是你想为bolts的并行量。 SplitSentence bolts 是10个并发,这将致使在storm集群中有十个线程并行执行。 你所要作的的是增长bolts的并行量在遇到topology的瓶颈时。
setBolt方法返回一个对象,用来定义bolts的输入。 例如,SplitSentence螺栓订阅组件“1”使用随机分组的输出流。 “1”是指已经定义KestrelSpout。 我将解释在某一时刻的随机分组的一部分。 到目前为止,最要紧的是,SplitSentence bolts会消耗KestrelSpout发出的每个元组。
下面在让咱们看看wordcount的实现:
SplitSentence对于句子里面的每一个单词发射一个新的tuple, WordCount在内存里面维护一个单词->次数的mapping, WordCount每收到一个单词, 它就更新内存里面的统计状态。
5. 运行Topology
storm的运行有两种模式: 本地模式和分布式模式.
1) 本地模式:
storm用一个进程里面的线程来模拟全部的spout和bolt. 本地模式对开发和测试来讲比较有用。 你运行storm-starter里面的topology的时候它们就是以本地模式运行的, 你能够看到topology里面的每个组件在发射什么消息。
2) 分布式模式:
storm由一堆机器组成。当你提交topology给master的时候, 你同时也把topology的代码提交了。master负责分发你的代码而且负责给你的topolgoy分配工做进程。若是一个工做进程挂掉了, master节点会把认为从新分配到其它节点。
3) 下面是以本地模式运行的代码:
首先, 这个代码定义经过定义一个LocalCluster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集群是同样的。经过调用submitTopology方法来提交topology, 它接受三个参数:要运行的topology的名字,一个配置对象以及要运行的topology自己。
topology的名字是用来惟一区别一个topology的,这样你而后能够用这个名字来杀死这个topology的。前面已经说过了, 你必须显式的杀掉一个topology, 不然它会一直运行。
Conf对象能够配置不少东西, 下面两个是最多见的:
TOPOLOGY_WORKERS(setNumWorkers) 定义你但愿集群分配多少个工做进程给你来执行这个topology. topology里面的每一个组件会被须要线程来执行。每一个组件到底用多少个线程是经过setBolt和setSpout来指定的。这些线程都运行在工做进程里面. 每个工做进程包含一些节点的一些工做线程。好比, 若是你指定300个线程,60个进程, 那么每一个工做进程里面要执行6个线程, 而这6个线程可能属于不一样的组件(Spout, Bolt)。你能够经过调整每一个组件的并行度以及这些线程所在的进程数量来调整topology的性能。
TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话, storm会记录下每一个组件所发射的每条消息。这在本地环境调试topology颇有用, 可是在线上这么作的话会影响性能的。
关于上述,给出下面一个简单的参考实例,详细讲解bolt、spout使用。
本节探讨一下storm具体怎么使用,明白怎么在windows下开发storm程序。
功能描述:实时随机输出一字符串。
在开发前记得导入storm须要的jar包。
一、SimpleSpout类继承BaseRichSpout类,用来产生数据而且向topology里面发出消息:tuple。
package com.ljq.helloword; import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * Spout起到和外界沟通的做用,他能够从一个数据库中按照某种规则取数据,也能够从分布式队列中取任务 * * @author Administrator * */ @SuppressWarnings("serial") public class SimpleSpout extends BaseRichSpout{ //用来发射数据的工具类 private SpoutOutputCollector collector; private static String[] info = new String[]{ "comaple\t,12424,44w46,654,12424,44w46,654,", "lisi\t,435435,6537,12424,44w46,654,", "lipeng\t,45735,6757,12424,44w46,654,", "hujintao\t,45735,6757,12424,44w46,654,", "jiangmin\t,23545,6457,2455,7576,qr44453", "beijing\t,435435,6537,12424,44w46,654,", "xiaoming\t,46654,8579,w3675,85877,077998,", "xiaozhang\t,9789,788,97978,656,345235,09889,", "ceo\t,46654,8579,w3675,85877,077998,", "cto\t,46654,8579,w3675,85877,077998,", "zhansan\t,46654,8579,w3675,85877,077998,"}; Random random=new Random(); /** * 初始化collector */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * 在SpoutTracker类中被调用,每调用一次就能够向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用 */ @Override public void nextTuple() { try { String msg = info[random.nextInt(11)]; // 调用发射方法 collector.emit(new Values(msg)); // 模拟等待100ms Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。 * 该declarer变量有很大做用,咱们还能够调用declarer.declareStream();来定义stramId,该id能够用来定义更加复杂的流拓扑结构 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应 } }
二、SimpleBolt类继承BaseBasicBolt类,处理一个输入tuple。
package com.ljq.helloword; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * 接收喷发节点(Spout)发送的数据进行简单的处理后,发射出去。 * * @author Administrator * */ @SuppressWarnings("serial") public class SimpleBolt extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { try { String msg = input.getString(0); if (msg != null){ //System.out.println("msg="+msg); collector.emit(new Values(msg + "msg is processed!")); } } catch (Exception e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("info")); } }
三、SimpleTopology类包含一个main函数,是Storm程序执行的入口点,包括一个数据喷发节点spout和一个数据处理节点bolt。
package com.ljq.helloword; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; /** * 定义了一个简单的topology,包括一个数据喷发节点spout和一个数据处理节点bolt。 * * @author Administrator * */ public class SimpleTopology { public static void main(String[] args) { try { // 实例化TopologyBuilder类。 TopologyBuilder topologyBuilder = new TopologyBuilder(); // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。 topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1); // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。 topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout"); Config config = new Config(); config.setDebug(true); if (args != null && args.length > 0) { config.setNumWorkers(1); StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); } else { // 这里是本地模式下运行的启动代码。 config.setMaxTaskParallelism(1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("simple", config, topologyBuilder.createTopology()); } } catch (Exception e) { e.printStackTrace(); } } }