storm入门 第四章 消息的可靠处理

4.1 简介

storm能够确保spout发送出来的每一个消息都会被完整的处理。本章将会描述storm体系是如何达到这个目标的,并将会详述开发者应该如何使用storm的这些机制来实现数据的可靠处理。 html

4.2 理解消息被完整处理

一个消息(tuple)从spout发送出来,可能会致使成百上千的消息基于此消息被建立。 git

咱们来思考一下流式的“单词统计”的例子: github

storm任务从数据源(Kestrel queue)每次读取一个完整的英文句子;将这个句子分解为独立的单词,最后,实时的输出每一个单词以及它出现过的次数。 算法

本例中,每一个从spout发送出来的消息(每一个英文句子)都会触发不少的消息被建立,那些从句子中分隔出来的单词就是被建立出来的新消息。 并发

这些消息构成一个树状结构,咱们称之为“tuple tree”,看起来如图1所示: 性能

图1 示例tuple tree spa

在什么条件下,Storm才会认为一个从spout发送出来的消息被完整处理呢?答案就是下面的条件同时被知足: 设计

  • tuple tree再也不生长
  • 树中的任何消息被标识为“已处理”

若是在指定的时间内,一个消息衍生出来的tuple tree未被彻底处理成功,则认为此消息未被完整处理。这个超时值能够经过任务级参数Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 进行配置,默认超时值为30秒。 orm

4.3 消息的生命周期

若是消息被完整处理或者未被完整处理,Storm会如何进行接下来的操做呢?为了弄清这个问题,咱们来研究一下从spout发出来的消息的生命周期。这里列出了spout应该实现的接口: htm

首先, Storm使用spout实例的nextTuple()方法从spout请求一个消息(tuple)。 收到请求之后,spout使用open方法中提供的SpoutOutputCollector向它的输出流发送一个或多个消息。每发送一个消息,Spout会给这个消息提供一个message ID,它将会被用来标识这个消息。

假设咱们从kestrel队列中读取消息,Spout会将kestrel 队列为这个消息设置的ID做为此消息的message ID。 向SpoutOutputCollector中发送消息格式以下:

接来下,这些消息会被发送到后续业务处理的bolts, 而且Storm会跟踪由此消息产生出来的新消息。当检测到一个消息衍生出来的tuple tree被完整处理后,Storm会调用Spout中的ack方法,并将此消息的messageID做为参数传入。同理,若是某消息处理超时,则此消息对应的Spout的fail方法会被调用,调用时此消息的messageID会被做为参数传入。

注意:一个消息只会由发送它的那个spout任务来调用ack或fail。若是系统中某个spout由多个任务运行,消息也只会由建立它的spout任务来应答(ack或fail),决不会由其余的spout任务来应答。

咱们继续使用从kestrel队列中读取消息的例子来阐述高可靠性下spout须要作些什么(假设这个spout的名字是KestrelSpout)。

咱们先简述一下kestrel消息队列:

当KestrelSpout从kestrel队列中读取一个消息,表示它“打开”了队列中某个消息。这意味着,此消息并未从队列中真正的删除,而是将此消息设置为“pending”状态,它等待来自客户端的应答,被应答之后,此消息才会被真正的从队列中删除。处于“pending”状态的消息不会被其余的客户端看到。另外,若是一个客户端意外的断开链接,则由此客户端“打开”的全部消息都会被从新加入到队列中。当消息被“打开”的时候,kestrel队列同时会为这个消息提供一个惟一的标识。

KestrelSpout就是使用这个惟一的标识做为这个tuple的messageID的。稍后当ack或fail被调用的时候,KestrelSpout会把ack或者fail连同messageID一块儿发送给kestrel队列,kestrel会将消息从队列中真正删除或者将它从新放回队列中。

4.4 可靠相关的API

为了使用Storm提供的可靠处理特性,咱们须要作两件事情:

  1. 不管什么时候在tuple tree中建立了一个新的节点,咱们须要明确的通知Storm;
  2. 当处理完一个单独的消息时,咱们须要告诉Storm 这棵tuple tree的变化状态。

经过上面的两步,storm就能够检测到一个tuple tree什么时候被彻底处理了,而且会调用相关的ack或fail方法。Storm提供了简单明了的方法来完成上述两步。

为tuple tree中指定的节点增长一个新的节点,咱们称之为锚定(anchoring)。锚定是在咱们发送消息的同时进行的。为了更容易说明问题,咱们使用下面代码做为例子。本示例的bolt将包含整句话的消息分解为一系列的子消息,每一个子消息包含一个单词。

