Spark RPC框架源码分析(一)简述

一. Spark rpc框架概述

Spark是最近几年已经算是最为成功的大数据计算框架,那么此次咱们就来介绍它内部的一个小点,Spark RPC框架。html

在介绍以前,咱们须要先说明什么是RPC,引用百度百科:java

RPC(Remote Procedure Call)—远程过程调用,它是一种经过网络从远程计算机程序上请求服务,而不须要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通讯程序之间携带信息数据。react

Spark RPC能够说是Spark分布式集群的基础,如果将Spark类比为一我的的话,Spark RPC无疑就是它的血液部分。而在Spark1.6以前,它的RPC部分仍是用akka实现的,但以后底层就换成了netty来实现。为何要这样作呢?由于啊,这样将Spark和Akka耦合在了一块儿,若是你系统自己就有使用到Akka,而后又想使用Spark的话,那两个Akka框架版本不一致可怎么办呀,这无疑是很让人头痛的。Spark团队正是考虑到了这一点,因此将Akka替换成了netty。git

此次咱们就来看看Spark是如何让它的血液流动起来的吧。有一位大神将Spark RPC中的RPC部分剥离出来,弄成一个新的可运行的 RPC 项目,这个项目自己就能够看成一个简易的Akka来使用,地址在这Spark RPC程序员

虽然名字不同,但这个项目的类和内容基本和Spark Core中RPC部分的代码和结构基本是同样的,这样咱们就能够经过这个来学习Spark RPC框架。github

PS:所用spark版本:spark2.1.0算法

二.Spark RPC中的 Hello world

咱们程序员学东西最喜欢从一个Hello world开始,那么接下来咱们就来演示如何下载并运行最简单的Hello World例子吧。编程

首先,我使用的编译器是IDEA,经过idea将github上的代码clone下来。
能够看到项目目录下有两个模块,网络

  • kraps-rpc
  • kraps-rpc-example

kraps-rpc存放的是Spark RPC的源代码,而咱们要作的便是运行 kraps-rpc-example中的示例代码。架构

启动PRC的话首先须要启动Server端,开启监听服务,而后才能经过Client进行访问。这里在HelloworldServer.scala中都已经帮咱们写好,不过在main方法中须要修改一下内容,就是将host改成本机地址。

def main(args: Array[String]): Unit = {
//    val host = args(0)
    val host = "localhost"
    val config = RpcEnvServerConfig(new RpcConf(), "hello-server", host, 52345)
    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
    rpcEnv.setupEndpoint("hello-service", helloEndpoint)
    rpcEnv.awaitTermination()
  }

而后咱们只须要右键该文件而后执行便可。

接下来咱们就须要启动Client端代码,咱们先到HelloworldClient文件中,这里面提供了同步和异步两个方法能够运行。代码一样都已经写好,经过修改注释便可使用不一样的方法运行。一样是右键点击该文件执行。

def main(args: Array[String]): Unit = {
    //异步方法
    //asyncCall()
    //同步方法
    syncCall()
  }

异步方法中,ask会返回一个Future(注意这里的Future是scala中的Future,和java的是不同的)。而且在Future运行结果出来前,咱们能够去作其余事情(异步的优点所在)。scala中的Future和Java的Future有些不一样,不过这能够先不去管,先看成Java里面的Future便可。

def asyncCall() = {
    val rpcConf = new RpcConf()
    val config = RpcEnvClientConfig(rpcConf, "hello-client")
    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
    val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
    future.onComplete {
      case scala.util.Success(value) => println(s"Got the result = $value")
      case scala.util.Failure(e) => println(s"Got error: $e")
    }
    Await.result(future, Duration.apply("3s"))
    //在future结果运行出来前,会先打印这条语句。
    println("print me at first!")
    Thread.sleep(7)
  }

而同步方法是直接将结果返回,而且会阻塞,这个时间内你没法作其余事情,只能等待,直到结果返回

def syncCall() = {
    val rpcConf = new RpcConf()
    val config = RpcEnvClientConfig(rpcConf, "hello-client")
    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
    val result = endPointRef.askWithRetry[String](SayBye("neo"))
    println(result)

  }

很简单是吧,运行过例子后,咱们就能够来了解一些Spark RPC运行过程当中相当重要的两个编程模型,以及在这其中使用到的一些主要的类。

