Storm1.1.1 对 0.10.x 版 Kafka之commit offsets

因为 0.10.x 版 Kafka 与 0.8.x 版有很大的变化,这种变化对下游 Storm 有很是大的影响,0.10.x 版的 Kafka 不但增长了权限管理的功能,并且还将 simple 和 high consumer 的 offsets 进行统一管理,也就意味着在 0.8.x 中 Storm 须要去负责管理 offsets,而在 0.10.x 中,Storm 不须要关心 consumer 的 offsets 的问题,这对 KafkaSpout 的设计有很大的影响,本文就是对 Storm 对 0.10.x 版 Kafka 支持的实现部分的解析。java

0.10.x 版 KafkaSpout 的实现

社区对新版 Kafka 的支持,整体分为两种状况:git

  1. 一种是选择自动 commit 机制;
  2. 另外一种是非自动 commit,就是将 commit 的权利交与 Storm 来控制。

下面分别对这两种状况进行分析。github

Kafka Consumer 的一些配置会对 Storm 的性能很大影响,下面的三个参数的设置对其性能的影响最大(默认值是根据MICROBENCHMARKING APACHE STORM 1.0 PERFORMANCE测试获得):apache

  • fetch.min.bytes:默认值 1;
  • fetch.max.wait.ms:默认值 500(ms);
  • Kafka Consumer instance poll timeout, 它能够在经过 KafkaSpoutConfig 的方法 setPollTimeoutMs 来配置,默认值是 200ms;

自动 commit 模式

自动 commit 模式就是 commit 的时机由 Consumer 来控制,本质上是异步 commit,当定时达到时,就进行 commit。而 Storm 端并无进行任何记录,也就是这部分的容错彻底由 Consumer 端来控制,而 Consumer 并不会关心数据的处理成功与否,只关心数据是否 commit,若是未 commit,就会从新发送数据,那么就有可能致使下面这个后果:异步

形成那些已经 commit、但 Storm 端处理失败的数据丢失

丢失的缘由函数

一些数据发送到 Spout 以后,刚好 commit 的定时到达,进行了 commit,可是这中间有某条或者几条数据处理失败,这就是说,这几条处理失败的数据已经进行 commit 了,Kafka 端也就不会从新进行发送。性能

可能出现的这种后果也肯定了自动 commit 模式不能知足咱们的需求,为了保证数据不丢,须要数据在 Storm 中 ack 以后才能被 commit,所以,commit 仍是应该由 Storm 端来进行控制,才能保证数据被正确处理。测试

非自动 commit 模式

当选用非自动的 commit 机制(实际上就是使用 Consumer 的同步 commit 机制)时,须要手动去设置 commit 的参数,有如下两项须要设置:fetch

  • offset.commit.period.ms:设置 spout 多久向 Kafka commit一次,在 KafkaSpoutConfig 的 setOffsetCommitPeriodMs 中配置;
  • max.uncommitted.offsets:控制在下一次拉取数据以前最多能够有多少数据在等待 commit,在 KafkaSpoutConfig 的 setMaxUncommittedOffsets 中配置;

spout 的处理过程

关于 Kafka 的几个 offset 的概念,能够参考 offset的一些相关概念spa

KafkaSpout 的处理过程主要是在 nextTuple() 方法,其处理过程以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void nextTuple() {
  if (initialized) {
    if (commit()) {// Step1 非自动 commit,而且定时达到
      commitOffsetsForAckedTuples(); // 对全部已经 ack 的 msgs 进行 commit
    }
 
    if (poll()) {//Step2 拉取的数据都已经发送,而且未 commit 的消息数小于设置的最大 uncommit 数
      setWaitingToEmit(pollKafkaBroker());
      //将拉取的全部 record 都放到 waitingToEmit 集合中,可能会重复拉取数据(因为一些 msg 须要重试,经过修改 Last Committed Offset 的值来实现的)
    }
 
    if (waitingToEmit()) {//Step3 waitingToEmit 中还有数据
      emit(); //发送数据,但会跳过已经 ack 或者已经发送的消息
    }
  } else {
    LOG.debug( "Spout not initialized. Not sending tuples until initialization completes");
  }
}

