Trident State(翻译)

Trident 是对状态化(stateful)数据源进行读取和写入操做的最好抽象。状态(state)既能够保存在拓扑内部(例如保存在内存中并备份到HDFS上),也能够存入像Memcached或者Cassandra这样的外部数据库。而对于Trident API而言,这两种机制没有区别。html

Trident manages state in a fault-tolerant way so that state updates are idempotent in the face of retries and failures. This lets you reason about Trident topologies as if each message were processed exactly-once.java

Trident使用一种容错性的方式来对state进行管理,这样即便在面对失败或重试时,状态的更新还是幂等的。基于这个机制,每条消息被看做被刚好处理了一次(exactly-once)。git

State更新中存在多级容错性机制。在讨论这一点以前,咱们先来看一个例子,这个例子展现了实现刚好一次(exactly-once)语义时必需的诀窍。假如你正在对数据流进行一个count聚合操做,并打算将计数结果存入数据库中。在这个例子里,你在数据库中存入一个单一值来表示count值,并且你每次计算一个新tuple来增长count。github

当出现失败状况,tuple 将被重发。这样就给state更新带来了一个问题(或者其余反作用)—— 你没法知道当前的这个tuple更新操做是否已经被处理过。也许你以前并无处理这个tuple,那么你须要增长count;也许你以前已经处理了这个tuple而且成功增长了count,可是在另外一部执行操做过程当中tuple处理失败了,这时你就不该该增长count;还有可能你以前在使用这个tuple,可是在更新数据库时候出错了,出现这种状况时,你仍然须要更新数据库。数据库

在数据库中只存储count值时,你不知道这个tuple是否以前被处理过。因此你须要更多的信息来得到正确的决定。Trident提供了下面能够知足刚好处理一次的语义。apache

  1. Tuples被做为small batch来处理(see the tutorial)
  2. 每一个tuples的batch被赋予一个"transaction id" (txid)。若是这个batch被从新发放,将被赋予严格相同的txid
  3. 状态更新在batch之间是有序的。这样,直到对于batch 2的状态更新成功,对于batch 3的状态更新才会应用。

基于这些原语,你的 State 实现就能够检测tuple的batch以前是否已经被处理,而后来选择合适的方式来进行state更新操做。你具体的操做取决于你在每一个batch中输入spout提供的语义。有三类支持容错性的 spout:“非事务型”(non-transactional)、“事务型”(transactional)以及“模糊事务型”(opaque transactional)。接下来咱们来分析下每种 spout 类型的容错性语义。编程

Transactional spouts

记住,Trident 是经过small batch来处理tuple的,并且每一个batch都会有一个惟一的txid。spout的特性是由他们所提供给每一个batch的保证机制决定的。事务型 spout 包含如下特性:缓存

  1. 每一个batch的txid老是相同的。对于某个txid的batchs从新处理,处理的tuples集合必须跟第一次处理操做彻底相同。
  2. 在不一样的batch之间是不会出现tuples重叠状况。
  3. 每一个tuple都会在某个batch(不会遗漏tuples)

这是一种容易理解的spout类型,stream被切分红固定不变的batchs。storm-contrib提供一个关于Kafka的an implementation of a transactional spout 。安全

你可能会有疑问:为何不所有使用事务型 spout 呢?缘由很好理解。一方面,有些spout并没有必要去保障足够可靠的容错性。好比,TransactionalTridentKafkaSpout的工做方式就是使得带有某个 txid 的 batch 中包含有来自一个 Kafka topic 的全部 partition 的 tuple。一旦一个 batch 被发送出去,在未来不管从新发送这个 batch 多少次,batch 中都会包含有彻底相同的 tuple 集,这是由事务型 spout 的语义决定的。如今假设 TransactionalTridentKafkaSpout 发送出的某个 batch 处理失败了,而与此同时,Kafka 的某个节点由于故障下线了。这时你就没法从新处理以前的 batch 了(由于 Kafka 的节点故障,Kafka topic 必然有一部分 partition 没法获取到),这个处理过程也会所以终止。app

这就是要有“模糊事务型” spout 的缘由了 —— 模糊事务型 spout 支持在数据源节点丢失的状况下仍然能够实现exactly-once刚好一次的处理语义。咱们会在下一节讨论这类 spout。

顺便提一点,若是 Kafka 支持数据复制,那么就能够放心地使用事务型 spout 提供的容错性机制了,由于这种状况下某个节点的故障不会致使数据丢失。

