Storm guarantees data processing by providing an at least once processing guarantee. The most common question asked about Storm is "Given that tuples can be replayed, how do you do things like counting on top of Storm? Won't you overcount?"html
Storm 0.7.0 introduces transactional topologies, which enable you to get exactly once messaging semantics for pretty much any computation. So you can do things like counting in a fully-accurate, scalable, and fault-tolerant way.java
Storm默认的reliable特性支持at least once processing guarantee.
这个在某些场景下明显是不够的, 好比计数, 不断的replay必然致使计数不许, 那么须要支持exactly once semantics.git
Storm 0.7就提供transactional topology特性来支持, 其实这个和DRPC同样, Storm只是提供一种特殊的topology的封装, 固然transactional topology更复杂.github
这里说transactional topologies为了提供strong ordering, 这个要求是要强于以前说的exactly once semantics.数据库
对于每一个transaction有惟一的transaction id来标识, 对于第一种design, 每一个transaction就是一个tuple
拿计数做为例子, 每一个tuple产生的number, 最终须要累加到数据库里面
不使用transactional, 重复replay一个tuple, 必然会致使该tuple的number被反复累加到数据库apache
怎么处理? 其实想法很简单, 引入transaction的概念, 并在累加number到数据库的同时记下该transactioin id.
这样若是replay该tuple, 只须要对比transaction id就知道该transaction已经累加过, 能够直接ignore缓存
看到这里, 就知道保持strong ordering的重要性, 强顺序意味着, 若是当前的transaction失败, 会反复被replay, 直到成功才继续下一个transaction.
这意味着, 在数据库咱们只须要记录latest的transaction id, 而不是累加过的全部transaction id, 实现上会简单许多.并发
可是design1的问题是效率过低, 彻底线性的处理tuple, 没法利用storm的并发能力, 并且数据库的负载很高, 每一个tuple都须要去操做数据库app
The core idea behind transactional topologies is to provide a strong ordering on the processing of data.
The simplest manifestation of this, and the first design we'll look at, is processing the tuples one at a time and not moving on to the next tuple until the current tuple has been successfully processed by the topology.jvm
Each tuple is associated with a transaction id. If the tuple fails and needs to be replayed, then it is emitted with the exact same transaction id. A transaction id is an integer that increments for every tuple, so the first tuple will have transaction id 1
, the second id 2
, and so on.
There is a significant problem though with this design of processing one tuple at time. Having to wait for each tuple to be completely processed before moving on to the next one is horribly inefficient. It entails a huge amount of database calls (at least one per tuple), and this design makes very little use of the parallelization capabilities of Storm. So it isn't very scalable.
Design2的想法很简单, 用batch tuple来做为transaction的单位, 而不是一个tuple.
这样带来的好处是, batch内部的tuple能够实现并行, 而且以batch为单位去更新数据库, 大大减小数据库负载.
但本质上和Design1没有区别, batch之间仍然是串行的, 因此效率仍然比较低
Instead of processing one tuple at a time, a better approach is to process a batch of tuples for each transaction.
So if you're doing a global count, you would increment the count by the number of tuples in the entire batch. If a batch fails, you replay the exact batch that failed.
Instead of assigning a transaction id to each tuple, you assign a transaction id to each batch, and the processing of the batches is strongly ordered. Here's a diagram of this design:
这个设计体现出storm的创意, 将topology的过程分为processing和commit, processing就是进行局部的计算和统计, 只有commit时才会把计算的结果更新到全局数据集(数据库)
那么对于processing阶段彻底没有必要限制, 只要保证在commit的时候按照顺序一个个commit就ok.
好比对于计数, 不一样的batch的局部计数过程没有任何限制, 能够彻底并行的完成, 可是当须要将计数结果累加到数据库的时候, 就须要用transaction来保证只被累加一次
processing和commit阶段合称为transaction, 任何阶段的失败都会replay整个transaction
A key realization is that not all the work for processing batches of tuples needs to be strongly ordered. For example, when computing a global count, there's two parts to the computation:
Computing the partial count for the batch
Updating the global count in the database with the partial count
The computation of #2 needs to be strongly ordered across the batches, but there's no reason you shouldn't be able to pipeline the computation of the batches by computing #1 for many batches in parallel. So while batch 1 is working on updating the database, batches 2 through 10 can compute their partial counts.
Storm accomplishes this distinction by breaking the computation of a batch into two phases:
The processing phase: this is the phase that can be done in parallel for many batches
The commit phase: The commit phases for batches are strongly ordered. So the commit for batch 2 is not done until the commit for batch 1 has been successful.
The two phases together are called a "transaction".
Many batches can be in the processing phase at a given moment, but only one batch can be in the commit phase.
If there's any failure in the processing or commit phase for a batch, the entire transaction is replayed (both phases).
为了实现上面的Design3, storm在transactional topologies里面默默的作了不少事
管理状态, 经过Zookeeper去记录全部transaction相关的状态信息
协调transactions, 决定应该执行那个transaction的那个阶段
Fault 检测, 使用storm acker机制来detect batch是否被成功执行, 而且storm在transactional topology上 对acker机制作了比较大的优化, 用户不用本身去acking或anchoring, 方便许多
提供batch bolt接口, 在bolt接口中提升对batch的支持, 好比提供finishbatch接口
最后, transactional topology要求source queue具备replay an exact batch的能力, 这儿说kafka是很好的选择
不过我很好奇, 为何要由source queue来提供batch replay的功能, 好的设计应该是batch对source queue透明, spout自身控制batch的划分和replay, 这样不能够吗?
When using transactional topologies, Storm does the following for you:
Manages state: Storm stores in Zookeeper all the state necessary to do transactional topologies.
This includes the current transaction id as well as the metadata defining the parameters for each batch.
Coordinates the transactions: Storm will manage everything necessary to determine which transactions should be processing or committing at any point.
Fault detection: Storm leverages the acking framework to efficiently determine when a batch has successfully processed, successfully committed, or failed.
Storm will then replay batches appropriately. You don't have to do any acking or anchoring -- Storm manages all of this for you.
First class batch processing API: Storm layers an API on top of regular bolts to allow for batch processing of tuples.
Storm manages all the coordination for determining when a task has received all the tuples for that particular transaction.
Storm will also take care of cleaning up any accumulated state for each transaction (like the partial counts).
Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like Kestrel can't do this. Apache Kafka is a perfect fit for this kind of spout, and storm-kafka in storm-contrib contains a transactional spout implementation for Kafka.
You build transactional topologies by using TransactionalTopologyBuilder. Here's the transactional topology definition for a topology that computes the global count of tuples from the input stream. This code comes from TransactionalGlobalCount in storm-starter.
MemoryTransactionalSpout spout = MemoryTransactionalSpout(DATA, Fields(""), PARTITION_TAKE_PER_BATCH); TransactionalTopologyBuilder builder = TransactionalTopologyBuilder("", "", spout, 3); builder.setBolt("", BatchCount(), 5) .shuffleGrouping(""); builder.setBolt("", UpdateGlobalCount()) .globalGrouping("");
首先须要使用TransactionalSpout, MemoryTransactionalSpout
被用来从一个内存变量里面读取数据(DATA), 第二个参数制定数据的fields, 第三个参数指定每一个batch的最大tuple数量
接着, 须要使用TransactionalTopologyBuilder, 其余和普通的topology看上去没有不一样, storm的封装作的很好
下面经过processing和commit阶段的bolt来了解对batch和transaction的支持
首先看看BatchCount, processing阶段的bolt, 用于统计局部的tuple数目
BatchCount BaseBatchBolt { Object _id; BatchOutputCollector _collector; _count = 0; @Override prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } @Override execute(Tuple tuple) { _count++; } @Override finishBatch() { _collector.emit( Values(_id, _count)); } @Override declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( Fields("", "")); } }
BatchCount继承自BaseBatchBolt, 代表其对batch的支持, 主要反应在finishBatch函数, 而普通的bolt的不一样在于, 只有在finishBatch的时候才会去emit结果, 而不是每次execute都emit结果
在prepare时, 多出个id, a TransactionAttempt object, 而且从output定义看出, 全部emit的tuple第一个参数必须是id(TransactionAttempt
)
The
TransactionAttempt
contains two values: the "transaction id" and the "attempt id"(表示被replay次数).
The "transaction id" is the unique id chosen for this batch and is the same no matter how many times the batch is replayed.
The "attempt id" is a unique id for this particular batch of tuples and lets Storm distinguish tuples from different emissions of the same batch. Without the attempt id, Storm could confuse a replay of a batch with tuples from a prior time that batch was emitted.
All tuples emitted within a transactional topology must have the TransactionAttempt
as the first field of the tuple. This lets Storm identify which tuples belong to which batches. So when you emit tuples you need to make sure to meet this requirement.
其实这里的BaseBatchBolt, 是通用的batch基类, 也能够用于其余的须要batch支持的场景, 好比DRPC, 只不过此处的id类型变为RPC id
若是只是要support tansactional topology场景, 能够直接使用BaseTransactionalBolt
BaseTransactionalBolt BaseBatchBolt<TransactionAttempt> { }
继续看, commit阶段的bolt, UpdateGlobalCount, 将统计的结果累加到全局数据库中
UpdateGlobalCount之间继承自BaseTransactionalBolt, 因此此处prepare的参数直接是TransactionAttempt attempt(而不是object id)
而且比较重要的是实现ICommitter接口, 代表这个bolt是个commiter, 意味着这个blot的finishBatch函数须要在commit阶段被调用
另外一种把bolt标识为committer的方法是, 在topology build的时候使用setCommitterBolt来替代setBolt
First, notice that this bolt implements the ICommitter
interface. This tells Storm that the finishBatch
method of this bolt should be part of the commit phase of the transaction.
So calls to finishBatch
for this bolt will be strongly ordered by transaction id (calls to execute
on the other hand can happen during either the processing or commit phases).
An alternative way to mark a bolt as a committer is to use the setCommitterBolt
method in TransactionalTopologyBuilder
instead of setBolt
.
UpdateGlobalCount BaseTransactionalBolt ICommitter { TransactionAttempt _attempt; BatchOutputCollector _collector; _sum = 0; @Override prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) { _collector = collector; _attempt = attempt; } @Override execute(Tuple tuple) { _sum+=tuple.getInteger(1); } @Override finishBatch() { Value val = DATABASE.get(GLOBAL_COUNT_KEY); Value newval; (val == || !val.txid.equals(_attempt.getTransactionId())) { newval = Value(); newval.txid = _attempt.getTransactionId(); (val==) { newval.count = _sum; } { newval.count = _sum + val.count; } DATABASE.put(GLOBAL_COUNT_KEY, newval); } { newval = val; } _collector.emit( Values(_attempt, newval.count)); } @Override declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( Fields("", "")); } }
storm会保证commiter里面的finishBatch被顺序执行, 而且在finishBatch里面, 须要check transaction id, 确保只有新的transaction的结果才被更新到全局数据库.
The code for finishBatch
in UpdateGlobalCount
gets the current value from the database and compares its transaction id to the transaction id for this batch. If they are the same, it does nothing. Otherwise, it increments the value in the database by the partial count for this batch.
A more involved transactional topology example that updates multiple databases idempotently can be found in storm-starter in the TransactionalWords class.
There are three kinds of bolts possible in a transactional topology:
BasicBolt: This bolt doesn't deal with batches of tuples and just emits tuples based on a single tuple of input.
BatchBolt: This bolt processes batches of tuples. execute
is called for each tuple, and finishBatch
is called when the batch is complete.
BatchBolt's that are marked as committers: The only difference between this bolt and a regular batch bolt is when finishBatch
is called. A committer bolt has finishedBatch
called during the commit phase. The commit phase is guaranteed to occur only after all prior batches have successfully committed, and it will be retried until all bolts in the topology succeed the commit for the batch.
上面列出可能遇到的3种bolt, 下面的例子给出不一样blot的区别,
红线标出的都是commiter, 这里有两个commiter, 分别是B和D
A的输出分别输出到B和C
B能够先执行execute(processing), 但不能直接执行finishBatch, 由于须要等待storm调度, 必须等前面的batch commit完后, 才能进行commit
因此C也没法马上执行finishBatch, 由于须要等从B过来的tuple
对于D, 原文说它会在commit阶段接收全部的batch tuple, 因此能够直接commit, 这个怎么保证?
Notice that even though Bolt D is a committer, it doesn't have to wait for a second commit message when it receives the whole batch. Since it receives the whole batch during the commit phase, it goes ahead and completes the transaction.
Committer bolts act just like batch bolts during the commit phase.
The only difference between committer bolts and batch bolts is that committer bolts will not call finishBatch
during the processing phase of a transaction.
Notice that you don't have to do any acking or anchoring when working with transactional topologies. Storm manages all of that underneath the hood. The acking strategy is heavily optimized.
因为封装的比较好, 不须要用户去ack或fail tuple, 那么怎么去fail一个batch?
抛出FailedException, Storm捕获这个异常会replay Batch, 而不会crash
When using regular bolts, you can call the fail
method on OutputCollector
to fail the tuple trees of which that tuple is a member.
Since transactional topologies hide the acking framework from you, they provide a different mechanism to fail a batch (and cause the batch to be replayed).
Just throw a FailedException. Unlike regular exceptions, this will only cause that particular batch to replay and will not crash the process.
Transactional spout和普通的spout彻底不一样的实现, 自己就是一个mini的topology, 分为coordinator spout和emitter bolt
The TransactionalSpout
interface is completely different from a regular Spout
interface. A TransactionalSpout
implementation emits batches of tuples and must ensure that the same batch of tuples is always emitted for the same transaction id.
A transactional spout looks like this while a topology is executing:
The coordinator on the left is a regular Storm spout that emits a tuple whenever a batch should be emitted for a transaction. The emitters execute as a regular Storm bolt and are responsible for emitting the actual tuples for the batch. The emitters subscribe to the "batch emit" stream of the coordinator using an all grouping.
The need to be idempotent with respect to the tuples it emits requires a TransactionalSpout
to store a small amount of state. The state is stored in Zookeeper.
下面是transactional spout的工做流程,
首先coordinator spout只会有一个task, 并会产生两种stream, batch stream和commit stream
它会决定什么时候开始某transaction processing阶段, 此时就往batch stream里面发送包含TransactionAttempt的tuple
它也决定什么时候开始某transaction commit阶段(当经过acker知道processing阶段已经完成的时候, 而且全部prior transaction都已经被commit), 此时就往commit steam里面发送一个包含TransactionAttempt的tuple做为通知, 全部commtting bolt都会预订(经过setBolt的all grouping方式)commit stream, 并根据收到的通知完成commit阶段.
对于commit阶段和processing阶段同样, 经过acker来判断是成功仍是fail, 前面说了transactional topology对acker机制作了较大的优化, 因此全部acking和anchoring都由storm自动完成了.
对于emitter bolt, 能够并发的, 而且以all grouping的方式订阅coordinator的batch stream, 即全部emitter都会获得同样的batch stream, 使用几个emitter取决于场景.
对于topology而言, emitter bolt是真正产生数据的地方, 当coordinator开始某batch的processing过程, 并往batch steam放tuple数据时, emitter bolt就会从batch stream收到数据, 并转发给topology
Here's how transactional spout works:
Transactional spout is a subtopology consisting of a coordinator spout and an emitter bolt
The coordinator is a regular spout with a parallelism of 1
The emitter is a bolt with a parallelism of P, connected to the coordinator's "batch" stream using an all grouping
When the coordinator determines it's time to enter the processing phase for a transaction, it emits a tuple containing the TransactionAttempt and the metadata for that transaction to the "batch" stream
Because of the all grouping, every single emitter task receives the notification that it's time to emit its portion of the tuples for that transaction attempt
Storm automatically manages the anchoring/acking necessary throughout the whole topology to determine when a transaction has completed the processing phase. The key here is that *the root tuple was created by the coordinator, so the coordinator will receive an "ack" if the processing phase succeeds, and a "fail" if it doesn't succeed for any reason (failure or timeout).
If the processing phase succeeds, and all prior transactions have successfully committed, the coordinator emits a tuple containing the TransactionAttempt to the "commit" stream.
All committing bolts subscribe to the commit stream using an all grouping, so that they will all receive a notification when the commit happens.
Like the processing phase, the coordinator uses the acking framework to determine whether the commit phase succeeded or not. If it receives an "ack", it marks that transaction as complete in zookeeper.
从后面的讨论, 能够知道transactional spout的batch replay是依赖于source queue的
好比, 对于kafka这种数据是分布在partition上的queue, 须要使用partitioned transactional spout, 用于封装对从不一样partition读数据的过程
Partitioned Transactional Spout
A common kind of transactional spout is one that reads the batches from a set of partitions across many queue brokers. For example, this is how TransactionalKafkaSpout works. An
IPartitionedTransactionalSpout
automates the bookkeeping work of managing the state for each partition to ensure idempotent replayability.
对于Transactional spout, 并不会象普通tuple同样由spout缓存和负责replay, 只会记下该batch数据在source queue的位置(应该是zookeeper), 当须要replay的时候, Transactional spout会重新去source queue去读batch而后replay.
这样的问题是过于依赖source queue, 并且会致使transaction batch没法被replay(好比因为某个partition fail)
这个问题如何解决? 能够参考原文, 比较好的方法, 是fail当前和后续全部的transaction, 而后从新产生transaction的batch数据, 并跳过失败部分
我的决定这个设计不太好, 过于依赖source queue
为什么不在spout缓存batch数据, 虽然这样对于比较大的batch可能有效率问题, 或者会限制同时处理的batch数目, 但从新从source queue读数据来replay也会有不少问题...