Storm是一个分布式的流处理系统,利用anchor和ack机制保证全部tuple都被成功处理。若是tuple出错,则能够被重传,可是如何 保证出错的tuple只被处理一次呢?Storm提供了一套事务性组件Transaction Topology,用来解决这个问题。数据库
Transactional Topology目前已经再也不维护,由Trident来实现事务性topology,可是原理相同。框架
Storm如何实现即对tuple并行处理,又保证事务性。本节从简单的事务性实现方法入手,逐步引出Transactional Topology的原理。分布式
保证tuple只被处理一次,最简单的方法就是将tuple流变成强顺序的,而且每次只处理一个tuple。从1开始,给每一个tuple都顺序加上 一个id。在处理tuple的时候,将处理成功的tuple id和计算结果存在数据库中。下一个tuple到来的时候,将其id与数据库中的id作比较。若是相同,则说明这个tuple已经被成功处理过了,忽略 它;若是不一样,根据强顺序性,说明这个tuple没有被处理过,将它的id及计算结果更新到数据库中。ide
以统计消息总数为例。每来一个tuple,若是数据库中存储的id 与当前tuple id不一样,则数据库中的消息总数加1,同时更新数据库中的当前tuple id值。如图:ui
可是这种机制使得系统一次只能处理一个tuple,没法实现分布式计算。spa
为了实现分布式,咱们能够每次处理一批tuple,称为一个batch。一个batch中的tuple能够被并行处理。设计
咱们要保证一个batch只被处理一次,机制和上一节相似。只不过数据库中存储的是batch id。batch的中间计算结果先存在局部变量中,当一个batch中的全部tuple都被处理完以后,判断batch id,若是跟数据库中的id不一样,则将中间计算结果更新到数据库中。orm
如何确保一个batch里面的全部tuple都被处理完了呢?能够利用Storm提供的CoordinateBolt。如图:对象
可是强顺序batch流也有局限,每次只能处理一个batch,batch之间没法并行。要想实现真正的分布式事务处理,可使用storm提供的Transactional Topology。在此以前,咱们先详细介绍一下CoordinateBolt的原理。blog
CoordinateBolt具体原理以下:
真正执行计算的bolt外面封装了一个CoordinateBolt。真正执行任务的bolt咱们称为real bolt。
每一个CoordinateBolt记录两个值:有哪些task给我发送了tuple(根据topology的grouping信息);我要给哪些tuple发送信息(一样根据groping信息)
Real bolt发出一个tuple后,其外层的CoordinateBolt会记录下这个tuple发送给哪一个task了。
等全部的tuple都发送完了以后,CoordinateBolt经过另一个特殊的stream以emitDirect的方式告诉全部它发送过 tuple的task,它发送了多少tuple给这个task。下游task会将这个数字和本身已经接收到的tuple数量作对比,若是相等,则说明处理 完了全部的tuple。
下游CoordinateBolt会重复上面的步骤,通知其下游。
整个过程如图所示:
CoordinateBolt主要用于两个场景:
DRPC
Transactional Topology
CoordinatedBolt对于业务是有侵入的,要使用CoordinatedBolt提供的功能,你必需要保证你的每一个bolt发送的每一个 tuple的第一个field是request-id。 所谓的“我已经处理完个人上游”的意思是说当前这个bolt对于当前这个request-id所须要作的工做作完了。这个request-id在DRPC 里面表明一个DRPC请求;在Transactional Topology里面表明一个batch。
Storm提供的Transactional Topology将batch计算分为process和commit两个阶段。Process阶段能够同时处理多个batch,不用保证顺序 性;commit阶段保证batch的强顺序性,而且一次只能处理一个batch,第1个batch成功提交以前,第2个batch不能被提交。
仍是以统计消息总数为例,如下代码来自storm-starter里面的TransactionalGlobalCount。
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA,new Fields(“word“), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(“global-count“, “spout“, spout, 3);
builder.setBolt(“partial-count“, new BatchCount(), 5).noneGrouping(“spout“);
builder.setBolt(“sum“, new UpdateGlobalCount()).globalGrouping(“partial-count“);
TransactionalTopologyBuilder共接收四个参数。
这个Transactional Topology的id。Id用来在Zookeeper中保存当前topology的进度,若是这个topology重启,能够继续以前的进度执行。
Spout在这个topology中的id
一个TransactionalSpout。一个Trasactional Topology中只能有一个TrasactionalSpout.在本例中是一个MemoryTransactionalSpout,从一个内存变量(DATA)中读取数据。
TransactionalSpout的并行度(可选)。
下面是BatchCount的定义:
public static class BatchCount extends BaseBatchBolt {
Object _id;
BatchOutputCollector _collector;
int _count = 0;
@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_count++;
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(“id“, “count“));
}
}
BatchCount的prepare方法的最后一个参数是batch id,在Transactional Tolpoloyg里面这id是一个TransactionAttempt对象。
Transactional Topology里发送的tuple都必须以TransactionAttempt做为第一个field,storm根据这个field来判断tuple属于哪个batch。
TransactionAttempt包含两个值:一个transaction id,一个attempt id。transaction id的做用就是咱们上面介绍的对于每一个batch中的tuple是惟一的,并且无论这个batch replay多少次都是同样的。attempt id是对于每一个batch惟一的一个id, 可是对于同一个batch,它replay以后的attempt id跟replay以前就不同了, 咱们能够把attempt id理解成replay-times, storm利用这个id来区别一个batch发射的tuple的不一样版本。
execute方法会为batch里面的每一个tuple执行一次,你应该把这个batch里面的计算状态保持在一个本地变量里面。对于这个例子来讲, 它在execute方法里面递增tuple的个数。
最后, 当这个bolt接收到某个batch的全部的tuple以后, finishBatch方法会被调用。这个例子里面的BatchCount类会在这个时候发射它的局部数量到它的输出流里面去。
下面是UpdateGlobalCount类的定义:
public static class UpdateGlobalCount extends BaseTransactionalBolt
implements ICommitter {
TransactionAttempt _attempt;
BatchOutputCollector _collector;
int _sum = 0;
@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, TransactionAttempt attempt) {
_collector = collector;
_attempt = attempt;
}
@Override
public void execute(Tuple tuple) {
_sum+=tuple.getInteger(1);
}
@Override
public void finishBatch() {
Value val = DATABASE.get(GLOBAL_COUNT_KEY);
Value newval;
if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
newval = new Value();
newval.txid = _attempt.getTransactionId();
if(val==null) {
newval.count = _sum;
} else {
newval.count = _sum + val.count;
}
DATABASE.put(GLOBAL_COUNT_KEY, newval);
} else {
newval = val;
}
_collector.emit(new Values(_attempt, newval.count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(“id“, “sum“));
}
}
UpdateGlobalCount实现了ICommitter接口,因此storm只会在commit阶段执行finishBatch方法。而execute方法能够在任何阶段完成。
在UpdateGlobalCount的finishBatch方法中,将当前的transaction id与数据库中存储的id作比较。若是相同,则忽略这个batch;若是不一样,则把这个batch的计算结果加到总结果中,并更新数据库。
Transactional Topolgy运行示意图以下:
下面总结一下Transactional Topology的一些特性
Transactional Topology将事务性机制都封装好了,其内部使用CoordinateBolt来保证一个batch中的tuple被处理完。
TransactionalSpout只能有一个,它将全部tuple分为一个一个的batch,并且保证同一个batch的transaction id始终同样。
BatchBolt处理batch在一块儿的tuples。对于每个tuple调用execute方法,而在整个batch处理完成的时候调用finishBatch方法。
若是BatchBolt被标记成Committer,则只能在commit阶段调用finishBolt方法。一个batch的commit阶 段由storm保证只在前一个batch成功提交以后才会执行。而且它会重试直到topology里面的全部bolt在commit完成提交。
Transactional Topology隐藏了anchor/ack框架,它提供一个不一样的机制来fail一个batch,从而使得这个batch被replay。
Trident是Storm之上的高级抽象,提供了joins,grouping,aggregations,fuctions和filters等接口。若是你使用过Pig或Cascading,对这些接口就不会陌生。
Trident将stream中的tuples分红batches进行处理,API封装了对这些batches的处理过程,保证tuple只被处理一次。处理batches中间结果存储在TridentState对象中。
Trident事务性原理这里不详细介绍,有兴趣的读者请自行查阅资料。