storm能够确保spout发送出来的每一个消息都会被完整的处理。本章将会描述storm体系是如何达到这个目标的,并将会详述开发者应该如何使用storm的这些机制来实现数据的可靠处理。 html
一个消息(tuple)从spout发送出来,可能会致使成百上千的消息基于此消息被建立。 git
咱们来思考一下流式的“单词统计”的例子: github
storm任务从数据源(Kestrel queue)每次读取一个完整的英文句子;将这个句子分解为独立的单词,最后,实时的输出每一个单词以及它出现过的次数。 算法
本例中,每一个从spout发送出来的消息(每一个英文句子)都会触发不少的消息被建立,那些从句子中分隔出来的单词就是被建立出来的新消息。 并发
这些消息构成一个树状结构,咱们称之为“tuple tree”,看起来如图1所示: 性能
图1 示例tuple tree spa
在什么条件下,Storm才会认为一个从spout发送出来的消息被完整处理呢?答案就是下面的条件同时被知足: 设计
若是在指定的时间内,一个消息衍生出来的tuple tree未被彻底处理成功,则认为此消息未被完整处理。这个超时值能够经过任务级参数Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 进行配置,默认超时值为30秒。 orm
若是消息被完整处理或者未被完整处理,Storm会如何进行接下来的操做呢?为了弄清这个问题,咱们来研究一下从spout发出来的消息的生命周期。这里列出了spout应该实现的接口: htm
首先, Storm使用spout实例的nextTuple()方法从spout请求一个消息(tuple)。 收到请求之后,spout使用open方法中提供的SpoutOutputCollector向它的输出流发送一个或多个消息。每发送一个消息,Spout会给这个消息提供一个message ID,它将会被用来标识这个消息。
假设咱们从kestrel队列中读取消息,Spout会将kestrel 队列为这个消息设置的ID做为此消息的message ID。 向SpoutOutputCollector中发送消息格式以下:
接来下,这些消息会被发送到后续业务处理的bolts, 而且Storm会跟踪由此消息产生出来的新消息。当检测到一个消息衍生出来的tuple tree被完整处理后,Storm会调用Spout中的ack方法,并将此消息的messageID做为参数传入。同理,若是某消息处理超时,则此消息对应的Spout的fail方法会被调用,调用时此消息的messageID会被做为参数传入。
注意:一个消息只会由发送它的那个spout任务来调用ack或fail。若是系统中某个spout由多个任务运行,消息也只会由建立它的spout任务来应答(ack或fail),决不会由其余的spout任务来应答。
咱们继续使用从kestrel队列中读取消息的例子来阐述高可靠性下spout须要作些什么(假设这个spout的名字是KestrelSpout)。
咱们先简述一下kestrel消息队列:
当KestrelSpout从kestrel队列中读取一个消息,表示它“打开”了队列中某个消息。这意味着,此消息并未从队列中真正的删除,而是将此消息设置为“pending”状态,它等待来自客户端的应答,被应答之后,此消息才会被真正的从队列中删除。处于“pending”状态的消息不会被其余的客户端看到。另外,若是一个客户端意外的断开链接,则由此客户端“打开”的全部消息都会被从新加入到队列中。当消息被“打开”的时候,kestrel队列同时会为这个消息提供一个惟一的标识。
KestrelSpout就是使用这个惟一的标识做为这个tuple的messageID的。稍后当ack或fail被调用的时候,KestrelSpout会把ack或者fail连同messageID一块儿发送给kestrel队列,kestrel会将消息从队列中真正删除或者将它从新放回队列中。
为了使用Storm提供的可靠处理特性,咱们须要作两件事情:
经过上面的两步,storm就能够检测到一个tuple tree什么时候被彻底处理了,而且会调用相关的ack或fail方法。Storm提供了简单明了的方法来完成上述两步。
为tuple tree中指定的节点增长一个新的节点,咱们称之为锚定(anchoring)。锚定是在咱们发送消息的同时进行的。为了更容易说明问题,咱们使用下面代码做为例子。本示例的bolt将包含整句话的消息分解为一系列的子消息,每一个子消息包含一个单词。
每一个消息都经过这种方式被锚定:把输入消息做为emit方法的第一个参数。由于word消息被锚定在了输入消息上,这个输入消息是spout发送过来的tuple tree的根节点,若是任意一个word消息处理失败,派生这个tuple tree那个spout 消息将会被从新发送。
与此相反,咱们来看看使用下面的方式emit消息时,Storm会如何处理:
若是以这种方式发送消息,将会致使这个消息不会被锚定。若是此tuple tree中的消息处理失败,派生此tuple tree的根消息不会被从新发送。根据任务的容错级别,有时候很适合发送一个非锚定的消息。
一个输出消息能够被锚定在一个或者多个输入消息上,这在作join或聚合的时候是颇有用的。一个被多重锚定的消息处理失败,会致使与之关联的多个spout消息被从新发送。多重锚定经过在emit方法中指定多个输入消息来实现:
多重锚定会将被锚定的消息加到多棵tuple tree上。
注意:多重绑定可能会破坏传统的树形结构,从而构成一个DAGs(有向无环图),如图2所示:
图2 多重锚定构成的钻石型结构
Storm的实现能够像处理树那样来处理DAGs。
锚定代表了如何将一个消息加入到指定的tuple tree中,高可靠处理API的接下来部分将向您描述当处理完tuple tree中一个单独的消息时咱们该作些什么。这些是经过OutputCollector 的ack和fail方法来实现的。回头看一下例子SplitSentence,能够发现当全部的word消息被发送完成后,输入的表示句子的消息会被应答(acked)。
每一个被处理的消息必须代表成功或失败(acked 或者failed)。Storm是使用内存来跟踪每一个消息的处理状况的,若是被处理的消息没有应答的话,早晚内存会被耗尽!>
不少bolt遵循特定的处理流程: 读取一个消息、发送它派生出来的子消息、在execute结尾处应答此消息。通常的过滤器(filter)或者是简单的处理功能都是这类的应用。Storm有一个BasicBolt接口封装了上述的流程。示例SplitSentence可使用BasicBolt来重写:
使用这种方式,代码比以前稍微简单了一些,可是实现的功能是同样的。发送到BasicOutputCollector的消息会被自动的锚定到输入消息,而且,当execute执行完毕的时候,会自动的应答输入消息。
不少状况下,一个消息须要延迟应答,例如聚合或者是join。只有根据一组输入消息获得一个结果以后,才会应答以前全部的输入消息。而且聚合和join大部分时候对输出消息都是多重锚定。然而,这些特性不是IBasicBolt所能处理的。
Storm 系统中有一组叫作“acker”的特殊的任务,它们负责跟踪DAG(有向无环图)中的每一个消息。每当发现一个DAG被彻底处理,它就向建立这个根消息的spout任务发送一个信号。拓扑中acker任务的并行度能够经过配置参数Config.TOPOLOGY_ACKERS来设置。默认的acker任务并行度为1,当系统中有大量的消息时,应该适当提升acker任务的并发度。
为了理解Storm可靠性处理机制,咱们从研究一个消息的生命周期和tuple tree的管理入手。当一个消息被建立的时候(不管是在spout仍是bolt中),系统都为该消息分配一个64bit的随机值做为id。这些随机的id是acker用来跟踪由spout消息派生出来的tuple tree的。
每一个消息都知道它所在的tuple tree对应的根消息的id。每当bolt新生成一个消息,对应tuple tree中的根消息的messageId就拷贝到这个消息中。当这个消息被应答的时候,它就把关于tuple tree变化的信息发送给跟踪这棵树的acker。例如,他会告诉acker:本消息已经处理完毕,可是我派生出了一些新的消息,帮忙跟踪一下吧。
举个例子,假设消息D和E是由消息C派生出来的,这里演示了消息C被应答时,tuple tree是如何变化的。
由于在C被从树中移除的同时D和E会被加入到tuple tree中,所以tuple tree不会被过早的认为已彻底处理。
关于Storm如何跟踪tuple tree,咱们再深刻的探讨一下。前面说过系统中能够有任意个数的acker,那么,每当一个消息被建立或应答的时候,它怎么知道应该通知哪一个acker呢?
系统使用一种哈希算法来根据spout消息的messageId肯定由哪一个acker跟踪此消息派生出来的tuple tree。由于每一个消息都知道与之对应的根消息的messageId,所以它知道应该与哪一个acker通讯。
当spout发送一个消息的时候,它就通知对应的acker一个新的根消息产生了,这时acker就会建立一个新的tuple tree。当acker发现这棵树被彻底处理以后,他就会通知对应的spout任务。
tuple是如何被跟踪的呢?系统中有成千上万的消息,若是为每一个spout发送的消息都构建一棵树的话,很快内存就会耗尽。因此,必须采用不一样的策略来跟踪每一个消息。因为使用了新的跟踪算法,Storm只须要固定的内存(大约20字节)就能够跟踪一棵树。这个算法是storm正确运行的核心,也是storm最大的突破。
acker任务保存了spout消息id到一对值的映射。第一个值就是spout的任务id,经过这个id,acker就知道消息处理完成时该通知哪一个spout任务。第二个值是一个64bit的数字,咱们称之为“ack val”, 它是树中全部消息的随机id的异或结果。ack val表示了整棵树的的状态,不管这棵树多大,只须要这个固定大小的数字就能够跟踪整棵树。当消息被建立和被应答的时候都会有相同的消息id发送过来作异或。
每当acker发现一棵树的ack val值为0的时候,它就知道这棵树已经被彻底处理了。由于消息的随机ID是一个64bit的值,所以ack val在树处理完以前被置为0的几率很是小。假设你每秒钟发送一万个消息,从几率上说,至少须要50,000,000年才会有机会发生一次错误。即便如此,也只有在这个消息确实处理失败的状况下才会有数据的丢失!
Acker任务是轻量级的,因此在拓扑中并不须要太多的acker存在。能够经过Storm UI来观察acker任务的吞吐量,若是看上去吞吐量不够的话,说明须要添加额外的acker。
若是你并不要求每一个消息必须被处理(你容许在处理过程当中丢失一些信息),那么能够关闭消息的可靠处理机制,从而能够获取较好的性能。关闭消息的可靠处理机制意味着系统中的消息数会减半(每一个消息不须要应答了)。另外,关闭消息的可靠处理能够减小消息的大小(不须要每一个tuple记录它的根id了),从而节省带宽。
有三种方法能够关系消息的可靠处理机制:
到如今为止,你们已经理解了Storm的可靠性机制,而且知道了如何选择不一样的可靠性级别来知足需求。接下来咱们研究一下Storm如何保证在各类状况下确保数据不丢失。
本章介绍了storm集群如何实现数据的可靠处理。借助于创新性的tuple tree跟踪技术,storm高效的经过数据的应答机制来保证数据不丢失。
storm集群中除nimbus外,没有单点存在,任何节点均可以出故障而保证数据不会丢失。nimbus被设计为无状态的,只要能够及时重启,就不会影响正在运行的任务。