storm可靠机制

一  可靠性 简介

       Storm的可靠性是指Storm会告知用户每个消息单元是否在一个指定的时间(timeout)内被彻底处理。 彻底处理的意思是该MessageId绑定的源Tuple以及由该源Tuple衍生的全部Tuple都通过了Topology中每个应该到达的Bolt的处理。html

注:  timetout 能够经过 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS   来指定java

       Storm中的每个Topology中都包含有一个Acker组件。 Acker组件的任务就是跟踪从某个task中的Spout流出的每个messageId所绑定的Tuple树中的全部Tuple的处理状况。 若是在用户设置的最大超时时间内这些Tuple没有被彻底处理,那么Acker会告诉Spout该消息处理失败,相反则会告知Spout该消息处理成功,它会分别调用Spout中的fail和ack方法。git

       Storm容许用户在Spout中发射一个新的源Tuple时为其指定一个MessageId,这个MessageId能够是任意的Object对象。 多个源Tuple能够共用同一个MessageId,表示这多个源Tuple对用户来讲是同一个消息单元,它们会被放到同一棵tuple树中 ,以下图所示:github

                                                    

                                                                          Tuple 树web

       在Spout中由message 1绑定的tuple1和tuple2分别通过bolt1和bolt2的处理,而后生成了两个新的Tuple,并最终流向了bolt3。当bolt3处理完以后,称message 1被彻底处理了。算法

二 Acker  原理分析数据库

       storm里面有一类 特殊的task称为acker (acker bolt) ,  负责跟踪spout发出的每个tuple的tuple树 。当acker发现一个tuple树已经处理完成了。它会发送一个消息给产生这个tuple的那个task。你能够经过 Config.TOPOLOGY_ACKERS 来设置一个topology里面的acker的数量, 默认值是1。 若是你的topology里面的tuple比较多的话, 那么把acker的数量设置多一点,效率会高一点。编程

       理解storm的可靠性的最好的方法是来看看tuple和tuple树的生命周期, 当一个tuple被建立, 不论是spout仍是bolt建立的,  它会被赋予一个64位的id ,而acker就是利用这个id去跟踪全部的tuple的。 每一个tuple知道它的祖宗的id(从spout发出来的那个tuple的id), 每当你新发射一个tuple, 它的祖宗id都会传给这个新的tuple。 因此当一个tuple被ack的时候,它会发一个消息给acker,告诉它这个tuple树发生了怎么样的变化。具体来讲就是它告诉acker:  我已经完成了, 我有这些儿子tuple, 你跟踪一下他们吧。api

                                  (spout-tuple-id, tmp-ack-val)app

                 tmp-ark-val =  tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )

   tmp-ack-val是要ack的tuple的id与由它新建立的全部的tuple的id异或的结果

        当一个tuple须要ack的时候,它到底选择哪一个acker来发送这个信息呢?

          storm使用一致性哈希来把一个spout-tuple-id对应到acker, 由于每个tuple知道它全部的祖宗的tuple-id, 因此它天然能够算出要通知哪一个acker来ack。

         注:一个tuple可能存在于多个tuple树,全部可能存在多个祖宗的tuple-id

          acker是怎么知道每个spout tuple应该交给哪一个task来处理?

       当一个spout发射一个新的tuple, 它会简单的发一个消息给一个合适的acker,而且告诉acker它本身的id(taskid), 这样storm就有了taskid-tupleid的对应关系。 当acker发现一个树完成处理了, 它知道给哪一个task发送成功的消息。

