Storm的acker机制,可以保证消息至少被处理一次(at least once)。也就是说,可以保证不丢消息。这里就详细解析一下acker的实现原理。算法
假设咱们有一个简单的topology,结构为spout -> bolt。 spout emit了一条消息,发送至bolt。bolt做为最后一个处理者,没有再向下游emit消息。框架
从 上图能够看到,全部的ack消息都会发送到acker,acker会根据算法计算从特定spout发射出来的tuple tree是否被彻底处理。若是成功处理,则发送__acker_ack消息给spout,不然发送__acker_fail消息给spout。而后 spout中能够作相应的逻辑如重发消息等。dom
在JStorm中,acker是一种bolt,所以它的处理、消息发送跟正常的bolt是同样的。只不过,acker是JStorm框架建立的bolt,用户不能自行建立。若是用户在代码中使用:spa
Config.setNumAckers(conf, 1);
就会自动建立并行度为1的acker bolt;若是为0,则就没有acker bolt了。code
acker的算法很是巧妙,它利用了数学上的异或操做来实现对整个tuple tree的判断。在一个topology中的一条消息造成的tuple tree中,全部的消息,都会有一个MessageId,它内部其实就是一个map:orm
Map<Long, Long> _anchorsToIds;
存储的是anchor和anchor value。而anchor其实就是root_id,它在spout中生成,而且一路透传到全部的bolt中,属于同一个tuple tree中的消息都会有相同的root_id,它能够惟一标识spout发出来的这条消息(以及从下游bolt根据这个tuple衍生发出的消息)。数学
下面是一个tuple的ack流程:it
<root_id, random()>
,即为这个root_id对应一个随机数值,而后随着消息自己发送到下游bolt中。假设有2个bolt,生成的随机数对分别为:<root_id, r1>
, <root_id, r2>
。<root_id, r1 ^ r2>
(即全部task产生的随机数列表的异或值)。当前值 ^ 新生成的随机数
。以此类推。咱们以一个稍微复杂一点的topology为例,描述一下它的整个过程。 假设咱们的topology结构为: spout -> bolt1/bolt2 -> bolt3
即spout同时向bolt1和bolt2发送消息,它们处理完后,都向bolt3发送消息。bolt3没有后续处理节点。ast
1). spout发射一条消息,生成root_id,因为这个值不变,咱们就用root_id来标识。 spout -> bolt1的MessageId = <root_id, 1>
spout -> bolt2的MessageId = <root_id, 2>
spout -> acker的MessageId = <root_id, 1^2>
原理
2). bolt1收到消息后,生成以下消息: bolt1 -> bolt3的MessageId = <root_id, 3>
bolt1 -> acker的MessageId = <root_id, 1^3>
3). 一样,bolt2收到消息后,生成以下消息: bolt2 -> bolt3的MessageId = <root_id, 4>
bolt2 -> acker的MessageId = <root_id, 2^4>
4). bolt3收到消息后,生成以下消息: bolt3 -> acker的MessageId = <root_id, 3>
bolt3 -> acker的MessageId = <root_id, 4>
5). acker中总共收到如下消息: <root_id, 1^2>
<root_id, 1^3>
<root_id, 2^4>
<root_id, 3>
<root_id, 4>
全部的值进行异或以后,即为1^2^1^3^2^4^3^4
= 0。