Storm的ack机制在项目应用中的坑

正在学习storm的大兄弟们,我又来传道授业解惑了,是否是以为本身会用ack了。好吧,那就让我开始啪啪打大家脸吧。mysql

先说一下ACK机制:redis

  为了保证数据能正确的被处理, 对于spout产生的每个tuple, storm都会进行跟踪。算法

  这里面涉及到ack/fail的处理,若是一个tuple处理成功是指这个Tuple以及这个Tuple产生的全部Tuple都被成功处理, 会调用spout的ack方法;sql

  若是失败是指这个Tuple或这个Tuple产生的全部Tuple中的某一个tuple处理失败, 则会调用spout的fail方法;数据库

  在处理tuple的每个bolt都会经过OutputCollector来告知storm, 当前bolt处理是否成功。api

  另外须要注意的,当spout触发fail动做时,不会自动重发失败的tuple,须要咱们在spout中从新获取发送失败数据,手动从新再发送一次。缓存

Ack原理
  Storm中有个特殊的task名叫acker,他们负责跟踪spout发出的每个Tuple的Tuple树(由于一个tuple经过spout发出了,通过每个bolt处理后,会生成一个新的tuple发送出去)。当acker(框架自启动的task)发现一个Tuple树已经处理完成了,它会发送一个消息给产生这个Tuple的那个task。
Acker的跟踪算法是Storm的主要突破之一,对任意大的一个Tuple树,它只须要恒定的20字节就能够进行跟踪。
Acker跟踪算法的原理:acker对于每一个spout-tuple保存一个ack-val的校验值,它的初始值是0,而后每发射一个Tuple或Ack一个Tuple时,这个Tuple的id就要跟这个校验值异或一下,而且把获得的值更新为ack-val的新值。那么假设每一个发射出去的Tuple都被ack了,那么最后ack-val的值就必定是0。Acker就根据ack-val是否为0来判断是否彻底处理,若是为0则认为已彻底处理。
框架

要实现ack机制:
1,spout发射tuple的时候指定messageId
2,spout要重写BaseRichSpout的fail和ack方法
3,spout对发射的tuple进行缓存(不然spout的fail方法收到acker发来的messsageId,spout也没法获取到发送失败的数据进行重发),看看系统提供的接口,只有msgId这个参数,这里的设计不合理,其实在系统里是有cache整个msg的,只给用户一个messageid,用户如何取得原来的msg貌似须要本身cache,而后用这个msgId去查询,太坑爹了
3,spout根据messageId对于ack的tuple则从缓存队列中删除,对于fail的tuple能够选择重发。
4,设置acker数至少大于0;Config.setNumAckers(conf, ackerParal);
dom

Storm的Bolt有BsicBolt和RichBolt:
  在BasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。
  使用RichBolt须要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);而且须要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);
分布式

由一个tuple产生一个新的tuple称为:anchoring,你发射一个tuple的同时也就完成了一次anchoring。

  ack机制即,spout发送的每一条消息,在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理;在规定的时间内(默认是30秒),没有收到Acker的ack响应tuple,就触发fail动做,即认为该tuple处理失败,timeout时间能够经过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来设定。或者收到Acker发送的fail响应tuple,也认为失败,触发fail动做

  注意,我开始觉得若是继承BaseBasicBolt那么程序抛出异常,也会让spout进行重发,可是我错了,程序直接异常中止了

  这里我以分布式程序入门案例worldcount为例子吧。请看下面大屏幕:没有错我就是那个大家走在路上常常听见的名字刘洋。

  这里spout1-1task发送句子"i am liu yang"给bolt2-2task进行处理,该task把句子切分为单词,根据字段分发到下一个bolt中,bolt2-2,bolt4-4,bolt5-5对每个单词添加一个后缀1后再发送给下一个bolt进行存储到数据库的操做,这个时候bolt7-7task在存储数据到数据库时失败,向spout发送fail响应,这个时候spout收到消息就会再次发送的该数据。

  好,那么我思考一个问题:spout如何保证再次发送的数据就是以前失败的数据,因此在spout实例中,绝对要定义一个map缓存,缓存发出去的每一条数据,key固然就是messageId,当spout实例收到全部bolt的响应后若是是ack,就会调用咱们重写的ack方法,在这个方法里面咱们就要根据messageId删除这个key-value,若是spout实例收到全部bolt响应后,发现是faile,则会调用咱们重写的fail方法,根据messageId查询到对应的数据再次发送该数据出去。

spout代码以下

public class MySpout extends BaseRichSpout {
    private static final long serialVersionUID = 5028304756439810609L; // key:messageId,Data private HashMap<String, String> waitAck = new HashMap<String, String>(); private SpoutOutputCollector collector; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void nextTuple() { String sentence = "i am liu yang"; String messageId = UUID.randomUUID().toString().replaceAll("-", ""); waitAck.put(messageId, sentence); //指定messageId,开启ackfail机制 collector.emit(new Values(sentence), messageId); } @Override public void ack(Object msgId) { System.out.println("消息处理成功:" + msgId); System.out.println("删除缓存中的数据..."); waitAck.remove(msgId); } @Override public void fail(Object msgId) { System.out.println("消息处理失败:" + msgId); System.out.println("从新发送失败的信息..."); //重发若是不开启ackfail机制,那么spout的map对象中的该数据不会被删除的。 collector.emit(new Values(waitAck.get(msgId)),msgId); } }