Acker的高效性

           acker task并不显式的跟踪tuple树。 对于那些有成千上万个节点的tuple树,把这么多的tuple信息都跟踪起来会耗费太多的内存。相反,  acker用了一种不一样的方式, 使得对于每一个spout tuple所须要的内存量是恒定的(20 bytes) .  这个跟踪算法是storm如何工做的关键,而且也是它的主要突破。

        一个acker task存储了一个spout-tuple-id到一对值的一个mapping。这个对子的第一个值是建立这个tuple的taskid, 这个是用来在完成处理tuple的时候发送消息用的。 第二个值是一个64位的数字称做:ack val , ack val是整个tuple树的状态的一个表示,无论这棵树多大。它只是简单地把这棵树上的全部建立的tupleid/ack的tupleid一块儿异或(XOR)。

            当一个acker task 发现一个 ack val变成0了, 它知道这棵树已经处理完成了。

 例以下图是一个简单的Topology。

                         

                                                                          一 个简单的 Topology

        ack_val的初值为0,varl_x表示新产生的tuple id ,它们通过Spout,Bolt1,Bolt2,Bolt3 处理,并与arv_val异或,最终arv_val变为0,表示tuple1被成功处理。

   下面看一个稍微复杂一点的例子:

        注:红色虚线框表示的是Acker组件,ack_val表示acker value的值,它的初值为0

        msg1绑定了两个源tuple,它们的id分别为1001和1010.在通过Bolt1处理后新生成了tuple id为1110,新生成的tuple与传入的tuple 1001进行异或获得的值为0111,而后Bolt1经过spout-tuple-id映射到指定的Acker组件,向它发送消息,Acker组件将Bolt1传过来的值与ack_val异或,更新ack_val的值变为了0100。与此相同通过Bolt2处理后,ack_val的值变为0001。最后经Bolt3处理后ack_val的值变为了0,说明此时由msg1标识的Tuple处理成功,此时Acker组件会经过事先绑定的task id映射找到对应的Spout,而后调用该Spout的ack方法。

            其流程以下图所示:

     注: 1. Acker (ack bolt)组件由系统自动产生,通常来讲一个topology只有一个ack bolt(固然能够经过配置参数指定多个),当bolt处理并下发完tuple给下一跳的bolt时,会发送一个ack给ack bolt。ack bolt经过简单的异或原理(即同一个数与本身异或结果为零)来断定从spout发出的某一个Tuple是否已经被彻底处理完毕。若是结果为真,ack bolt发送消息给spout,spout中的ack函数被调用并执行。若是超时,则发送fail消息给spout,spout中的fail函数被调用并执行,spout中的ack和fail的处理逻辑由用户自行填写。

          2.  Acker对于每一个Spout-tuple保存一个ack-val的校验值,它的初始值是0, 而后每发射一个tuple 就ack一个tuple,那么tuple的id都要跟这个校验值异或一下,而且把获得的值更新为ack-val的新值。那么假设每一个发射出去的tuple都被ack了, 那么最后ack-val必定是0(由于一个数字跟本身异或获得的值是0)。

            A xor A = 0.

          A xor B…xor B xor A = 0,其中每个操做数出现且仅出现两次。

        3.  tupleid是随机的64位数字, ack val碰巧变成0(例如:ark_val = 1 ^ 2  ^ 3 = 0)而不是由于全部建立的tuple都完成了,这样的几率极小。 算一下就知道了, 就算每秒发生10000个ack, 那么须要50000000万年才可能碰到一个错误。并且就算碰到了一个错误, 也只有在这个tuple失败的时候才会形成数据丢失。 

      看看storm在每种异常状况下是怎么避免数据丢失的:

         1. 因为对应的task挂掉了,一个tuple没有被ack:  storm的超时机制在超时以后会把这个tuple标记为失败,从而能够从新处理。

         2. Acker挂掉了:  这种状况下由这个acker所跟踪的全部spout tuple都会超时,也就会被从新处理。

          3. Spout挂掉了:  在这种状况下给spout发送消息的消息源负责从新发送这些消息。好比Kestrel和RabbitMQ在一个客户端断开以后会把全部”处理中“的消息放回队列。

就像你看到的那样, storm的可靠性机制是彻底分布式的, 可伸缩的而且是高度容错的。

四Acker 工做流程

咱们来看看acker的工做流程:

1. Spout在初始化时会产生一个tasksId;

2. Spout中建立新的Tuple,其id是一个64位的随机数;

3. Spout将新建的Tuple发送出去(给出了messageId来开启Tuple的追踪), 同时会发送一个消息到某个acker,要求acker进行追踪。该消息包含两部分

  • Spout的taskId:用户acker在整个tuple树被彻底处理后找到原始的Spout进行回调ack或fail

  • 一个64位的ack val值: 标志该tuple是否被彻底处理。初始值为0。

4. 一个Bolt在处理完Tuple后,若是发射了一个新的anchor tuple,Storm会维护anchor tuple的列表;

5. 该Bolt调用OutputCollector.ack()时,Storm会作以下操做:

  • anchor tuple列表中每一个已经ack过的和新建立的Tuple的id作异或(XOR)。假定Spout发出的TupleID是tuple-id-0,该Bolt新生成的TupleID为tuple-id-1,那么,tuple-id-0XORtuple-id-0XORtuple-id-1

  • Storm根据该原始TupleID进行一致性hash算法,找到最开始Spout发送的那个acker,而后把上面异或后得出的ack val值发送给acker

