Storm's reliability API: how Storm guarantees that every message coming off a spout will be fully processed.html
(storm的可靠性API: storm如何保证spout发出的每个tuple都被完整处理。)java
本文导读: node
1、简介 2、理解消息被完整处理 3、消息的生命周期 4、可靠相关的API 5、高效的实现tuple tree 6、选择合适的可靠性级别 7、集群的各级容错性 7.1 任务级失败 7.2任务槽(slot)故障 7.3集群节点(机器)故障 8、小结 附:官网文档guaranteeing message processing 译文
storm能够确保spout发送出来的每一个消息都会被完整的处理。本章将会描述storm体系是如何达到这个目标的,并将会详述开发者应该如何使用storm的这些机制来实现数据的可靠处理。git
一个消息(tuple)从spout发送出来,可能会致使成百上千的消息基于此消息被建立。github
咱们来思考一下流式的“单词统计”的例子:算法
storm任务从数据源(Kestrel queue)每次读取一个完整的英文句子;将这个句子分解为独立的单词,最后,实时的输出每一个单词以及它出现过的次数。apache
本例中,每一个从spout发送出来的消息(每一个英文句子)都会触发不少的消息被建立,那些从句子中分隔出来的单词就是被建立出来的新消息。api
这些消息构成一个树状结构,咱们称之为“tuple tree”,看起来如图1所示:并发
图1 示例tuple treeapp
在什么条件下,Storm才会认为一个从spout发送出来的消息被完整处理呢?答案就是下面的条件同时被知足:
若是在指定的时间内,一个消息衍生出来的tuple tree未被彻底处理成功,则认为此消息未被完整处理。这个超时值能够经过任务级参数Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 进行配置,默认超时值为30秒。
若是消息被完整处理或者未被完整处理,Storm会如何进行接下来的操做呢?为了弄清这个问题,咱们来研究一下从spout发出来的消息的生命周期。这里列出了spout应该实现的接口:
首先, Storm使用spout实例的nextTuple()方法从spout请求一个消息(tuple)。 收到请求之后,spout使用open方法中提供的SpoutOutputCollector向它的输出流发送一个或多个消息。每发送一个消息,Spout会给这个消息提供一个message ID,它将会被用来标识这个消息。
假设咱们从kestrel队列中读取消息,Spout会将kestrel 队列为这个消息设置的ID做为此消息的message ID。 向SpoutOutputCollector中发送消息格式以下:
接来下,这些消息会被发送到后续业务处理的bolts, 而且Storm会跟踪由此消息产生出来的新消息。当检测到一个消息衍生出来的tuple tree被完整处理后,Storm会调用Spout中的ack方法,并将此消息的messageID做为参数传入。同理,若是某消息处理超时,则此消息对应的Spout的fail方法会被调用,调用时此消息的messageID会被做为参数传入。
注意:一个消息只会由发送它的那个spout任务来调用ack或fail。若是系统中某个spout由多个任务运行,消息也只会由建立它的spout任务来应答(ack或fail),决不会由其余的spout任务来应答。
咱们继续使用从kestrel队列中读取消息的例子来阐述高可靠性下spout须要作些什么(假设这个spout的名字是KestrelSpout)。
咱们先简述一下kestrel消息队列:
当KestrelSpout从kestrel队列中读取一个消息,表示它“打开”了队列中某个消息。这意味着,此消息并未从队列中真正的删除,而是将此消息设置为“pending”状态,它等待来自客户端的应答,被应答之后,此消息才会被真正的从队列中删除。处于“pending”状态的消息不会被其余的客户端看到。另外,若是一个客户端意外的断开链接,则由此客户端“打开”的全部消息都会被从新加入到队列中。当消息被“打开”的时候,kestrel队列同时会为这个消息提供一个惟一的标识。
KestrelSpout就是使用这个惟一的标识做为这个tuple的messageID的。稍后当ack或fail被调用的时候,KestrelSpout会把ack或者fail连同messageID一块儿发送给kestrel队列,kestrel会将消息从队列中真正删除或者将它从新放回队列中。
为了使用Storm提供的可靠处理特性,咱们须要作两件事情:
经过上面的两步,storm就能够检测到一个tuple tree什么时候被彻底处理了,而且会调用相关的ack或fail方法。Storm提供了简单明了的方法来完成上述两步。
为tuple tree中指定的节点增长一个新的节点,咱们称之为锚定(anchoring)。锚定是在咱们发送消息的同时进行的。为了更容易说明问题,咱们使用下面代码做为例子。本示例的bolt将包含整句话的消息分解为一系列的子消息,每一个子消息包含一个单词。
每一个消息都经过这种方式被锚定:把输入消息做为emit方法的第一个参数。由于word消息被锚定在了输入消息上,这个输入消息是spout发送过来的tuple tree的根节点,若是任意一个word消息处理失败,派生这个tuple tree那个spout 消息将会被从新发送。
与此相反,咱们来看看使用下面的方式emit消息时,Storm会如何处理:
若是以这种方式发送消息,将会致使这个消息不会被锚定。若是此tuple tree中的消息处理失败,派生此tuple tree的根消息不会被从新发送。根据任务的容错级别,有时候很适合发送一个非锚定的消息。
一个输出消息能够被锚定在一个或者多个输入消息上,这在作join或聚合的时候是颇有用的。一个被多重锚定的消息处理失败,会致使与之关联的多个spout消息被从新发送。多重锚定经过在emit方法中指定多个输入消息来实现:
多重锚定会将被锚定的消息加到多棵tuple tree上。
注意:多重绑定可能会破坏传统的树形结构,从而构成一个DAGs(有向无环图),如图2所示:
图2 多重锚定构成的钻石型结构
Storm的实现能够像处理树那样来处理DAGs。
锚定代表了如何将一个消息加入到指定的tuple tree中,高可靠处理API的接下来部分将向您描述当处理完tuple tree中一个单独的消息时咱们该作些什么。这些是经过OutputCollector 的ack和fail方法来实现的。回头看一下例子SplitSentence,能够发现当全部的word消息被发送完成后,输入的表示句子的消息会被应答(acked)。
每一个被处理的消息必须代表成功或失败(acked 或者failed)。Storm是使用内存来跟踪每一个消息的处理状况的,若是被处理的消息没有应答的话,早晚内存会被耗尽!>
不少bolt遵循特定的处理流程: 读取一个消息、发送它派生出来的子消息、在execute结尾处应答此消息。通常的过滤器(filter)或者是简单的处理功能都是这类的应用。Storm有一个BasicBolt接口封装了上述的流程。示例SplitSentence可使用BasicBolt来重写:
使用这种方式,代码比以前稍微简单了一些,可是实现的功能是同样的。发送到BasicOutputCollector的消息会被自动的锚定到输入消息,而且,当execute执行完毕的时候,会自动的应答输入消息。
不少状况下,一个消息须要延迟应答,例如聚合或者是join。只有根据一组输入消息获得一个结果以后,才会应答以前全部的输入消息。而且聚合和join大部分时候对输出消息都是多重锚定。然而,这些特性不是IBasicBolt所能处理的。
Storm 系统中有一组叫作“acker”的特殊的任务,它们负责跟踪DAG(有向无环图)中的每一个消息。每当发现一个DAG被彻底处理,它就向建立这个根消息的spout任务发送一个信号。拓扑中acker任务的并行度能够经过配置参数Config.TOPOLOGY_ACKERS来设置。默认的acker任务并行度为1,当系统中有大量的消息时,应该适当提升acker任务的并发度。
为了理解Storm可靠性处理机制,咱们从研究一个消息的生命周期和tuple tree的管理入手。当一个消息被建立的时候(不管是在spout仍是bolt中),系统都为该消息分配一个64bit的随机值做为id。这些随机的id是acker用来跟踪由spout消息派生出来的tuple tree的。
每一个消息都知道它所在的tuple tree对应的根消息的id。每当bolt新生成一个消息,对应tuple tree中的根消息的messageId就拷贝到这个消息中。当这个消息被应答的时候,它就把关于tuple tree变化的信息发送给跟踪这棵树的acker。例如,他会告诉acker:本消息已经处理完毕,可是我派生出了一些新的消息,帮忙跟踪一下吧。
举个例子,假设消息D和E是由消息C派生出来的,这里演示了消息C被应答时,tuple tree是如何变化的。
由于在C被从树中移除的同时D和E会被加入到tuple tree中,所以tuple tree不会被过早的认为已彻底处理。
关于Storm如何跟踪tuple tree,咱们再深刻的探讨一下。前面说过系统中能够有任意个数的acker,那么,每当一个消息被建立或应答的时候,它怎么知道应该通知哪一个acker呢?
系统使用一种哈希算法来根据spout消息的messageId肯定由哪一个acker跟踪此消息派生出来的tuple tree。由于每一个消息都知道与之对应的根消息的messageId,所以它知道应该与哪一个acker通讯。
当spout发送一个消息的时候,它就通知对应的acker一个新的根消息产生了,这时acker就会建立一个新的tuple tree。当acker发现这棵树被彻底处理以后,他就会通知对应的spout任务。
tuple是如何被跟踪的呢?系统中有成千上万的消息,若是为每一个spout发送的消息都构建一棵树的话,很快内存就会耗尽。因此,必须采用不一样的策略来跟踪每一个消息。因为使用了新的跟踪算法,Storm只须要固定的内存(大约20字节)就能够跟踪一棵树。这个算法是storm正确运行的核心,也是storm最大的突破。
acker任务保存了spout消息id到一对值的映射。第一个值就是spout的任务id,经过这个id,acker就知道消息处理完成时该通知哪一个spout任务。第二个值是一个64bit的数字,咱们称之为“ack val”, 它是树中全部消息的随机id的异或结果。ack val表示了整棵树的的状态,不管这棵树多大,只须要这个固定大小的数字就能够跟踪整棵树。当消息被建立和被应答的时候都会有相同的消息id发送过来作异或。
每当acker发现一棵树的ack val值为0的时候,它就知道这棵树已经被彻底处理了。由于消息的随机ID是一个64bit的值,所以ack val在树处理完以前被置为0的几率很是小。假设你每秒钟发送一万个消息,从几率上说,至少须要50,000,000年才会有机会发生一次错误。即便如此,也只有在这个消息确实处理失败的状况下才会有数据的丢失!
Acker任务是轻量级的,因此在拓扑中并不须要太多的acker存在。能够经过Storm UI来观察acker任务的吞吐量,若是看上去吞吐量不够的话,说明须要添加额外的acker。
若是你并不要求每一个消息必须被处理(你容许在处理过程当中丢失一些信息),那么能够关闭消息的可靠处理机制,从而能够获取较好的性能。关闭消息的可靠处理机制意味着系统中的消息数会减半(每一个消息不须要应答了)。另外,关闭消息的可靠处理能够减小消息的大小(不须要每一个tuple记录它的根id了),从而节省带宽。
有三种方法能够关系消息的可靠处理机制:
到如今为止,你们已经理解了Storm的可靠性机制,而且知道了如何选择不一样的可靠性级别来知足需求。接下来咱们研究一下Storm如何保证在各类状况下确保数据不丢失。
本章介绍了storm集群如何实现数据的可靠处理。借助于创新性的tuple tree跟踪技术,storm高效的经过数据的应答机制来保证数据不丢失。
storm集群中除nimbus外,没有单点存在,任何节点均可以出故障而保证数据不会丢失。nimbus被设计为无状态的,只要能够及时重启,就不会影响正在运行的任务。
参考连接:
1、storm官方文档:Guaranteeing message processing
Storm considers a tuple coming off a spout "fully processed" when the tuple tree has been exhausted and every message in the tree has been processed. A tuple is considered failed when its tree of messages fails to be fully processed within a specified timeout. This timeout can be configured on a topology-specific basis using theConfig.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration and defaults to 30 seconds.
If Storm detects that a tuple is fully processed, Storm will call the ack
method on the originating Spout
task with the message id that the Spout
provided to Storm. Likewise, if the tuple times-out Storm will call the fail
method on the Spout
.
Note that a tuple will be acked or failed by the exact same Spout
task that created it. So if a Spout
is executing as many tasks across the cluster, a tuple won't be acked or failed by a different task than the one that created it.
There's two things you have to do as a user to benefit from Storm's reliability capabilities. First, you need to tell Storm whenever you're creating a new link in the tree of tuples. Second, you need to tell Storm when you have finished processing an individual tuple. By doing both these things, Storm can detect when the tree of tuples is fully processed and can ack or fail the spout tuple appropriately. Storm's API provides a concise way of doing both of these tasks.
Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple.
Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don't ack/fail every tuple, the task will eventually run out of memory.
As always in software design, the answer is "it depends." Storm 0.7.0 introduced the "transactional topologies" feature, which enables you to get fully fault-tolerant exactly-once messaging semantics for most computations. Read more about transactional topologies here.
A Storm topology has a set of special "acker" tasks that track the DAG of tuples for every spout tuple. When an acker sees that a DAG is complete, it sends a message to the spout task that created the spout tuple to ack the message. You can set the number of acker tasks for a topology in the topology configuration using Config.TOPOLOGY_ACKERS. Storm defaults TOPOLOGY_ACKERS to one task -- you will need to increase this number for topologies processing large amounts of messages.
问:when a tuple is acked in the topology, how does it know to which acker task to send that information?(选 acker task)—— 哈希 spout-tuple-id对应acker
答:Storm uses mod hashing to map a spout tuple id to an acker task. Since every tuple carries with it the spout tuple ids of all the trees they exist within, they know which acker tasks to communicate with.
问:how the acker tasks track which spout tasks are responsible for each spout tuple they're tracking.(选 spout task)—— taskid-tupleid的对应关系
答:When a spout task emits a new tuple, it simply sends a message to the appropriate acker telling it that its task id is responsible for that spout tuple. Then when an acker sees a tree has been completed, it knows to which task id to send the completion message.
问:Acker tasks do not track the tree of tuples explicitly. For large tuple trees with tens of thousands of nodes (or more), tracking all the tuple trees could overwhelm the memory used by the ackers. (tracking algorithm跟踪算法) —— a spout tuple id mapping (task id 到 ack val) \ 异或(XOR)
答:Instead, the ackers take a different strategy that only requires a fixed amount of space per spout tuple (about 20 bytes). This tracking algorithm is the key to how Storm works and is one of its major breakthroughs. An acker task stores a map from a spout tuple id to a pair of values. The first value is the task id that created the spout tuple which is used later on to send completion messages. The second value is a 64 bit number called the "ack val". The ack val is a representation of the state of the entire tuple tree, no matter how big or how small. It is simply the xor of all tuple ids that have been created and/or acked in the tree.
When an acker task sees that an "ack val" has become 0, then it knows that the tuple tree is completed.
Let's go over all the failure cases and see how in each case Storm avoids data loss:(失败场景,避免数据丢失)—— 超时、从新处理
As you have seen, Storm's reliability mechanisms are completely distributed, scalable, and fault-tolerant.
六、调整可靠性:
Acker tasks are lightweight. You can track their performance through the Storm UI (component id "__acker").(据此调整acker数量)
If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- then you can improve performance by not tracking the tuple tree for spout tuples. Not tracking a tuple tree halves the number of messages transferred since normally there's an ack message for every tuple in the tuple tree. Additionally, it requires fewer ids to be kept in each downstream tuple, reducing bandwidth usage.(减小带宽占用)
注:There are three ways to remove reliability.(去掉可靠性的三种方式)
The first is to set Config.TOPOLOGY_ACKERS to 0. In this case, Storm will call the ack
method on the spout immediately after the spout emits a tuple. The tuple tree won't be tracked.(设置ackers为0)
The second way is to remove reliability on a message by message basis. You can turn off tracking for an individual spout tuple by omitting a message id in the SpoutOutputCollector.emit
method.(发射tuple的时候不指定messageid)
Finally, if you don't care if a particular subset of the tuples downstream in the topology fail to be processed, you can emit them as unanchored tuples. Since they're not anchored to any spout tuples, they won't cause any spout tuples to fail if they aren't acked.(不anchor绑定,不跟踪)