每一个消息都经过这种方式被锚定:把输入消息做为emit方法的第一个参数。由于word消息被锚定在了输入消息上,这个输入消息是spout发送过来的tuple tree的根节点,若是任意一个word消息处理失败,派生这个tuple tree那个spout 消息将会被从新发送。

与此相反,咱们来看看使用下面的方式emit消息时,Storm会如何处理:

若是以这种方式发送消息,将会致使这个消息不会被锚定。若是此tuple tree中的消息处理失败,派生此tuple tree的根消息不会被从新发送。根据任务的容错级别,有时候很适合发送一个非锚定的消息。

一个输出消息能够被锚定在一个或者多个输入消息上,这在作join或聚合的时候是颇有用的。一个被多重锚定的消息处理失败,会致使与之关联的多个spout消息被从新发送。多重锚定经过在emit方法中指定多个输入消息来实现:

多重锚定会将被锚定的消息加到多棵tuple tree上。

注意:多重绑定可能会破坏传统的树形结构,从而构成一个DAGs(有向无环图),如图2所示:

图2 多重锚定构成的钻石型结构

Storm的实现能够像处理树那样来处理DAGs。

锚定代表了如何将一个消息加入到指定的tuple tree中,高可靠处理API的接下来部分将向您描述当处理完tuple tree中一个单独的消息时咱们该作些什么。这些是经过OutputCollector 的ack和fail方法来实现的。回头看一下例子SplitSentence,能够发现当全部的word消息被发送完成后,输入的表示句子的消息会被应答(acked)。

每一个被处理的消息必须代表成功或失败(acked 或者failed)。Storm是使用内存来跟踪每一个消息的处理状况的,若是被处理的消息没有应答的话,早晚内存会被耗尽!>

不少bolt遵循特定的处理流程: 读取一个消息、发送它派生出来的子消息、在execute结尾处应答此消息。通常的过滤器(filter)或者是简单的处理功能都是这类的应用。Storm有一个BasicBolt接口封装了上述的流程。示例SplitSentence可使用BasicBolt来重写:

使用这种方式,代码比以前稍微简单了一些,可是实现的功能是同样的。发送到BasicOutputCollector的消息会被自动的锚定到输入消息,而且,当execute执行完毕的时候,会自动的应答输入消息。

不少状况下,一个消息须要延迟应答,例如聚合或者是join。只有根据一组输入消息获得一个结果以后,才会应答以前全部的输入消息。而且聚合和join大部分时候对输出消息都是多重锚定。然而,这些特性不是IBasicBolt所能处理的。

4.5 高效的实现tuple tree

Storm 系统中有一组叫作“acker”的特殊的任务,它们负责跟踪DAG(有向无环图)中的每一个消息。每当发现一个DAG被彻底处理,它就向建立这个根消息的spout任务发送一个信号。拓扑中acker任务的并行度能够经过配置参数Config.TOPOLOGY_ACKERS来设置。默认的acker任务并行度为1,当系统中有大量的消息时,应该适当提升acker任务的并发度。

为了理解Storm可靠性处理机制,咱们从研究一个消息的生命周期和tuple tree的管理入手。当一个消息被建立的时候(不管是在spout仍是bolt中),系统都为该消息分配一个64bit的随机值做为id。这些随机的id是acker用来跟踪由spout消息派生出来的tuple tree的。

每一个消息都知道它所在的tuple tree对应的根消息的id。每当bolt新生成一个消息,对应tuple tree中的根消息的messageId就拷贝到这个消息中。当这个消息被应答的时候,它就把关于tuple tree变化的信息发送给跟踪这棵树的acker。例如,他会告诉acker:本消息已经处理完毕,可是我派生出了一些新的消息,帮忙跟踪一下吧。

举个例子,假设消息D和E是由消息C派生出来的,这里演示了消息C被应答时,tuple tree是如何变化的。

由于在C被从树中移除的同时D和E会被加入到tuple tree中,所以tuple tree不会被过早的认为已彻底处理。

关于Storm如何跟踪tuple tree,咱们再深刻的探讨一下。前面说过系统中能够有任意个数的acker,那么,每当一个消息被建立或应答的时候,它怎么知道应该通知哪一个acker呢?

系统使用一种哈希算法来根据spout消息的messageId肯定由哪一个acker跟踪此消息派生出来的tuple tree。由于每一个消息都知道与之对应的根消息的messageId,所以它知道应该与哪一个acker通讯。

当spout发送一个消息的时候,它就通知对应的acker一个新的根消息产生了,这时acker就会建立一个新的tuple tree。当acker发现这棵树被彻底处理以后,他就会通知对应的spout任务。