上面主要分为三步:

  1. 若是是非自动 commit,而且 commit 定时达到,那么就将全部已经 ack 的数据(这些数据的 offset 必须是连续的,不连续的数据不会进行 commit)进行 commit;
  2. 若是拉取的数据都已经发送,而且未 commit 的消息数(记录在 numUncommittedOffsets 中)小于设置的最大 uncommit 数,那么就根据更新后的 offset (将 offset 重置到须要重试的 msg 的最小 offset,这样该 offset 后面的 msg 仍是会被从新拉取)拉取数据,并将拉取到的数据存储到 waitingToEmit 集合中;
  3. 若是 waitingToEmit 集合中还有数据,就发送数据,但在发送数据的过程当中,会进行判断,只发送没有 ack 的数据。

KafkaSpout 如何进行容错

举个示例,以下图所示

consumer offset

  1. 图1表示一个 nextTuple() 循环结束以后,offset 为14那条数据处理失败,而offset 为15-18的数据处理成功;
  2. 图2表示在下次循环 Step 1 结束以后、Step 2 开始以前,Consumer 会将 the last committed offset 重置到 offset 为14的位置。

也就是说从 offset 为14开始,后面的数据会从新发送。

有人可能会问,那样的话会不会形成数据重复发送?

Storm 是如何解决这个问题的呢?答案就是 Storm 会用一个 map 记录已经 ack 的数据(acked),Storm 在进行 commit 的时候也是根据这个 map 的数据进行 commit 的,不过 commit 数据的 offset 必须是连续的,如上图所示,只能将 offset 为11-13的数据 commit,而15-18的数据因为 offset 为14的数据未处理成功而不能 commit。offset 为11-13的数据在 commit 成功后会从 map 中移除,而 offset 为15-18的数据依然在 map 中,Storm 在将从 Kafka 拉取的数据加入到 waitingToEmit 集合时后,进行 emit 数据时,会先检测该数据是否存在 acked 中,若是存在的话,就证实该条数据已经处理过了,不会在进行发送。

这里有几点须要注意的:

  1. 对已经 ack 的 msg 进行 commit 时,所 commit 的 msg 的 offset 必须是连续的(该 msg 存储在一个 TreeMap 中,按 offset 排序),断续的数据会暂时接着保存在集合中,不会进行 commit,若是出现断续,那就证实中间有数据处理失败,须要从新处理;
  2. storm 处理 failed 的 msg,会保存到一个专门的集合中,在每次拉取数据时(是拉取数据,不是发送数据,发送数据时会检测该数据是否已经成功处理),会遍历该集合中包含的全部 TopicPartiion,获取该 partition 的 Last Committed Offset;

这样设计有一个反作用就是:若是有一个 msg 一直不成功,就会致使 KafkaSpout 由于这一条数据的影响而不断地重复拉取这批数据,形成整个拓扑卡在这里。

Kafka Rebalance 的影响

Kafka Rebalance 能够参考Consumer Rebalance.

KafkaSpout 实现了一个内部类用来监控 Group Rebalance 的状况,实现了两个回调函数,一旦发现 group 的状态变为 preparingRabalance 以后

  1. onPartitionsRevoked 这个方法会在 Consumer 中止拉取数据以后、group 进行 rebalance 操做以前调用,做用是对已经 ack 的 msg 进行 commit;
  2. onPartitionsAssigned 这个方法 group 已经进行 reassignment 以后,开始拉取数据以前调用,做用是清理内存中不属于这个线程的 msg、获取 partition 的 last committed offset。

潜在的风险点

这部分仍是有可能致使数据重复发送的,设想下面一种状况:

若是以前因为一个条消息处理失败(Partition 1),形成部分数据没有 commit 成功,在进行 rebalance 后,刚好 Partition 1 被分配到其余 spout 线程时,那么当前的 spout 就会关于 Partition 1 的相关数据删除掉,致使部分已经 commit 成功的数据(记录在 acked 中)被删除,而另外的 spout 就会从新拉取这部分数据进行处理,那么就会致使这部分已经成功处理的数据重复处理

相关文章
相关标签/搜索