在讨论“模糊事务型” spout 以前,让咱们先来看看如何为事务型 spout 设计一种支持exactly-once刚好一次语义的State。这个 State 就称为 “事务型 state”,它利用了特定的 txid 永远只与同一组 tuple 相关联这一事实。

假如你的拓扑须要计算单词数,并且你准备将计数结果存入一个 K-V 型数据库中。这里的 key 就是单词,value 对应于单词数。从上面的讨论中你应该已经明白了仅仅存储计数结果是没法肯定某个 batch 中的tuple 是否已经被处理过的。因此,如今你应该将 txid 做为一种原子化的值与计数值一块儿存入数据库。随后,在更新计数值的时候,你就能够将数据库中的 txid 与当前处理的 batch 的 txid 进行比对。若是二者相同,你就能够跳过更新操做 —— 因为 Trident 的强有序性处理机制,能够肯定数据库中的值是对应于当前的 batch 的。若是二者不一样,你就能够放心地增长计数值。因为一个 batch 的 txid 永远不会改变,并且 Trident 可以保证 state 的更新操做彻底是按照 batch 的顺序进行的,因此,这样的处理逻辑是彻底可行的。

下面来看一个例子。假如你正在处理 txid 3,其中包含有如下几个 tuple:

["man"]
["man"]
["dog"]

假如数据库中有如下几个 key-value 对:

man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

其中与 “man” 相关联的 txid 为 1。因为当前处理的 txid 为 3,你就能够肯定当前处理的 batch 与数据库中存储的值无关,这样你就能够放心地将 “man” 的计数值加上 2 并更新 txid 为 3。另外一方面,因为 “dog” 的 txid 与当前的 txid 相同,因此,“dog” 的计数是以前已经处理过的,如今不能再对数据库中的计数值进行更新操做。这样,在结束 txid3 的更新操做以后,数据库中的结果就会变成这样:

man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

如今咱们再来讨论一下“模糊事务型” spout。

Opaque transactional spouts

前面已经提到过,模糊事务型 spout 不能保证一个 txid 对应的 batch 中包含的 tuple 彻底一致。模糊事务型 spout 有如下的特性:

  1. 每一个 tuple 都会经过某个 batch 处理完成。不过,在 tuple 处理失败的时候,tuple 有可能继续在另外一个 batch 中完成处理,而不必定是在原先的 batch 中完成处理。

OpaqueTridentKafkaSpout 就具备这样的特性,同时它对 Kafka 节点的丢失问题具备很好的容错性。OpaqueTridentKafkaSpout 在发送一个 batch 的时候总会总上一个 batch 结束的地方开始发送新 tuple。这一点能够保证 tuple 不会被遗漏,并且也不会被多个 batch 处理。

不过,模糊事务型 spout 的缺点就在于不能经过 txid 来识别数据库中的 state 是不是已经处理过的。这是由于在 state 的更新的过程当中,batch 有可能会发生变化。

在这种状况下,你应该在数据库中存储更多的 state 信息。除了一个结果值和 txid 以外,你还应该存入前一个结果值。咱们再以上面的计数值的例子来分析如下这个问题。假如你的 batch 的部分计数值是 “2”,如今你须要应用一个更新操做。假定如今数据库中的值是这样的:

{ value = 4,
  prevValue = 1,
  txid = 2
}

假如当前处理的 txid 为 3,这与数据库中的 txid 不一样。这时能够将 “prevValue” 的值设为 “value” 的值,再为 “value” 的值加上部分计数的结果并更新 txid。执行完这一系列操做以后的数据库中的值就会变成这样:

{ value = 6,
  prevValue = 4,
  txid = 3
}

若是当前处理的 txid 为 2,也就是和数据库中存储的 txid 一致,这种状况下的处理逻辑与上面的 txid 不一致的状况又有所不一样。由于此时你会知道数据库中的更新操做是由上一个拥有相同 txid 的batch 作出的。不过那个 batch 有可能与当前的 batch 并不相同,因此你须要忽略它的操做。这个时候,你应该将 “prevValue” 加上 batch 中的部分计数值来计算新的 “value”。在这个操做以后数据库中的值就会变成这样:

{ value = 3,
  prevValue = 1,
  txid = 2
}

