storm源码分析---State APIs

你已经看到实现有且只有一次被执行的语义时的复杂性。Trident这样作的好处把全部容错想过的逻辑都放在了State里面 -- 做为一个用户,你并不须要本身去处理复杂的txid,存储多余的信息到数据库中,或者是任何其余相似的事情。你只须要写以下这样简单的codejava

  TridentTopology topology = new  TridentTopology();          TridentState wordCounts =         topology.newStream("spout1", spout)           .each(new Fields("sentence"), new Split(), new  Fields("word"))           .groupBy(new Fields("word"))           .persistentAggregate(MemcachedState.opaque(serverLocations), new  Count(), new Fields("count"))                           .parallelismHint(6);  

全部管理opaque transactional state所需的逻辑都在MemcachedState.opaque方法的调用中被涵盖了,除此以外,数据库的更新会自动以batch的形式来进行以免屡次访问数据库。State的基本接口只包含下面两个方法:数据库

  public interface State {       void beginCommit(Long txid); // can be null for things like  partitionPersist occurring off a DRPC stream       void commit(Long txid);  }  

当一个State更新开始时,以及当一个State更新结束时你都会被告知,而且会告诉你该次的txid。Trident并无对你的state的工做方式有任何的假定。微信

假定你本身搭了一套数据库来存储用户位置信息,而且你想要在Trident中去访问它。则在State的实现中应该有用户信息的set、get方法:ide

  public class LocationDB implements State {       public void beginCommit(Long txid) {            }          public void commit(Long txid) {            }          public void setLocation(long userId, String location) {         // code to access database and set location       }          public String getLocation(long userId) {         // code to get location from database       }  }  

而后你还须要提供给Trident一个StateFactory来在Trident的task中建立你的State对象。LocationDB 的 StateFactory可能会以下所示:spa

  public class LocationDBFactory implements  StateFactory {      public State makeState(Map conf, int partitionIndex, int  numPartitions) {         return new LocationDB();     }    }  

Trident提供了一个QueryFunction接口用来实现Trident中在一个state source上查询的功能。同时还提供了一个StateUpdater来实现Trident中更新statesource的功能。好比说,让咱们写一个查询地址的操做,这个操做会查询LocationDB来找到用户的地址。下面以以怎样在topology中使用该功能开始,假定这个topology会接受一个用户id做为输入数据流:code

  TridentTopology topology = new  TridentTopology();  TridentState locations =  topology.newStaticState(new LocationDBFactory());  topology.newStream("myspout",  spout)           .stateQuery(locations, new Fields("userid"), new  QueryLocation(), new Fields("location"))  

接下来让咱们一块儿来看看QueryLocation 的实现应该是什么样的:orm

  public class QueryLocation extends  BaseQueryFunction<LocationDB, String> {       public List<String> batchRetrieve(LocationDB state,  List<TridentTuple> inputs) {           List<String> ret = new ArrayList();           for(TridentTuple input: inputs) {               ret.add(state.getLocation(input.getLong(0)));           }           return ret;       }          public void execute(TridentTuple tuple, String location,  TridentCollector collector) {           collector.emit(new Values(location));       }      }  

QueryFunction的执行分为两部分。首先Trident收集了一个batch的read操做并把他们统一交给batchRetrieve。在这个例子中,batchRetrieve会接受到多个用户id。batchRetrieve应该返还一个大小和输入tuple数量相同的result列表。result列表中的第一个元素对应着第一个输入tuple的结果,result列表中的第二个元素对应着第二个输入tuple的结果,以此类推。server

你能够看到,这段代码并无像Trident那样很好的利用batch的优点,而是为每一个输入tuple去查询了一次LocationDB。因此一种更好的操做LocationDB方式应该是这样的:对象

  public class LocationDB implements State {       public void beginCommit(Long txid) {            }          public void commit(Long txid) {            }          public void setLocationsBulk(List<Long> userIds,  List<String> locations) {         // set locations in bulk       }          public List<String> bulkGetLocations(List<Long> userIds) {         // get locations in bulk       }  }  

接着,你能够这样改写上面的QueryLocation:接口

  public class QueryLocation extends  BaseQueryFunction<LocationDB, String> {       public List<String> batchRetrieve(LocationDB state,  List<TridentTuple> inputs) {           List<Long> userIds = new ArrayList<Long>();           for(TridentTuple input: inputs) {               userIds.add(input.getLong(0));           }           return state.bulkGetLocations(userIds);       }          public void execute(TridentTuple tuple, String location,  TridentCollector collector) {           collector.emit(new Values(location));       }      }  

经过有效减小访问数据库的次数,这段代码比上一个实现会高效的多。

若是你要更新State,你须要使用StateUpdater接口,下面是一个StateUpdater的例子,用来将新的地址信息更新到LocationDB当中。

  public class LocationUpdater extends  BaseStateUpdater<LocationDB> {       public void updateState(LocationDB state, List<TridentTuple>  tuples, TridentCollector collector) {           List<Long> ids = new ArrayList<Long>();           List<String> locations = new ArrayList<String>();           for(TridentTuple t: tuples) {               ids.add(t.getLong(0));               locations.add(t.getString(1));          }           state.setLocationsBulk(ids, locations);       }  }  

下面列出了你应该如何在Trident topology中使用上面声明的LocationUpdater:

  TridentTopology topology = new  TridentTopology();  TridentState locations =        topology.newStream("locations", locationsSpout)           .partitionPersist(new LocationDBFactory(), new  Fields("userid", "location"), new LocationUpdater())  

partitionPersist 操做会更新一个State,其内部是将State和一批更新的tuple交给StateUpdater,由StateUpdater完成相应的更新操做。

在这段代码中,只是简单的从输入的tuple中提取出userid和对应的location,并一块儿更新到State中。

partitionPersist 会返回一个TridentState对象来表示被这个Trident topoloy更新过的location db。 而后你就可使用这个state在topology的任何地方进行查询操做了。

同时,你也能够看到咱们传了一个TridentCollector给StateUpdaters,collector发送的tuple就会去往一个新的stream。在这个例子中,咱们并无去往一个新的stream的须要,可是若是你在作一些事情,好比说更新数据库中的某个count,你能够emit更新的count到这个新的stream。而后你能够经过调用TridentState#newValuesStream方法来访问这个新的stream来进行其余的处理。

更多精彩内容请关注:http://bbs.superwu.cn

关注超人学院微信二维码:

关注超人学院java免费交流群:

相关文章
相关标签/搜索