tuple是如何被跟踪的呢?系统中有成千上万的消息,若是为每一个spout发送的消息都构建一棵树的话,很快内存就会耗尽。因此,必须采用不一样的策略来跟踪每一个消息。因为使用了新的跟踪算法,Storm只须要固定的内存(大约20字节)就能够跟踪一棵树。这个算法是storm正确运行的核心,也是storm最大的突破。

acker任务保存了spout消息id到一对值的映射。第一个值就是spout的任务id,经过这个id,acker就知道消息处理完成时该通知哪一个spout任务。第二个值是一个64bit的数字,咱们称之为“ack val”, 它是树中全部消息的随机id的异或结果。ack val表示了整棵树的的状态,不管这棵树多大,只须要这个固定大小的数字就能够跟踪整棵树。当消息被建立和被应答的时候都会有相同的消息id发送过来作异或。

每当acker发现一棵树的ack val值为0的时候,它就知道这棵树已经被彻底处理了。由于消息的随机ID是一个64bit的值,所以ack val在树处理完以前被置为0的几率很是小。假设你每秒钟发送一万个消息,从几率上说,至少须要50,000,000年才会有机会发生一次错误。即便如此,也只有在这个消息确实处理失败的状况下才会有数据的丢失!

4.6 选择合适的可靠性级别

Acker任务是轻量级的,因此在拓扑中并不须要太多的acker存在。能够经过Storm UI来观察acker任务的吞吐量,若是看上去吞吐量不够的话,说明须要添加额外的acker。

若是你并不要求每一个消息必须被处理(你容许在处理过程当中丢失一些信息),那么能够关闭消息的可靠处理机制,从而能够获取较好的性能。关闭消息的可靠处理机制意味着系统中的消息数会减半(每一个消息不须要应答了)。另外,关闭消息的可靠处理能够减小消息的大小(不须要每一个tuple记录它的根id了),从而节省带宽。

有三种方法能够关系消息的可靠处理机制:

  • 将参数Config.TOPOLOGY_ACKERS设置为0,经过此方法,当Spout发送一个消息的时候,它的ack方法将马上被调用;
  • 第二个方法是Spout发送一个消息时,不指定此消息的messageID。当须要关闭特定消息可靠性的时候,可使用此方法;
  • 最后,若是你不在乎某个消息派生出来的子孙消息的可靠性,则此消息派生出来的子消息在发送时不要作锚定,即在emit方法中不指定输入消息。由于这些子孙消息没有被锚定在任何tuple tree中,所以他们的失败不会引发任何spout从新发送消息。

4.7 集群的各级容错

到如今为止,你们已经理解了Storm的可靠性机制,而且知道了如何选择不一样的可靠性级别来知足需求。接下来咱们研究一下Storm如何保证在各类状况下确保数据不丢失。

3.7.1 任务级失败

  • 由于bolt任务crash引发的消息未被应答。此时,acker中全部与此bolt任务关联的消息都会由于超时而失败,对应spout的fail方法将被调用。
  • acker任务失败。若是acker任务自己失败了,它在失败以前持有的全部消息都将会由于超时而失败。Spout的fail方法将被调用。
  • Spout任务失败。这种状况下,Spout任务对接的外部设备(如MQ)负责消息的完整性。例如当客户端异常的状况下,kestrel队列会将处于pending状态的全部的消息从新放回到队列中。

4.7.2  任务槽(slot) 故障

  • worker失败。每一个worker中包含数个bolt(或spout)任务。supervisor负责监控这些任务,当worker失败后,supervisor会尝试在本机重启它。
  • supervisor失败。supervisor是无状态的,所以supervisor的失败不会影响当前正在运行的任务,只要及时的将它从新启动便可。supervisor不是自举的,须要外部监控来及时重启。
  • nimbus失败。nimbus是无状态的,所以nimbus的失败不会影响当前正在运行的任务(nimbus失败时,没法提交新的任务),只要及时的将它从新启动便可。nimbus不是自举的,须要外部监控来及时重启。

4.7.3.  集群节点(机器)故障

  • storm集群中的节点故障。此时nimbus会将此机器上全部正在运行的任务转移到其余可用的机器上运行。
  • zookeeper集群中的节点故障。zookeeper保证少于半数的机器宕机仍可正常运行,及时修复故障机器便可。

4.8 小结

本章介绍了storm集群如何实现数据的可靠处理。借助于创新性的tuple tree跟踪技术,storm高效的经过数据的应答机制来保证数据不丢失。

storm集群中除nimbus外,没有单点存在,任何节点均可以出故障而保证数据不会丢失。nimbus被设计为无状态的,只要能够及时重启,就不会影响正在运行的任务。

相关文章
相关标签/搜索