这种方法之因此可行是由于 Trident 具备强顺序性处理的特性。一旦 Trident 开始处理一个新的 batch 的状态更新操做,它永远不会回到过去的 batch 的处理上。同时,因为模糊事务型 spout 会保证 batch 之间不会存在重复 —— 每一个 tuple 只会被某一个 batch 完成处理 —— 因此你能够放心地使用 prevValue 来更新 value。

Non-transactional spouts

非事务型 spout 不能为 batch 提供任何的安全性保证。非事务型 spout 有可能提供一种“至多一次”的处理模型,在这种状况下 batch 处理失败后 tuple 并不会从新处理;也有可能提供一种“至少一次”的处理模型,在这种状况下可能会有多个 batch 分别处理某个 tuple。总之,此类 spout 不能提供“刚好一次”的语义。

Summary of spout and state types

下图显示了不一样的 spout/state 的组合是否支持刚好一次的消息处理语义:

模糊事务型 state 具备最好的容错性特征,不过这是以在数据库中存储更多的内容为代价的(一个 txid 和两个 value)。事务型 state 要求的存储空间相对较小,可是它的缺点是只对事务型 spout 有效。相对的,非事务型要求的存储空间最少,可是它也不能提供任何的刚好一次的消息执行语义。

你选择 state 与 spout 的时候必须在容错性与存储空间占用之间权衡。能够根据你的应用的需求来肯定哪一种组合最适合你。

State APIs

从上文的描述中你已经了解到了exactly-once刚好一次的消息执行语义的原理是多么的复杂。不过做为用户你并不须要处理这些复杂的 txid 比对、多值存储等操做,Trident 已经在 State 中封装了全部的容错性处理逻辑,你只须要像下面这样写代码便可:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
      topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))                
        .parallelismHint(6);


全部处理模糊事务型 state 的逻辑已经封装在 MemcachedState.opaque 的调用中了。另外,状态更新都会自动调整为批处理操做,这样能够减少与数据库的反复交互的资源损耗。

基本的 State 接口只有两个方法:

public interface State {
    void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream
    void commit(Long txid);
}

前面已经说过,state 更新操做的开始时和结束时都会获取一个 txid。对于你的 state 怎么工做,你在其中使用什么样的方法执行更新操做,或者使用什么样的方法从 state 中读取数据,Trident 并不关心。

假如你有一个包含有用户的地址信息的定制数据库,你须要使用 Trident 与该数据库交互。你的 State 的实现就会包含有用于获取与设置用户信息的方法,好比下面这样:

public class LocationDB implements State {
    public void beginCommit(Long txid) {    
    }

    public void commit(Long txid) {    
    }

    public void setLocation(long userId, String location) {
      // code to access database and set location
    }

    public String getLocation(long userId) {
      // code to get location from database
    }
}

接着你就能够为 Trident 提供一个 StateFactory 来建立 Trident 任务内部的 State 对象的实例。对应于你的数据库(LocationDB)的 StateFactory 大概是这样的:

public class LocationDBFactory implements StateFactory {
   public State makeState(Map conf, int partitionIndex, int numPartitions) {
      return new LocationDB();
   } 
}

Trident 提供了一个用于查询 state 数据源的 QueryFunction 接口,以及一个用于更新 state 数据源的 StateUpdater 接口。例如,咱们能够写一个查询 LocationDB 中的用户地址信息的 “QueryLocation”。让咱们从你在拓扑中使用这个操做的方式开始。假如在拓扑中须要读取输入流中的 userid 信息:

TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
        .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))

这里的 QueryLocation 的实现多是这样的:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<String> ret = new ArrayList();
        for(TridentTuple input: inputs) {
            ret.add(state.getLocation(input.getLong(0)));
        }
        return ret;
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }    
}

QueryFunction 的执行包含两个步骤。首先,Trident 会将读取的一些数据中汇总为一个 batch 传入 batchRetrieve 方法中。在这个例子中,batchRetrieve 方法会收到一些用户 id。而后 batchRetrieve 会返回一个与输入 tuple 列表大小相同的队列。结果队列的第一个元素与第一个输入 tuple 对应,第二个元素与第二个输入 tuple 相对应,以此类推。

你会发现这段代码并无发挥出 Trident 批处理的优点,由于这段代码仅仅一次查询一下 LocationDB。因此,实现 LocationDB 的更好的方式应该是这样的:

public class LocationDB implements State {
    public void beginCommit(Long txid) {    
    }

    public void commit(Long txid) {    
    }