三.Spark RPC中的两个编程模型以及各个类

Spark RPC是使用了Actor模型和Reactor模型的混合模式,咱们结合两种模型分别说明Spark RPC中各个类的做用:

首先咱们先来看Spark RPC的类图。

Spark RPC 类图

是否是感受很乱?没事,咱们来逐步剖析各个类。

为了更加清楚了说明各个类的关系,咱们要先知道两个模型,分别是Actor模型和Reactor模型,咱们将从这两个模型的角度来拆解各个类的关系。

Actor模型

其实以前也有写过一篇介绍Actor模型的文章,感兴趣的同窗能够点击这里查看Actor模型浅析

其实Actor主要就是这副图的内容:
Actor并发编程模型
在Spark RPC中有几个类分别与Actor模型中的各个角色对应,对应以下,左边的是Spark RPC中的类,右边的是Actor模型中的角色:

RpcEndpoint => Actor

RpcEndpointRef => ActorRef

RpcEnv => ActorSystem

咱们逐个来看:

RpcEnv --RPC Environment

RPC Environment 是 RpcEndpoint 的运行环境。它管理 RpcEndpoint 的整个生命周期:

  1. 经过名字或 URI 注册 RpcEndpoint。
  2. 对到底的消息进行路由,决定分发给哪一个 RpcEndpoint。
  3. 中止 RpcEndpoint。

RPC Environment在akka已经被移除的2.0后面版本中,RPC Environment的实现类是NettyRpcEnv。一般是由NettyRpcEnvFactory.create建立。

RpcEndpoint

RpcEndpoint能经过callbacks接收消息。一般须要咱们本身写一个类继承RpcEndpoint。编写本身的接收信息和返回信息规则。

RpcEndpoint的生命周期被RPC Environment管理。其生命周期包括,onStart,receive和onStop。

它是做为服务端,好比上面例子中的HelloworldServer就是一个RpcEndpoint。

RpcEndpointRef

RpcEndpointRef是RpcEndpoint在RPC Environment中的一个引用。

它包含一个地址(即Spark URL)和名字。RpcEndpointRef做为客户端向服务端发送请求并接收返回信息,一般能够选择使用同步或异步的方式进行发送。

Reactor模型

Spark RPC采用Actor模型和Reactor模型混合的结构,上面已经介绍了Actor,那么如今咱们就来介绍Reactor模型,一样,咱们能够从一张图来看Reactor的架构。

Reactor模型

使用Reactor模型,由底层netty建立的EventLoop作I/O多路复用,这里使用Multiple Reactors这种形式,如上图所示,从netty的角度而言,Main Reactor和Sub Reactor对应BossGroup和WorkerGroup的概念,前者负责监听TCP链接、创建和断开,后者负责真正的I/O读写。

而图中的ThreadPool就是的Dispatcher中的线程池,它来解耦开来耗时的业务逻辑和I/O操做,这样就能够更scalabe,只须要少数的线程就能够处理成千上万的链接,这种思想是标准的分治策略,offload非I/O操做到另外的线程池。

Dispatcher

Dispatcher的主要做用是保存注册的RpcEndpoint、分发相应的Message到RpcEndPoint中进行处理。Dispatcher便是上图中ThreadPool的角色。它同时也维系一个threadpool,用来处理每次接受到的InboxMessage。而这里处理InboxMessage是经过inbox实现的。

Inbox

Inbox其实属于Actor模型,是Actor中的信箱,不过它和Dispatcher联系紧密因此放这边。

InboxMessage有多个实现它的类,好比OneWayMessage,RpcMessage,等等。Dispatcher会将接收到的InboxMessage分发到对应RpcEndpoint的Inbox中,而后Inbox便会处理这个InboxMessage。

OK,此次就先介绍到这里,下次咱们从代码的角度来看Spark RPC的运行机制

若是以为对你有帮助,不妨关注一波吧~~

参考资料:https://zhuanlan.zhihu.com/p/28893155


推荐阅读:
从分治算法到 MapReduce
Actor并发编程模型浅析
大数据存储的进化史 --从 RAID 到 Hadoop Hdfs
一个故事告诉你什么才是好的程序员

相关文章
相关标签/搜索