flink RPC(akka)

  flink中的rpc框架使用的akka。在本节并不详细讲述akka,而是就flink中rpc来说述akka的部份内容。本节,我从AkkaRpcActor.handleRpcInvocation方法讲起。
  看过hadoop、yarn、hive、hbase、presto的rpc框架,感受flink的通讯框架是最容易让人绕晕的。虽然以前也看过一点spark中akka的通讯,但如今早已忘得一干二净。现在重拾akka通讯,感受仍是挺复杂的。所以,这里特地拿出一节来说解。
  1.这里首先要讲述的是flink中关于心跳的rpc交互。这里也是akka中第一种远程通讯方式,也就是说经过tell方式异步传输。
  这里咱们从HeartbeatTarget.requestHeartbeat开始讲。真正调用的是ResourceManager.registerTaskExecutorInternal方法中 类型为HeartbeatTarget的匿名类,其内部调用了taskExecutorGateway.heartbeatFromResourceManager。这里的taskExecutorGateway是一个代理类,其invocationHandler为AkkaInvocationHandler。所以,这里首先调用的是AkkaInvocationHandler.invoke,因为这里要调用的并不是本地方法,所以接着调用了方法AkkaInvocationHandler.invokeRpc。在该方法中首先经过方法createRpcInvocationMessage封装了发现taskmanager端的请求RemoteRpcInvocation,接着获取了欲调用方法的返回值(这里的判断是为了后面使用不一样的akka通讯方式)。咱们这里的返回值为Void。而后调用了AkkaInvocationHandler.tell。这里的入参是刚刚封装的RemoteRpcInvocation,该方法内部调用了ActorRef.tell。该actor就是taskmanager端的化生,发送了RemoteRpcInvocation(可序列化)。jobmanager端,也就是resourcemanager端的流程到这里就结束了,由于咱们远程调用的方法是无返回值的。
  接着,咱们来到taskmanager端,这里的AkkaRpcActor.onReceive接收到resourcemanager端发来的消息。根据类型的匹配,咱们来到AkkaRpcActor.handleRpcMessage。因为这里的信息是RemoteRpcInvocation,实现了接口RpcInvocation,所以,咱们来到AkkaRpcActor.handleRpcInvocation方法。这里首先调用方法lookupRpcMethod根据方法名获取taskmanager端对应的方法,也就是TaskExecutor中对应的方法。接着,设置了其访问属性后,便开始反射调用。因为咱们这里的方法返回值类型为Void,所以,在调用了TaskExecutor.heartbeatFromResourceManager后再无后续操做。
  2.接着是akka中的第二种通讯方式——异步返回。我这里的使用的是taskmanager向resourcemanager远程注册的例子来说解。
  这里使用了akka的异步返回机制。若是对akka的异步返回不太熟悉的朋友,我推荐你们看一下http://sunxiang0918.cn/2016/01/10/Akka-in-JAVA-1/。这里一共有四篇文章,对于akka入门有极大裨益。另外,我会在下篇博客发布时,将整理的flink中关于akka的代码发布到个人github上,到时你们能够参考一下。这里我配合思惟导图方便你们的理解。
  从TaskExecutorToResourceManagerConnection.ResourceManagerRegistration.invokeRegistration讲起。该方法内部调用了resourceManager.registerTaskExecutor。这里的resourceManager实际类型是FencedAkkaInvocationHandler。FencedAkkaInvocationHandler继承自AkkaInvocationHandler。这里的部分调用流程与上面的异步无返回相似,我就从其中不一样的地方讲起。因为咱们这里的返回值类型为CompletableFuture<RegistrationResponse>,不是Void类型,所以,这里首先调用了FencedAkkaInvocationHandler.ask,接着调用了FencedAkkaInvocationHandler.fenceMessage将信息类型封装为RemoteFencedMessage,接着调用AkkaInvocationHandler.ask。这里是比较复杂的地方。首先调用了Patterns.ask(ActorRef, message),这里的ActorRef是resourcemanager端的化身,Patterns.ask是akka用于远程异步调用的一种方式。其返回值为scala.concurrent.Future,也就是scala类型的Future。该类型有方法onComplete,做用是当该Future完成是,不管是抛出异常或返回值完成此将来时,调用该方法入参中的函数。这里咱们经过FutureUtils.toJava将scala中的Future转换为java中的CompletableFuture。获得CompletableFuture后,taskmanager端接着调用CompletableFuture.thenApply方法,内部调用了返回值的deserializeValue方法,也就是获取到远程的序列化的返回值后,将其反序列化。因为咱们这里rpc调用的方法返回值是CompletableFuture类型,所以这里并不阻塞,直接返回。
  而后,咱们来到resourcemanager端,这里的AkkaRpcActor.onReceive方法被调用(注意,这里的实际类型是FencedAkkaRpcActor),因为传入的类型为RemoteFencedMessage,这里接着调用了FencedAkkaRpcActor.handleRpcMessage。通过几个判断后,这里调用了AkkaRpcActor.handleRpcMessage,此时,这里的入参为RemoteFencedMessage.getPayload,也就是RemoteRpcInvocation。接下来的流程我在上面已经提到,这里就不赘述了。所不一样的是,咱们这里的返回为类型为CompletableFuture,所以,这里接着会调用AkkaRpcActor.sendAsyncResponse。这里首先调用了方法——Patterns.pipe(promise.future(), getContext().dispatcher()).to(sender),这里的promise是scala中的Promise.DefaultPromise类型,该方法的做用其实就是讲java中的CompletableFuture转换为scala中的类型DefaultPromise,毕竟,java中的CompletableFuture类型没法实现rpc。sendAsyncResponse方法的做用就是,当入参asyncResponse完成后,会调用Promise.DefaultPromise的相应方法(success或failure)被调用。此时,因为Patterns.pipe(promise.future(), getContext().dispatcher()).to(sender)已经被调用,所以,taskmanager端调用Patterns.ask方法的返回的future为完成状态,也就是调用了其onComplete。接着,在taskmanager端将返回值反序列化,完成异步rpc的调用。
  3.接着是akka的最后通讯方式——阻塞返回。在flink中的对应的方法是AkkaRpcActor.sendSyncResponse(这里在flink中不多用到,所以我这里并无举例)。
  这里rpc调用方法的返回值为非CompletableFuture类型,前面的调用流程与上面讲述的异步返回同样,所不一样的是,因为方法返回值类型为非CompletableFuture,所以,这里调用了CompletableFuture.get,这里会一直阻塞,直待该CompletableFuture的完成。这里的CompletableFuture其实就是经过FutureUtils.toJava实现了将scala中的future转换为java中的CompletableFuture。也就是说,这里会一直等到远程方法Promise.DefaultPromise的相应方法(success或failure)被调用,这里的阻塞才会被打断。
  好了,到这里为止,关于flink中应用akka完成其rpc通讯框架的流程就结束了,感谢你们的关注。