6. acker收到新的ack val值后,与保存的原始的Tuple的id进行异或,若是为0,表示该Tuple已被彻底处理,则根据其taskId找到原始的Spout,回调其ack()方法。


fail的机制相似,在发现fail后直接回调Spout的fail方法。

四 Acker 编程接口

        在Spout中,Storm系统会为用户指定的MessageId生成一个对应的64位的整数,做为整个Tuple Tree的RootId。RootId会被传递给Acker以及后续的Bolt来做为该消息单元的惟一标识。同时,不管Spout仍是Bolt每次新生成一个Tuple时,都会赋予该Tuple一个惟一的64位整数的Id。

      当Spout发射完某个MessageId对应的源Tuple以后,它会告诉Acker本身发射的RootId以及生成的那些源Tuple的Id。而当Bolt处理完一个输入Tuple并产生出新的Tuple时,也会告知Acker本身处理的输入Tuple的Id以及新生成的那些Tuple的Id。Acker只须要对这些Id进行异或运算,就能判断出该RootId对应的消息单元是否成功处理完成了。

   下面这个是spout要实现的接口:

public interface ISpout extends Serializable {
     void open(Map conf, TopologyContext context,SpoutOutputCollector collector);      
     void close();      
     void nextTuple();      
     void ack(Object msgId);     
     void fail(Object msgId); 
    }

        首先 storm经过调用spout的nextTuple方法来获取下一个tuple, Spout经过open方法参数里面提供的SpoutOutputCollector来发射新tuple到它的其中一个输出消息流, 发射tuple的时候spout会提供一个message-id, 后面经过这个message-id来追踪这个tuple。

this.collector.emit(new Values("hello world"),msgId);

          注:

msgId是提供给Acker组件使用的,Acker组件使用msgId来跟踪Tuple树

       接下来, 这个发射的tuple被传送到消息处理者bolt那里, storm会跟踪由此所产生的这课tuple树。若是storm检测到一个tuple被彻底处理了, 那么storm会以最开始的那个message-id做为参数去调用消息源的ack方法;反之storm会调用spout的fail方法。值得注意的是, storm调用ack或者fail的task始终是产生这个tuple的那个task。因此若是一个spout被分红不少个task来执行, 消息执行的成功失败与否始终会通知最开始发出tuple的那个task。

         做为storm的使用者,有两件事情要作以更好的利用storm的可靠性特征。 首先,在你生成一个新的tuple的时候要通知storm; 其次,完成处理一个tuple以后要通知storm。 这样storm就能够检测整个tuple树有没有完成处理,而且通知源spout处理结果。storm提供了一些简洁的api来作这些事情。

         由一个tuple产生一个新的tuple称为:anchoring。 你发射一个新tuple的同时也就完成了一次anchoring。看下面这个例子: 这个bolt把一个包含一个句子的tuple分割成每一个单词一个tuple。  

public class SplitSentence implements IRichBolt {  
    OutputCollector _collector; 
    public void prepare(Map conf,TopologyContext context,OutputCollector collector) {        
        _collector = collector;  
        } 
    public void execute(Tuple tuple) {    
        String sentence = tuple.getString(0);    
        for(String word: sentence.split(" ")) {          
            _collector.emit(tuple,new Values(word));    
            }       
         _collector.ack(tuple);  
         }
 
       public void cleanup() {}
       public void declareOutputFields(OutputFieldsDeclarer declarer) {      
           declarer.declare(newFields("word"));
       }
   }

        看一下这个execute方法, emit的第一个参数是输入tuple, 第二个参数则是输出tuple, 这其实就是经过输入tuple anchoring了一个新的输出tuple。由于这个“单词tuple”被anchoring在“句子tuple”一块儿, 若是其中一个单词处理出错,那么这整个句子会被从新处理。做为对比, 咱们看看若是经过下面这行代码来发射一个新的tuple的话会有什么结果。

     _collector.emit(new Values(word));

        用这种方法发射会致使新发射的这个tuple脱离原来的tuple树(unanchoring), 若是这个tuple处理失败了, 整个句子不会被从新处理。一个输出tuple能够被anchoring到多个输入tuple。这种方式在stream合并或者stream聚合的时候颇有用。一个多入口tuple处理失败的话,那么它对应的全部输入tuple都要从新执行。看看下面演示怎么指定多个输入tuple:

 List<Tuple> anchors = new ArrayList<Tuple>();

anchors.add(tuple1);

anchors.add(tuple2);

_collector.emit(anchors,new Values(1,2,3));

         咱们经过anchoring来构造这个tuple树,最后一件要作的事情是在你处理完这个tuple的时候告诉storm,  经过OutputCollector类的ack和fail方法来作,若是你回过头来看看 SplitSentence 的例子, 你能够看到“句子tuple”在全部“单词tuple”被发出以后调用了ack。

       你能够调用 OutputCollector  的fail方法去当即将从消息源头发出的那个tuple标记为fail, 好比你查询了数据库,发现一个错误,你能够立刻fail那个输入tuple, 这样可让这个tuple被快速的从新处理, 由于你不须要等那个timeout时间来让它自动fail。

        每一个你处理的tuple, 必须被ack或者fail。由于storm追踪每一个tuple要占用内存。因此若是你不ack/fail每个tuple, 那么最终你会看到OutOfMemory错误。

       大多数Bolt遵循这样的规律:读取一个tuple;发射一些新的tuple;在execute的结束的时候ack这个tuple。这些Bolt每每是一些过滤器或者简单函数。Storm为这类规律封装了一个BasicBolt类。若是用BasicBolt来作, 上面那个SplitSentence能够改写成这样:

public class SplitSentence implements IBasicBolt {  
    public void prepare(Map conf,TopologyContext context) {} 
    public void execute(Tuple tuple,BasicOutputCollector collector) {      
        String sentence = tuple.getString(0);      
        for(String word: sentence.split(" ")) {        
            collector.emit(newValues(word));      
            }
       } 
    public void cleanup() {} 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {      
        declarer.declare(newFields("word"));  
        }
    }

         这个实现比以前的实现简单多了, 可是功能上是同样的,发送到BasicOutputCollector的tuple会自动和输入tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack的。

          做为对比,处理聚合和合并的bolt每每要处理一大堆的tuple以后才能被ack, 而这类tuple一般都是多输入的tuple, 因此这个已经不是IBasicBolt能够罩得住的了。

  注:当一个Tuple处理失败的时候,storm不会自动的重发该tuple,须要用户本身来编写逻辑从新处理fail掉的Tuple,能够将其 放入一个列表中,在nextTuple()中获取这些失败的tuple,从新发射。

五 调整可靠性 

       acker task是很是轻量级的, 因此一个topology里面不须要不少acker。你能够经过Strom UI(id: -1)来跟踪它的性能。 若是它的吞吐量看起来不正常,那么你就须要多加点acker了。

若是可靠性对你来讲不是那么重要 — 你不太在乎在一些失败的状况下损失一些数据, 那么你能够经过不跟踪这些tuple树来获取更好的性能。不去跟踪消息的话会使得系统里面的消息数量减小一半, 由于对于每个tuple都要发送一个ack消息。而且它须要更少的id来保存下游的tuple, 减小带宽占用。

  有三种方法能够去掉可靠性:

       第一是把Config.TOPOLOGY_ACKERS 设置成 0. 在这种状况下, storm会在spout发射一个tuple以后立刻调用spout的ack方法。也就是说这个tuple树不会被跟踪。

第二个方法是在tuple层面去掉可靠性。 你能够在发射tuple的时候不指定messageid来达到不跟踪某个特定的spout tuple的目的。

最后一个方法是若是你对于一个tuple树里面的某一部分到底成不成功不是很关心,那么能够在发射这些tuple的时候unanchor它们。 这样这些tuple就不在tuple树里面, 也就不会被跟踪了。

总结

spout里的msgId(也就是tupleId,系统自动生成64位)和bolt里的锚点有什么相关性呢?

msgId用来跟踪一个tuple,可是一个tuple可能要被N个bolt进行处理,你须要保证每一个bolt都处理正常,这时候你在任何一个bolt调用ack(tuple)的话,spout都会认定完成,这种状况明显不符合要求。 锚定就是对每一个数据的路径负责,只有当路径执行到底,而且正确的状况下,反馈信息到锚定节点,锚定节点才会确认数据正常。

相关文章
相关标签/搜索