 虽然在storm项目中咱们的spout源一般来源kafka,并且咱们使用storm提供的工具类KafkaSpout类,其实这个类里面就维护者<messageId,Tuple>对的集合。

Storm怎么处理重复的tuple?
  由于Storm要保证tuple的可靠处理,当tuple处理失败或者超时的时候,spout会fail并从新发送该tuple,那么就会有tuple重复计算的问题。这个问题是很难解决的,storm也没有提供机制帮助你解决。一些可行的策略:
(1)不处理,这也算是种策略。由于实时计算一般并不要求很高的精确度,后续的批处理计算会更正实时计算的偏差。
(2)使用第三方集中存储来过滤,好比利用mysql,memcached或者redis根据逻辑主键来去重。
(3)使用bloom filter作过滤,简单高效。

问题一:大家有没有想过若是某一个task节点处理的tuple一直失败,消息一直重发会怎么样?

  咱们都知道,spout做为消息的发送源,在没有收到该tuple来至左右bolt的返回信息前,是不会删除的,那么若是消息一直失败,就会致使spout节点存储的tuple数据愈来愈多,致使内存溢出。

问题二:有没有想过,若是该tuple的众多子tuple中,某一个子tuple处理failed了,可是另外的子tuple仍然会继续执行,若是子tuple都是执行数据存储操做,那么就算整个消息失败,那些生成的子tuple仍是会成功执行而不会回滚的。

  这个时候storm的原生api是没法支持这种事务性操做,咱们可使用storm提供的高级api-trident来作到(具体如何我不清楚,目前没有研究它,可是我能够它内部必定是根据分布式协议好比两阶段提交协议等)。向这种业务中要保证事务性功能,咱们彻底能够根据咱们自身的业务来作到,好比这里的入库操做,咱们先记录该消息是否已经入库的状态,再入库时查询状态来决定是否给予执行。

问题三:tuple的追踪并不必定要是从spout结点到最后一个bolt,只要是spout开始,能够在任意层次bolt中止追踪作出应答。

Acker task 组件来设置一个topology里面的acker的数量,默认值是一,若是你的topoogy里面的tuple比较多的话,那么请把acker的数量设置多一点,效率会更高一点。

调整可靠性 
acker task是很是轻量级的, 因此一个topology里面不须要不少acker。你能够经过Strom UI(id: -1)来跟踪它的性能。 若是它的吞吐量看起来不正常,那么你就须要多加点acker了。

若是可靠性对你来讲不是那么重要 — 你不太在乎在一些失败的状况下损失一些数据, 那么你能够经过不跟踪这些tuple树来获取更好的性能。不去跟踪消息的话会使得系统里面的消息数量减小一半, 由于对于每个tuple都要发送一个ack消息。而且它须要更少的id来保存下游的tuple 减小带宽占用。
有三种方法能够去掉可靠性。
第一是把Config.TOPOLOGY_ACKERS 设置成 0. 在这种状况下, storm会在spout发射一个tuple以后立刻调用spoutack方法。也就是说这个tuple树不会被跟踪。
第二个方法是在tuple层面去掉可靠性。 你能够在发射tuple的时候不指定messageid来达到不跟粽某个特定的spout tuple的目的。
最后一个方法是若是你对于一个tuple树里面的某一部分到底成不成功不是很关心,那么能够在发射这些tuple的时候unanchor它们。 这样这些tuple就不在tuple树里面, 也就不会被跟踪了。

可靠性配置

有三种方法能够去掉消息的可靠性:

将参数Config.TOPOLOGY_ACKERS设置为0,经过此方法,当Spout发送一个消息的时候,它的ack方法将马上被调用;

Spout发送一个消息时,不指定此消息的messageID。当须要关闭特定消息可靠性的时候,可使用此方法;

最后,若是你不在乎某个消息派生出来的子孙消息的可靠性,则此消息派生出来的子消息在发送时不要作锚定,即在emit方法中不指定输入消息。由于这些子孙消息没有被锚定在任何tuple tree中,所以他们的失败不会引发任何spout从新发送消息。

如何关闭Ack机制

2种途径

spout发送数据是不带上msgid

设置acker数等于0

值得注意的一点是Storm调用Ack或者fail的task始终是产生这个tuple的那个task,因此若是一个Spout,被分为不少个task来执行,消息执行的成功失败与否始终会通知最开始发出tuple的那个task。

做为Storm的使用者,有两件事情要作以更好的利用Storm的可靠性特征,首先你在生成一个tuple的时候要通知Storm,其次,彻底处理一个tuple以后要通知Storm,这样Storm就能够检测到整个tuple树有没有完成处理,而且通知源Spout处理结果。

1 因为对应的task挂掉了,一个tuple没有被Ack:

Storm的超时机制在超时以后会把这个tuple标记为失败,从而能够从新处理。

2 Acker挂掉了: 在这种状况下,由这个Acker所跟踪的全部spout tuple都会出现超时,也会被从新的处理。

3 Spout 挂掉了:在这种状况下给Spout发送消息的消息源负责从新发送这些消息。

三个基本的机制,保证了Storm的彻底分布式,可伸缩的而且高度容错的。

另外Ack机制还经常使用于限流做用 为了不spout发送数据太快,而bolt处理太慢,经常设置pending数,当spout有等于或超过pending数的tuple没有收到ackfail响应时,跳过执行nextTuple, 从而限制spout发送数据。

经过conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);设置spout pend数。

相关文章
相关标签/搜索