    public void setLocationsBulk(List<Long> userIds, List<String> locations) {
      // set locations in bulk
    }

    public List<String> bulkGetLocations(List<Long> userIds) {
      // get locations in bulk
    }
}

而后,你能够这样实现 QueryLocation 方法:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<Long> userIds = new ArrayList<Long>();
        for(TridentTuple input: inputs) {
            userIds.add(input.getLong(0));
        }
        return state.bulkGetLocations(userIds);
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }    
}

这段代码大幅减小了域数据库的IO,具备更高的执行效率。

你须要使用 StateUpdater 接口来更新 state。下面是一个更新 LocationDB 的地址信息的 StateUpdater 实现:

public class LocationUpdater extends BaseStateUpdater<LocationDB> {
    public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
        List<Long> ids = new ArrayList<Long>();
        List<String> locations = new ArrayList<String>();
        for(TridentTuple t: tuples) {
            ids.add(t.getLong(0));
            locations.add(t.getString(1));
        }
        state.setLocationsBulk(ids, locations);
    }
}

而后你就能够在 Trident 拓扑中这样使用这个操做:

TridentTopology topology = new TridentTopology();
TridentState locations = 
    topology.newStream("locations", locationsSpout)
        .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())

partitionPersist 操做会更新 state 数据源。StateUpdater 接收 State 和一批 tuple 做为输入,而后更新这个 State。上面的代码仅仅从输入 tuple 中抓取 userid 和 location 信息,而后对 State 执行一个批处理更新操做。

在 Trident 拓扑更新 LocationDB 以后,partitionPersist 会返回一个表示更新后状态的 TridentState 对象。随后你就能够在拓扑的其余地方使用 stateQuery 方法对这个 state 执行查询操做。

你也许注意到了 StateUpdater 中有一个 TridentCollector 参数。发送到这个 collector 的 tuple 会进入一个“新的数值流”中。在这个例子里向这个新的流发送 tuple 并无意义,不过若是你须要处理相似于更新数据库中的计数值这样的操做,你能够考虑将更新后的技术结果发送到这个流中。能够经过 TridentState.newValuesStream 方法来获取新的流的数据。

persistentAggregate

Trident 使用一个称为 persistentAggregate 的方法来更新 State。你已经在前面的数据流单词统计的例子里见过了这个方法,这里再写一遍:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
      topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))

partitionPersist是一个接收Trident聚合器做为参数并对state数据源进行更新的方法,persistentAggregate 就是构建于partitionPersist上层的一个编程抽象。在这个例子里,因为是一个分组数据流(grouped stream),Trident 须要你提供一个实现 MapState 接口的 state。被分组的域就是 state 中的 key,而聚合的结果就是 state 中的 value。MapState 接口是这样的:

public interface MapState<T> extends State {
    List<T> multiGet(List<List<Object>> keys);
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

而当你在非分组数据流上执行聚合操做时(全局聚合操做),Trident须要你提供一个实现了 Snapshottable 接口的对象:

public interface Snapshottable<T> extends State {
    T get();
    T update(ValueUpdater updater);
    void set(T o);
}

MemoryMapState and MemcachedState each implement both of these interfaces.

Implementing Map States

实现 MapState 接口很是简单,Trident 几乎已经为你作好了全部的准备工做。OpaqueMapTransactionalMap、与NonTransactionalMap 类都分别实现了各自的容错性语义。你只须要为这些类提供一个用于对不一样的 key/value 进行 multiGets 与 multiPuts 处理的 IBackingMap 实现类。IBackingMap 接口是这样的:

 

public interface IBackingMap<T> {
    List<T> multiGet(List<List<Object>> keys); 
    void multiPut(List<List<Object>> keys, List<T> vals); 
}

OpaqueMap 会使用 OpaqueValue做为 vals 参数来调用 multiPut 方法,TransactionalMap 会使用TransactionalValue做为参数,而 NonTransactionalMap 则直接将拓扑中的对象传入。

Trident 也提供了一个  CachedMap用于实现 K-V map 的自动 LRU 缓存功能。

最后,Trident 还提供了一个 SnapshottableMap  类,该类经过将全局聚合结果存入一个固定的 key 中的方法将 MapState 对象转化为一个 Snapshottable 对象。

能够参考MemcachedState的实现来了解如何将这些工具结合到一块儿来提供一个高性能的 MapState。MemcachedState 支持选择模糊事务型、事务型或者非事务型语义。

相关文章
相关标签/搜索