因为 0.10.x 版 Kafka 与 0.8.x 版有很大的变化,这种变化对下游 Storm 有很是大的影响,0.10.x 版的 Kafka 不但增长了权限管理的功能,并且还将 simple 和 high consumer 的 offsets 进行统一管理,也就意味着在 0.8.x 中 Storm 须要去负责管理 offsets,而在 0.10.x 中,Storm 不须要关心 consumer 的 offsets 的问题,这对 KafkaSpout 的设计有很大的影响,本文就是对 Storm 对 0.10.x 版 Kafka 支持的实现
部分的解析。java
社区对新版 Kafka 的支持,整体分为两种状况:git
下面分别对这两种状况进行分析。github
Kafka Consumer 的一些配置会对 Storm 的性能很大影响,下面的三个参数的设置对其性能的影响最大(默认值是根据MICROBENCHMARKING APACHE STORM 1.0 PERFORMANCE测试获得):apache
fetch.min.bytes
:默认值 1;fetch.max.wait.ms
:默认值 500(ms);Kafka Consumer instance poll timeout
, 它能够在经过 KafkaSpoutConfig 的方法 setPollTimeoutMs 来配置,默认值是 200ms;自动 commit 模式就是 commit 的时机由 Consumer 来控制,本质上是异步 commit,当定时达到时,就进行 commit。而 Storm 端并无进行任何记录,也就是这部分的容错彻底由 Consumer 端来控制,而 Consumer 并不会关心数据的处理成功与否,只关心数据是否 commit,若是未 commit,就会从新发送数据,那么就有可能致使下面这个后果:异步
丢失的缘由函数
一些数据发送到 Spout 以后,刚好 commit 的定时到达,进行了 commit,可是这中间有某条或者几条数据处理失败,这就是说,这几条处理失败的数据已经进行 commit 了,Kafka 端也就不会从新进行发送。性能
可能出现的这种后果也肯定了自动 commit 模式不能知足咱们的需求,为了保证数据不丢,须要数据在 Storm 中 ack 以后才能被 commit,所以,commit 仍是应该由 Storm 端来进行控制,才能保证数据被正确处理。测试
当选用非自动的 commit 机制(实际上就是使用 Consumer 的同步 commit 机制)时,须要手动去设置 commit 的参数,有如下两项须要设置:fetch
offset.commit.period.ms
:设置 spout 多久向 Kafka commit一次,在 KafkaSpoutConfig 的 setOffsetCommitPeriodMs 中配置;max.uncommitted.offsets
:控制在下一次拉取数据以前最多能够有多少数据在等待 commit,在 KafkaSpoutConfig 的 setMaxUncommittedOffsets 中配置;关于 Kafka 的几个 offset 的概念,能够参考 offset的一些相关概念spa
KafkaSpout 的处理过程主要是在 nextTuple()
方法,其处理过程以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public void nextTuple() {
if (initialized) {
if (commit()) {// Step1 非自动 commit,而且定时达到
commitOffsetsForAckedTuples();
// 对全部已经 ack 的 msgs 进行 commit
}
if (poll()) {//Step2 拉取的数据都已经发送,而且未 commit 的消息数小于设置的最大 uncommit 数
setWaitingToEmit(pollKafkaBroker());
//将拉取的全部 record 都放到 waitingToEmit 集合中,可能会重复拉取数据(因为一些 msg 须要重试,经过修改 Last Committed Offset 的值来实现的)
}
if (waitingToEmit()) {//Step3 waitingToEmit 中还有数据
emit();
//发送数据,但会跳过已经 ack 或者已经发送的消息
}
}
else {
LOG.debug(
"Spout not initialized. Not sending tuples until initialization completes");
}
}
|
上面主要分为三步:
numUncommittedOffsets
中)小于设置的最大 uncommit 数,那么就根据更新后的 offset (将 offset 重置到须要重试的 msg 的最小 offset,这样该 offset 后面的 msg 仍是会被从新拉取)拉取数据,并将拉取到的数据存储到 waitingToEmit
集合中;waitingToEmit
集合中还有数据,就发送数据,但在发送数据的过程当中,会进行判断,只发送没有 ack 的数据。举个示例,以下图所示
consumer offset
nextTuple()
循环结束以后,offset 为14那条数据处理失败,而offset 为15-18的数据处理成功;也就是说从 offset 为14开始,后面的数据会从新发送。
有人可能会问,那样的话会不会形成数据重复发送?
Storm 是如何解决这个问题的呢?答案就是 Storm 会用一个 map 记录已经 ack 的数据(acked
),Storm 在进行 commit 的时候也是根据这个 map 的数据进行 commit 的,不过 commit 数据的 offset 必须是连续的,如上图所示,只能将 offset 为11-13的数据 commit,而15-18的数据因为 offset 为14的数据未处理成功而不能 commit。offset 为11-13的数据在 commit 成功后会从 map 中移除,而 offset 为15-18的数据依然在 map 中,Storm 在将从 Kafka 拉取的数据加入到 waitingToEmit
集合时后,进行 emit 数据时,会先检测该数据是否存在 acked
中,若是存在的话,就证实该条数据已经处理过了,不会在进行发送。
这里有几点须要注意的:
这样设计有一个反作用就是:若是有一个 msg 一直不成功,就会致使 KafkaSpout 由于这一条数据的影响而不断地重复拉取这批数据,形成整个拓扑卡在这里。
Kafka Rebalance 能够参考Consumer Rebalance.
KafkaSpout 实现了一个内部类用来监控 Group Rebalance 的状况,实现了两个回调函数,一旦发现 group 的状态变为 preparingRabalance
以后
onPartitionsRevoked
这个方法会在 Consumer 中止拉取数据以后、group 进行 rebalance 操做以前调用,做用是对已经 ack 的 msg 进行 commit;onPartitionsAssigned
这个方法 group 已经进行 reassignment 以后,开始拉取数据以前调用,做用是清理内存中不属于这个线程的 msg、获取 partition 的 last committed offset。这部分仍是有可能致使数据重复发送的,设想下面一种状况:
若是以前因为一个条消息处理失败(Partition 1),形成部分数据没有 commit 成功,在进行 rebalance 后,刚好 Partition 1 被分配到其余 spout 线程时,那么当前的 spout 就会关于 Partition 1 的相关数据删除掉,致使部分已经 commit 成功的数据(记录在 acked 中)被删除,而另外的 spout 就会从新拉取这部分数据进行处理,那么就会致使这部分已经成功处理的数据重复处理。