你已经看到实现有且只有一次被执行的语义时的复杂性。Trident这样作的好处把全部容错想过的逻辑都放在了State里面 -- 做为一个用户,你并不须要本身去处理复杂的txid,存储多余的信息到数据库中,或者是任何其余相似的事情。你只须要写以下这样简单的code:java
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count")) .parallelismHint(6); |
全部管理opaque transactional state所需的逻辑都在MemcachedState.opaque方法的调用中被涵盖了,除此以外,数据库的更新会自动以batch的形式来进行以免屡次访问数据库。State的基本接口只包含下面两个方法:数据库
public interface State { void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream void commit(Long txid); } |
当一个State更新开始时,以及当一个State更新结束时你都会被告知,而且会告诉你该次的txid。Trident并无对你的state的工做方式有任何的假定。微信
假定你本身搭了一套数据库来存储用户位置信息,而且你想要在Trident中去访问它。则在State的实现中应该有用户信息的set、get方法:ide
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocation(long userId, String location) { // code to access database and set location } public String getLocation(long userId) { // code to get location from database } } |
而后你还须要提供给Trident一个StateFactory来在Trident的task中建立你的State对象。LocationDB 的 StateFactory可能会以下所示:spa
public class LocationDBFactory implements StateFactory { public State makeState(Map conf, int partitionIndex, int numPartitions) { return new LocationDB(); } } |
Trident提供了一个QueryFunction接口用来实现Trident中在一个state source上查询的功能。同时还提供了一个StateUpdater来实现Trident中更新statesource的功能。好比说,让咱们写一个查询地址的操做,这个操做会查询LocationDB来找到用户的地址。下面以以怎样在topology中使用该功能开始,假定这个topology会接受一个用户id做为输入数据流:code
TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStaticState(new LocationDBFactory()); topology.newStream("myspout", spout) .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location")) |
接下来让咱们一块儿来看看QueryLocation 的实现应该是什么样的:orm
public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<String> ret = new ArrayList(); for(TridentTuple input: inputs) { ret.add(state.getLocation(input.getLong(0))); } return ret; } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } } |
QueryFunction的执行分为两部分。首先Trident收集了一个batch的read操做并把他们统一交给batchRetrieve。在这个例子中,batchRetrieve会接受到多个用户id。batchRetrieve应该返还一个大小和输入tuple数量相同的result列表。result列表中的第一个元素对应着第一个输入tuple的结果,result列表中的第二个元素对应着第二个输入tuple的结果,以此类推。server
你能够看到,这段代码并无像Trident那样很好的利用batch的优点,而是为每一个输入tuple去查询了一次LocationDB。因此一种更好的操做LocationDB方式应该是这样的:对象
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocationsBulk(List<Long> userIds, List<String> locations) { // set locations in bulk } public List<String> bulkGetLocations(List<Long> userIds) { // get locations in bulk } } |
接着,你能够这样改写上面的QueryLocation:接口
public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<Long> userIds = new ArrayList<Long>(); for(TridentTuple input: inputs) { userIds.add(input.getLong(0)); } return state.bulkGetLocations(userIds); } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } } |
经过有效减小访问数据库的次数,这段代码比上一个实现会高效的多。
若是你要更新State,你须要使用StateUpdater接口,下面是一个StateUpdater的例子,用来将新的地址信息更新到LocationDB当中。
public class LocationUpdater extends BaseStateUpdater<LocationDB> { public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) { List<Long> ids = new ArrayList<Long>(); List<String> locations = new ArrayList<String>(); for(TridentTuple t: tuples) { ids.add(t.getLong(0)); locations.add(t.getString(1)); } state.setLocationsBulk(ids, locations); } } |
下面列出了你应该如何在Trident topology中使用上面声明的LocationUpdater:
TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStream("locations", locationsSpout) .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater()) |
partitionPersist 操做会更新一个State,其内部是将State和一批更新的tuple交给StateUpdater,由StateUpdater完成相应的更新操做。
在这段代码中,只是简单的从输入的tuple中提取出userid和对应的location,并一块儿更新到State中。
partitionPersist 会返回一个TridentState对象来表示被这个Trident topoloy更新过的location db。 而后你就可使用这个state在topology的任何地方进行查询操做了。
同时,你也能够看到咱们传了一个TridentCollector给StateUpdaters,collector发送的tuple就会去往一个新的stream。在这个例子中,咱们并无去往一个新的stream的须要,可是若是你在作一些事情,好比说更新数据库中的某个count,你能够emit更新的count到这个新的stream。而后你能够经过调用TridentState#newValuesStream方法来访问这个新的stream来进行其余的处理。
更多精彩内容请关注:http://bbs.superwu.cn
关注超人学院微信二维码:
关注超人学院java免费交流群: