Storm 的编程模型是一个有向无环图,模型角度决定了 Storm 的 Spout 接收到外部系统的请求,将请求数据分发给下游的 bolt 进行处理后,spout 并不能获得 bolt 的处理结果并将结果返回给外部请求。因此应用场景中 Storm 对外部系统的调用都是采用回调的方式:html
但假若有一个需求:项目要接入各大银行的系统中,经过要求对方提供一个回调接口来实现同步是不可能的。必须依靠本身去实现同步请求响应,外部系统将消息发往storm实时平台,而后外部系统会阻塞,等待storm实时平台处理完后将结果返回给外部系统。这就要用到 DRPC了。java
DRPC设计目的是为了充分利用Storm的计算能力实现高密度的并行实时计算。经过一个 DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm 中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。编程
一个 DRPC请求过程:客户端程序向 DRPC Server 发送要执行的函数名称和该函数的参数。DRPC Server 将函数调用放到队列中,并用一个唯一的id标记,具有 DRPC功能的拓扑会使用一个 DRPCSpout 拉取 。Topo 计算好结果后会由一个名为 ReturnResults 的 bolt 去 链接 DRPC Server 给出对应函数调用id的结果,而后 DRPC Serve 根据 ID 找到等待中的客户端,为等待中的客户端消除阻塞,并发送结果给客户端。具体工做流程以下图所示:segmentfault
从一个客户端的角度来看,一个分布式RPC调用就像是一个常规的RPC调用。只须要传输服务名和请求参数便可。设计模式
实际就是个同步的、向远程系统发送socket请求并获得远程系统处理的结果的分布式调用而已。架构
由上面的架构图能够发现,DRPC Server 至关于一个第三方服务:并发
DRPC Server 要同时协调三个不一样的程序的请求,经过源码可知其经过定义 Thrift 接口完成了进程间的通讯,下面来详解每一个过程。框架
因为 Strom 的 drpc 是经过 thrift 框架 进行 rpc调用的,因此先查看 strom.thrift。有两个 thrift 接口: DistributedRPC 和 DistributedRPCInvocations 。socket
service DistributedRPC { // 请求 drpc 方法 string execute(1: string functionName, 2: string funcArgs) throws (1: DRPCExecutionException e, 2: AuthorizationException aze); } service DistributedRPCInvocations { // 返回 业务topo 处理结果给 DRPCServer void result(1: string id, 2: string result) throws (1: AuthorizationException aze); // 业务topo 拉取 DRPCServer从客户端接收到的请求 DRPCRequest fetchRequest(1: string functionName) throws (1: AuthorizationException aze); void failRequest(1: string id) throws (1: AuthorizationException aze); void failRequestV2(1: string id, 2: DRPCExecutionException e) throws (1: AuthorizationException aze); } struct DRPCRequest { 1: required string func_args; 2: required string request_id; }
须要查看对两个 thrift 接口的具体实现逻辑,只要查看接口的实现类便可,DRPC Server 中的具体实现类是 DRPCThrift,它同时实现了两个接口中的方法,即处理 DRPC客户端的请求,又处理 DRPC业务Topo拉请求的请求。分布式
进行 DRPC调用的第一步是 客户端调用 execute(name, args) ,DRPC Server 的 execute( ) 会对该请求作以下处理:
public class DRPCThrift implements DistributedRPC.Iface, DistributedRPCInvocations.Iface { // 构造方法注入 private final DRPC drpc; //请求队列 <requestName, request queue>,将请求排队给业务topo消费,Waiting to be fetched ConcurrentHashMap<String, ConcurrentLinkedQueue<OutstandingRequest>> _queues = new ConcurrentHashMap<>(); //结果map <requestId, request>,用来接收结果返回给客户端,Waiting to be returned _requests = new ConcurrentHashMap<>(); @Override public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException { return drpc.executeBlocking(functionName, funcArgs); } } public class DRPC implements AutoCloseable { public String executeBlocking(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException { String id = nextId(); T req = BlockingOutstandingRequest .FACTORY .mkRequest(functionName, new DRPCRequest(funcArgs, id)); _requests.put(id, req); ConcurrentLinkedQueue<OutstandingRequest> q = getQueue(functionName); q.add(req); try { return req.getResult(); } finally { cleanup(req.getRequest().get_request_id()); } } }
DRPCSpout 做为 thrift客户端 经过调用 fetchRequest() 拉取请求,这里须要转换一下思惟,DRPCThrift 依然做为 thrift 服务端,因此 DRPCThrift 要实现两个接口。
所以 DRPCSpout 使用的是 DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface,在 nextTuple() 中不断调用 client.fetchRequest(function); 获得 DRPC客户端 的请求来处理。
后面的流程就进入咱们编写的业务 Topo 中了,经过 LinearDRPCTopologyBuilder 的builder.createRemoteTopology()
来构建线性的drpc topo
,该topo
的链路为:spout -> PrepareRequest Bolt-> 用户bolt1 -> 用户bolt2 -> JoinResult Bolt -> ReturnResults Bolt
其中 JoinResult Bolt ,用两个 Map 分别记录 PrepareRequest 收到的请求 Id,最后一个业务Topo处理后的请求 Id,这两个Id是同样的,当两个Id都在 Map中时就认为该 DRPC请求完成,则继续发送给 ReturnResults Bolt 。
最后 ReturnResults Bolt 经过调用 client.result(id, result); 用于返回 Topo 处理结果,在 DRPC 类中 returnResult() 的具体实现:
# DRPC类 public void returnResult(String id, String result) { OutstandingRequest req = _requests.get(id); if (req != null) { req.returnResult(result); } } } # BlockingOutstandingRequest 类 public void returnResult(String result) { _result = result; _sem.release(); }
阅读源码的过程当中对 DRPCSput 的 client.fetchRequest(function);
链路不清楚,想看它的服务端业务是怎么实现的,点进去看到的是 DRPCInvocationsClient 的 fetchRequest() ,这里c.fetchRequest(func);
竟然是直接又用 thrift客户端调 fetchRequest()?看:
public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface { // 构造方法 client.set(new DistributedRPCInvocations.Client(protocol)); // # DRPCInvocationsClient @Override public DRPCRequest fetchRequest(String func) { DistributedRPCInvocations.Client c = client.get(); if (c == null) { throw new TException("Client is not connected..."); } // 这里是真正的客户端请求,DistributedRPCInvocations.Client 是 thrift 抽象的客户端 return c.fetchRequest(func); } }
小伙子,你会觉得fetchRequest(func)
是重复的 thrift客户端调用,说明你对 DRPCInvocationsClient
类不熟,对设计模式也不熟啊!!首先,DRPCInvocationsClient
和DistributedRPCInvocations.Client
同样,都实现DistributedRPCInvocations.Iface
,你就根据仅有的thrift
知识,觉得实现了DistributedRPCInvocations.Iface
接口的都是要写服务端业务逻辑的;其实这里DRPCInvocationsClient
只是用了静态代理模式,对 DistributedRPCInvocations.Client
的代理而已,对各方法多了异常处理啊,真正的客户端请求确实是c.fetchRequest(func);
既然这样,那就看还有什么类是实现了DistributedRPCInvocations.Iface
接口的,就是c.fetchRequest(func);
对应的服务端相应逻辑,就在DRPCThrift implements DistributedRPC.Iface, DistributedRPCInvocations.Iface
,同时实现了两个 drpc 的接口进行所有方法实现,具体逻辑在DRPCThrift
的成员变量DRPC
类中!
class DRPC { // DRPCSpout 中调用的 fetchRequest,实际调用的是这里。 public DRPCRequest fetchRequest(String functionName) throws AuthorizationException { meterFetchRequestCalls.mark(); checkAuthorizationNoLog("fetchRequest", functionName); ConcurrentLinkedQueue<OutstandingRequest> q = getQueue(functionName); OutstandingRequest req = q.poll(); if (req != null) { //Only log accesses that fetched something logAccess("fetchRequest", functionName); req.fetched(); DRPCRequest ret = req.getRequest(); return ret; } return NOTHING_REQUEST; } }