原文地址:http://storm.apache.org/documentation/Distributed-RPC.htmlhtml
分布式RPC的目的是在storm进行大量的实时计算时,可以并行的调用storm上的函数。Storm topology能够将函数参数做为输入Stream,而且将被调用方法产生的结果做为返回发送出去。java
与其说DRPC是storm的一个特色,不如说它只是storm基本概念如steams,spouts,bolts和topologies的一种表达方式。DRPC能够独立于storm做为一个库发布,但在和storm捆绑在一块儿时将会很是有用。git
分布式RPC被“DRPC server”实现(storm包中已经有了对应的实例)。DRPC server协调接收一个RPC请求并将这个请求发送给storm topology,而后接收storm topology算出的结果,再将结果发送给等待中的客户端。从客户端的视角来看,分布式RPC调用过程跟普通的RPC调用过程同样。举例:这里有一个客户端调用“reach”方法,输入参数是http://twitter.com,而后获得计算结果的例子。github
DRPCClient client = new DRPCClient("drpc-host", 3772);数据库
String result = client.execute("reach", "http://twitter.com");apache
这个DRPC的工做流能够描述为:api
一个客户端向DRPC Server发送了想要调用的方法名称和方法参数。实现了这个方法的topology用一个DRPCSpout从DRPC Server接收了函数调用Stream。每一个远程方法在DRPC Server上都有一个惟一的ID。Topology计算出结果以后,使用一个ReturnResults的bolt链接DPRC Server后,将结果交给它。DRPC Server根据方法ID匹配出结果,而后唤醒等待的客户端,将结果发送给客户端app
Storm中有一个LinearDRPCTopologyBuilder 的topology 生成,已经自动实现了DPRC调用过程当中的绝大部分。包括:jvm
1:配置spout分布式
2:将结果返回给DPRC Server
3:为bolt提供了简单的tuple之间的聚合操做
让咱们看一个简单的例子,这里有一个DPRC topology的实现,他能够给输入的参数后面添加一个“!”。
public static class ExclaimBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "!"));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
// ...
}
就像咱们看到的,代码很是简单。当咱们建立一个LinearDRPCTopologyBuilder时,你告诉它实现了DRPC功能的topology的名字。一个DRPC server能够配置不少topology名称,这些名称不能重复。第一个bolt(不在代码中,应该是LinearDRPCTopologyBuilder内部的—译者注)会接受2个tuples,第一个属性是请求id,第二个是方法的参数。LinearDRPCTopologyBuilder中最后一个bolt返回了2个tuples的输出Stream,Stream的格式为[id, result],固然了,过程当中产生的全部tuple的第一个属性都是请求id。
在这个例子中,ExclaimBolt只是简单的在tuple的第二种属性上添加了一个“!”。LinearDRPCTopologyBuilder完成了链接DRPC server并返回结果的其余过程。
DRPC能够在本地运行,例子:
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
cluster.shutdown();
drpc.shutdown();
第一步,建立一个LocalDRPC对象。这个队形模拟DPRC server的运行,就像LocalCluster模拟集群运行同样。而后你建立一个LocalCluster在本地运行topology。LinearDRPCTopologyBuilder有独立的方法分布建立本地topology和远程topology。本地运行的LocalDRPC不须要绑定端口,由于在本地topology不须要端口来传递对象。这就是为何createLocalTopology将LocalDRPC做为输入。
启动以后,你就能够看到DRPC调用过程在本地执行。
在实际的集群上使用DRPC也很是简单,只须要三步:
1:启动DRPC server
2:配置DRPC server地址
3:将DRPC topology提交到storm集群
启动DRPC server就行启动Nimbus或UI同样简单
bin/storm drpc
接下来,你须要为storm集群配置DRPC的地址,才能DRPCSpout让知道在哪里读取方法调用。能够在storm.yaml中配置或者经过topology配置。在storm.yaml中配置以下
drpc.servers:
- "drpc1.foo.com"
- "drpc2.foo.com"
最后,你要经过StormSubmitter启动DPRC topology,就想启动任何topology同样。远程模式运行上面的例子你能够:
StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
createRemoteTopology用来为storm集群建立topology。
上面那个感叹号例子用来熟悉DPRC的概念仍是过于简单。让咱们看一个更复杂的例子,一个真正须要经过并行运行storm来计算的DRPC方法。例子就是计算Twitter上的URL的reach。
URL的reach就是这个URL暴漏给多少用户。为了计算reach,你须要:
1:获取这个URL的全部twitter
2:获取这些twitter的follower
3:去掉重复的follower
4:计算每一个URL的follower
一个真实的计算须要数千次的数据库交互和上百万的flowwer记录的计算。这是很是很是碉堡的计算。但正如你所看到的,基于storm实现这个功能却很是的简单。在单台机器上,reach要花上数分钟来计算;可是在storm集群上,你能够在数秒钟就计算出最难算的URL的reach。
在storm-starter上有一个reach topology(here)样例,这里告诉你如何定义一个reach topology。
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12)
.shuffleGrouping();
builder.addBolt(new PartialUniquer(), 6)
.fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2)
.fieldsGrouping(new Fields("id"));
这个topology计算一共须要四步:
1:GetTweeters获取了tweeted这个URL的用户。它将输入stream[id, url]转换成了一个输出Stream[id, tweeter]。每个URL映射了多个用户tuple。
2:获取了每个tweeter的followers(粉丝)。它将输入stream[id, tweeter]转换成了一个输出Stream [id, follower]。通过这个过程,因为一个follower是同时tweet了同一个URL的多个用户的粉丝,就会产生一些重复的follower tuple。
3.PartialUniquer将follower Stream按照follower和id分组,保证同一个follower会进入同一个task。因此每个PartialUniquer 的task都会接收到相互独立的follower集合。当PartialUniquer接收了全部根据request id分配给它的follower tuples,它就会将去重以后的follower集合的数量发射出去。
4.最终,CountAggregator接收了每个PartialUniquer task发射的数量,而且经过计算总和来完成reach的计算过程。
下面来看一下PartialUniquer bolt:
public class PartialUniquer extends BaseBatchBolt {
BatchOutputCollector _collector;
Object _id;
Set<String> _followers = new HashSet<String>();
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_followers.add(tuple.getString(1));
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _followers.size()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "partial-count"));
}
}
PartialUniquer经过继承BaseBatchBolt实现了IBatchBolt接口。批处理bolt提供了一流的API能够将批量的tuples看成一个批次来处理。每个request id都会建立一个batch bolt实例,而storm负责在适当的时候清理这些实例。
每当PartialUniquer在execute方法中接收到一个follower tuple,它就将follower放到request id对应的set集合中。
当发射到这个bolt的全部tuples都被处理以后,batch bolts中的finishBatch方法将会被调用。 PartialUniquer发射了一个包含follower数量的tuple。
在后台,CoordinatedBolt用来判断bolt是否所有接受了指定request id的全部tuple。CoordinatedBolt利用直接Stream来管理此次协调。
这个topology的剩余部分就显而易见了。就像你看到的,reach计算的每一步都是并行计算,并且实现DRPC topology是多么的简单。
LinearDRPCTopologyBuilder只能处理线下的DRPC topologies:整个计算能够分割为多个独立的顺序步骤。它很难处理包含有bolt分支和bolt合并的复杂topology。目前,为了实现复杂的功能,你只能经过直接使用CoordinatedBolt。
DRPCSpout发射了[args, return-info].return-info中包含DRPC的地址和端口,就像DRPC的id同样。
构建一个topology包含:
DRPCSpout
PrepareRequest(准备请求:产生一个request id并为return-info和参数分别建立一个Stream)
CoordinatedBolt wrappers and direct groupings(CoordinatedBolt封装和直接分组)
JoinResult(将result-info加入结果)
ReturnResult(链接DRPC server 并返回结果)
LinearDRPCTopologyBuilder是一个很是好storm高级抽象的例子
KeyedFairBolt封装同时处理多个请求
如何直接使用CoordinatedBolt
译者:须要详细了解CoordinatedBolt,我的推荐徐明明的一个博客http://xumingming.sinaapp.com/811/twitter-storm-code-analysis-coordinated-bolt/