Storm入门 第二章 构建Topology

2.1 Storm基本概念

在运行一个Storm任务以前,须要了解一些概念: node

  1. Topologies
  2. Streams
  3. Spouts
  4. Bolts
  5. Stream groupings
  6. Reliability
  7. Tasks
  8. Workers
  9. Configuration

Storm集群和Hadoop集群表面上看很相似。可是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这二者之间是很是不同的。一个关键的区别是: 一个MapReduce job最终会结束, 而一个topology永远会运行(除非你手动kill掉)。 数据库

在Storm的集群里面有两种节点: 控制节点(master node)和工做节点(worker node)。控制节点上面运行一个叫Nimbus后台程序,它的做用相似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器, 而且监控状态。 服务器

每个工做节点上面运行一个叫作Supervisor的节点。Supervisor会监听分配给它那台机器的工做,根据须要启动/关闭工做进程。每个工做进程执行一个topology的一个子集;一个运行的topology由运行在不少机器上的不少工做进程组成。
架构

Nimbus和Supervisor之间的全部协调工做都是经过Zookeeper集群完成。另外,Nimbus进程和Supervisor进程都是快速失败(fail-fast)和无状态的。全部的状态要么在zookeeper里面, 要么在本地磁盘上。这也就意味着你能够用kill -9来杀死Nimbus和Supervisor进程, 而后再重启它们,就好像什么都没有发生过。这个设计使得Storm异常的稳定。 并发

2.1.1 Topologies

一个topology是spouts和bolts组成的图, 经过stream groupings将图中的spouts和bolts链接起来,以下图: app

一个topology会一直运行直到你手动kill掉,Storm自动从新分配执行失败的任务, 而且Storm能够保证你不会有数据丢失(若是开启了高可靠性的话)。若是一些机器意外停机它上面的全部任务会被转移到其余机器上。 maven

运行一个topology很简单。首先,把你全部的代码以及所依赖的jar打进一个jar包。而后运行相似下面的这个命令: 分布式

 storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2 函数

这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1arg2。这个类的main函数定义这个topology而且把它提交给Nimbus。storm jar负责链接到Nimbus而且上传jar包。 oop

Topology的定义是一个Thrift结构,而且Nimbus就是一个Thrift服务, 你能够提交由任何语言建立的topology。上面的方面是用JVM-based语言提交的最简单的方法。

2.1.2 Streams

消息流stream是storm里的关键抽象。一个消息流是一个没有边界的tuple序列, 而这些tuple序列会以一种分布式的方式并行地建立和处理。经过对stream中tuple序列中每一个字段命名来定义stream。在默认的状况下,tuple的字段类型能够是:integer,long,short, byte,string,double,float,boolean和byte array。你也能够自定义类型(只要实现相应的序列化器)。

每一个消息流在定义的时候会被分配给一个id,由于单向消息流使用的至关广泛, OutputFieldsDeclarer定义了一些方法让你能够定义一个stream而不用指定这个id。在这种状况下这个stream会分配个值为‘default’默认的id 。

Storm提供的最基本的处理stream的原语是spout和bolt。你能够实现spout和bolt提供的接口来处理你的业务逻辑。

2.1.3  Spouts

消息源spout是Storm里面一个topology里面的消息生产者。通常来讲消息源会从一个外部源读取数据而且向topology里面发出消息:tuple。Spout能够是可靠的也能够是不可靠的。若是这个tuple没有被storm成功处理,可靠的消息源spouts能够从新发射一个tuple, 可是不可靠的消息源spouts一旦发出一个tuple就不能重发了。

消息源能够发射多条消息流stream。使用OutputFieldsDeclarer.declareStream来定义多个stream,而后使用SpoutOutputCollector来发射指定的stream。

Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回若是已经没有新的tuple。要注意的是nextTuple方法不能阻塞,由于storm在同一个线程上面调用全部消息源spout的方法。

另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,不然调用fail。storm只对可靠的spout调用ack和fail。

2.1.4  Bolts

全部的消息处理逻辑被封装在bolts里面。Bolts能够作不少事情:过滤,聚合,查询数据库等等。

Bolts能够简单的作消息流的传递。复杂的消息流处理每每须要不少步骤,从而也就须要通过不少bolts。好比算出一堆图片里面被转发最多的图片就至少须要两步:第一步算出每一个图片的转发数量。第二步找出转发最多的前10个图片。(若是要把这个过程作得更具备扩展性那么可能须要更多的步骤)。

Bolts能够发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。

Bolts的主要方法是execute, 它以一个tuple做为输入,bolts使用OutputCollector来发射tuple,bolts必需要为它处理的每个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。 通常的流程是: bolts处理一个输入tuple,  发射0个或者多个tuple, 而后调用ack通知storm本身已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。

2.1.5  Stream groupings

定义一个topology的其中一步是定义每一个bolt接收什么样的流做为输入。stream grouping就是用来定义一个stream应该若是分配数据给bolts上面的多个tasks。

Storm里面有7种类型的stream grouping

  1. Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每一个bolt接收到的tuple数目大体相同。
  2. Fields Grouping:按字段分组, 好比按userid来分组, 具备一样userid的tuple会被分到相同的Bolts里的一个task, 而不一样的userid则会被分配到不一样的bolts里的task。
  3. All Grouping:广播发送,对于每个tuple,全部的bolts都会收到。
  4.  Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
  5. Non Grouping:不分组,这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是同样的效果, 有一点不一样的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
  6. Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪一个task处理这个消息。 只有被声明为Direct Stream的消息流能够声明这种分组方法。并且这种消息tuple必须使用emitDirect方法来发射。消息处理者能够经过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
  7. Local or shuffle grouping:若是目标bolt有一个或者多个task在同一个工做进程中,tuple将会被随机发生给这些tasks。不然,和普通的Shuffle Grouping行为一致。

2.1.6  Reliability

Storm保证每一个tuple会被topology完整的执行。Storm会追踪由每一个spout tuple所产生的tuple树(一个bolt处理一个tuple以后可能会发射别的tuple从而造成树状结构),而且跟踪这棵tuple树何时成功处理完。每一个topology都有一个消息超时的设置,若是storm在这个超时的时间内检测不到某个tuple树到底有没有执行成功, 那么topology会把这个tuple标记为执行失败,而且过一下子从新发射这个tuple。

为了利用Storm的可靠性特性,在你发出一个新的tuple以及你完成处理一个tuple的时候你必需要通知storm。这一切是由OutputCollector来完成的。经过emit方法来通知一个新的tuple产生了,经过ack方法通知一个tuple处理完成了。

Storm的可靠性咱们在第四章会深刻介绍。

2.1.7  Tasks

每个spout和bolt会被看成不少task在整个集群里执行。每个executor对应到一个线程,在这个线程上运行多个task,而stream grouping则是定义怎么从一堆task发射tuple到另一堆task。你能够调用TopologyBuilder类的setSpout和setBolt来设置并行度(也就是有多少个task)。

2.1.8  Workers

一个topology可能会在一个或者多个worker(工做进程)里面执行,每一个worker是一个物理JVM而且执行整个topology的一部分。好比,对于并行度是300的topology来讲,若是咱们使用50个工做进程来执行,那么每一个工做进程会处理其中的6个tasks。Storm会尽可能均匀的工做分配给全部的worker。

2.1.9 Configuration

Storm里面有一堆参数能够配置来调整Nimbus, Supervisor以及正在运行的topology的行为,一些配置是系统级别的,一些配置是topology级别的。default.yaml里面有全部的默认配置。你能够经过定义个storm.yaml在你的classpath里来覆盖这些默认配置。而且你也能够在代码里面设置一些topology相关的配置信息(使用StormSubmitter)。

2.2  构建Topology

1. 实现的目标:

咱们将设计一个topology,来实现对一个句子里面的单词出现的频率进行统计。这是一个简单的例子,目的是让你们对于topology快速上手,有一个初步的理解。

2. 设计Topology结构:

在开始开发Storm项目的第一步,就是要设计topology。肯定好你的数据处理逻辑,咱们今天将的这个简单的例子,topology也很是简单。整个topology以下:

整个topology分为三个部分:

KestrelSpout:数据源,负责发送sentence

Splitsentence:负责将sentence切分

Wordcount:负责对单词的频率进行累加

3. 设计数据流

这个topology从kestrel queue读取句子,并把句子划分红单词,而后汇总每一个单词出现的次数,一个tuple负责读取句子,每个tuple分别对应计算每个单词出现的次数,大概样子以下所示:

4. 代码实现:

1) 构建maven环境:

为了开发storm topology, 你须要把storm相关的jar包添加到classpath里面去: 要么手动添加全部相关的jar包, 要么使用maven来管理全部的依赖。storm的jar包发布在Clojars(一个maven库), 若是你使用maven的话,把下面的配置添加在你项目的pom.xml里面。

<repository>

    <id>clojars.org</id>

    <url>http://clojars.org/repo</url>

</repository>

<dependency>

     <groupId>storm</groupId>

    <artifactId>storm</artifactId>

     <version>0.5.3</version>

     <scope>test</scope>

</dependency>

2) 定义topology:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(1, new KestrelSpout(“kestrel.backtype.com”,22133,

                                                                                    ”sentence_queue”,

                                                                                    new StringScheme()));

builder.setBolt(2, new SplitSentence(), 10)

       .shuffleGrouping(1);

builder.setBolt(3, new WordCount(), 20)

       .fieldsGrouping(2, new Fields(“word”));

这种topology的spout从句子队列中读取句子,在kestrel.backtype.com位于一个Kestrel的服务器端口22133。

Spout用setSpout方法插入一个独特的id到topology。 Topology中的每一个节点必须给予一个id,id是由其余bolts用于订阅该节点的输出流。 KestrelSpout在topology中id为1。

setBolt是用于在Topology中插入bolts。 在topology中定义的第个bolts 是切割句子的bolts。 这个bolts 将句子流转成成单词流。

让咱们看看SplitSentence实施:

public class SplitSentence implements IBasicBolt{

        public void prepare(Map conf, TopologyContext context) {

         }

       public void execute(Tuple tuple, BasicOutputCollector collector) {

              String sentence = tuple.getString(0);

               for(String word: sentence.split(“ ”)) {

                        collector.emit(new Values(word));

                  }

             }

         public void cleanup() {

        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

                declarer.declare(new Fields(“word”));

             }

 }

关键的方法 execute方法。 正如你能够看到,它将句子拆分红单词,并发出每一个单词做为一个新的元组。 另外一个重要的方法是declareOutputFields其中宣布bolts输出元组的架构。 在这里宣布,它发出一个域为word的元组

setBolt的最后一个参数是你想为bolts的并行量。 SplitSentence bolts 是10个并发,这将致使在storm集群中有十个线程并行执行。 你所要作的的是增长bolts的并行量在遇到topology的瓶颈时。

 setBolt方法返回一个对象,用来定义bolts的输入。 例如SplitSentence螺栓订阅组件“1”使用随机分组的输出流。 “1”是指已经定义KestrelSpout 我将解释在某一时刻的随机分组的一部分。 到目前为止,最要紧的是SplitSentence bolts会消耗KestrelSpout发出的每个元组。

下面在让咱们看看wordcount的实现:

public class WordCount implements IBasicBolt {

        private Map<String, Integer> _counts = new HashMap<String, Integer>();

        public void prepare(Map conf, TopologyContext context) {

        }

       public void execute(Tuple tuple, BasicOutputCollector collector) {

              String word = tuple.getString(0);

              int count;

              if(_counts.containsKey(word)) {

                     count = _counts.get(word);

              } else {

                     count = 0;

}

              count++;

              _counts.put(word, count);

              collector.emit(new Values(word, count));

       }

       public void cleanup() {

       }

       public void declareOutputFields(OutputFieldsDeclarer declarer) {

              declarer.declare(new Fields(“word”, “count”));

       }

}

SplitSentence对于句子里面的每一个单词发射一个新的tuple, WordCount在内存里面维护一个单词->次数的mapping, WordCount每收到一个单词, 它就更新内存里面的统计状态。

5. 运行Topology

storm的运行有两种模式: 本地模式和分布式模式.

1) 本地模式:

storm用一个进程里面的线程来模拟全部的spout和bolt. 本地模式对开发和测试来讲比较有用。 你运行storm-starter里面的topology的时候它们就是以本地模式运行的, 你能够看到topology里面的每个组件在发射什么消息。

2) 分布式模式:

storm由一堆机器组成。当你提交topology给master的时候, 你同时也把topology的代码提交了。master负责分发你的代码而且负责给你的topolgoy分配工做进程。若是一个工做进程挂掉了, master节点会把认为从新分配到其它节点。

3) 下面是以本地模式运行的代码:

         Config conf = new Config();

         conf.setDebug(true);

         conf.setNumWorkers(2);

         LocalCluster cluster = new LocalCluster();

         cluster.submitTopology(“test”, conf, builder.createTopology());

          Utils.sleep(10000);

          cluster.killTopology(“test”);

          cluster.shutdown();

首先, 这个代码定义经过定义一个LocalCluster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集群是同样的。经过调用submitTopology方法来提交topology, 它接受三个参数:要运行的topology的名字,一个配置对象以及要运行的topology自己。

topology的名字是用来惟一区别一个topology的,这样你而后能够用这个名字来杀死这个topology的。前面已经说过了, 你必须显式的杀掉一个topology, 不然它会一直运行。

Conf对象能够配置不少东西, 下面两个是最多见的:

 TOPOLOGY_WORKERS(setNumWorkers) 定义你但愿集群分配多少个工做进程给你来执行这个topology. topology里面的每一个组件会被须要线程来执行。每一个组件到底用多少个线程是经过setBolt和setSpout来指定的。这些线程都运行在工做进程里面. 每个工做进程包含一些节点的一些工做线程。好比, 若是你指定300个线程,60个进程, 那么每一个工做进程里面要执行6个线程, 而这6个线程可能属于不一样的组件(Spout, Bolt)。你能够经过调整每一个组件的并行度以及这些线程所在的进程数量来调整topology的性能。

 TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话, storm会记录下每一个组件所发射的每条消息。这在本地环境调试topology颇有用, 可是在线上这么作的话会影响性能的。

结论:

本章从storm的基本对象的定义,到普遍的介绍了storm的开发环境,从一个简单的例子讲解了topology的构建和定义。但愿你们能够从本章的内容对storm有一个基本的理解和概念,而且已经能够构建一个简单的topology!!

相关文章
相关标签/搜索