前情提要:html
上一篇咱们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Reactor模型以及一些经常使用的类。这一篇咱们仍是用上一篇的例子,从代码的角度讲述Spark RPC的运行时序,从而揭露Spark RPC框架的运行原理。咱们主要将分红两部分来说,分别从服务端的角度和客户端的角度深度解析。java
不过源码解析部分都是比较枯燥的,Spark RPC这里也是同样,其中不少东西都是绕来绕去,墙裂建议使用上一篇中介绍到的那个Spark RPC项目,下载下来并运行,经过断点的方式来一步一步看,结合本篇文章,你应该会有更大的收获。程序员
PS:所用spark版本:spark2.1.0算法
咱们将以上一篇HelloworldServer为线索,深刻到Spark RPC框架内部的源码中,来看看启动一个服务时都作了些什么。编程
由于代码部分都是比较绕的,每一个类也常常会搞不清楚,我在介绍一个方法的源码时,一般都会将类名也一并写出来,这样应该会更加清晰一些。bootstrap
HelloworldServer{ ...... def main(args: Array[String]): Unit = { //val host = args(0) val host = "localhost" val config = RpcEnvServerConfig(new RpcConf(), "hello-server", host, 52345) val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config) val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv) rpcEnv.setupEndpoint("hello-service", helloEndpoint) rpcEnv.awaitTermination() } ...... }
这段代码中有两个主要流程,咱们分别来讲promise
首先是下面这条代码的运行流程:网络
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)多线程
其实就是经过 NettyRpcEnvFactory 建立出一个 RPC Environment ,其具体类是 NettyRpcEnv 。并发
咱们再来看看建立过程当中会发生什么。
object NettyRpcEnvFactory extends RpcEnvFactory { ...... def create(config: RpcEnvConfig): RpcEnv = { val conf = config.conf // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance val javaSerializerInstance = new JavaSerializer(conf).newInstance().asInstanceOf[JavaSerializerInstance] //根据配置以及地址,new 一个 NettyRpcEnv , val nettyEnv = new NettyRpcEnv(conf, javaSerializerInstance, config.bindAddress) //若是是服务端建立的,那么会启动服务。服务端和客户端都会经过这个方法建立一个 NettyRpcEnv ,但区别就在这里了。 if (!config.clientMode) { val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => //启动服务的方法,下一步就是调用这个方法了 nettyEnv.startServer(config.bindAddress, actualPort) (nettyEnv, nettyEnv.address.port) } try { Utils.startServiceOnPort(config.port, startNettyRpcEnv, conf, config.name)._1 } catch { case NonFatal(e) => nettyEnv.shutdown() throw e } } nettyEnv } ...... }
还没完,若是是服务端调用这段代码,那么主要的功能是建立RPCEnv,即NettyRpcEnv(客户端在后面说)。以及经过下面这行代码,
nettyEnv.startServer(config.bindAddress, actualPort)
去调用相应的方法启动服务端的服务。下面进入到这个方法中去看看。
class NettyRpcEnv( val conf: RpcConf, javaSerializerInstance: JavaSerializerInstance, host: String) extends RpcEnv(conf) { ...... def startServer(bindAddress: String, port: Int): Unit = { // here disable security val bootstraps: java.util.List[TransportServerBootstrap] = java.util.Collections.emptyList() //TransportContext 属于 spark.network 中的部分,负责 RPC 消息在网络中的传输 server = transportContext.createServer(bindAddress, port, bootstraps) //在每一个 RpcEndpoint 注册的时候都会注册一个默认的 RpcEndpointVerifier,它的做用是客户端调用的时候先用它来询问 Endpoint 是否存在。 dispatcher.registerRpcEndpoint( RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher)) } ...... }
执行完毕以后这个create方法就结束。这个流程主要就是开启一些服务,而后返回一个新的NettyRpcEnv。
这条代码会去调用NettyRpcEnv中相应的方法
class NettyRpcEnv( val conf: RpcConf, javaSerializerInstance: JavaSerializerInstance, host: String) extends RpcEnv(conf) { ...... override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { dispatcher.registerRpcEndpoint(name, endpoint) } ...... }
咱们看到,这个方法主要是调用dispatcher进行注册的。dispatcher的功能上一节已经说了,
Dispatcher的主要做用是保存注册的RpcEndpoint、分发相应的Message到RpcEndPoint中进行处理。Dispatcher便是上图中ThreadPool的角色。它同时也维系一个threadpool,用来处理每次接受到的 InboxMessage。而这里处理InboxMessage是经过inbox实现的。
这里咱们就说一说dispatcher的流程。
dispatcher在NettyRpcEnv被建立的时候建立出来。
class NettyRpcEnv( val conf: RpcConf, javaSerializerInstance: JavaSerializerInstance, host: String) extends RpcEnv(conf) { ...... //初始化时建立 dispatcher private val dispatcher: Dispatcher = new Dispatcher(this) ...... }
dispatcher类被建立的时候也有几个属性须要注意:
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) { ...... //每一个 RpcEndpoint 其实都会被整合成一个 EndpointData 。而且每一个 RpcEndpoint 都会有一个 inbox。 private class EndpointData( val name: String, val endpoint: RpcEndpoint, val ref: NettyRpcEndpointRef) { val inbox = new Inbox(ref, endpoint) } //一个阻塞队列,当有 RpcEndpoint 相关请求(InboxMessage)的时候,就会将请求塞到这个队列中,而后被线程池处理。 private val receivers = new LinkedBlockingQueue[EndpointData] //初始化便建立出来的线程池,当上面的 receivers 队列中没内容时,会阻塞。当有 RpcEndpoint 相关请求(即 InboxMessage )的时候就会马上执行。 //这里处理 InboxMessage 本质上是调用相应 RpcEndpoint 的 inbox 去处理。 private val threadpool: ThreadPoolExecutor = { val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads", math.max(2, Runtime.getRuntime.availableProcessors())) val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop") for (i <- 0 until numThreads) { pool.execute(new MessageLoop) } pool } ...... }
了解一些Dispatcher的逻辑流程后,咱们来正式看看Dispatcher的registerRpcEndpoint方法。
顾名思义,这个方法就是将RpcEndpoint注册到Dispatcher中去。当有Message到来的时候,便会分发Message到相应的RpcEndPoint中进行处理。
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) { ...... def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = { val addr = RpcEndpointAddress(nettyEnv.address, name) //注册 RpcEndpoint 时须要的是 上面的 EndpointData ,其中就包含 endpointRef ,这个主要是供客户端使用的。 val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv) //多线程环境下,注册一个 RpcEndpoint 须要判断如今是否处于 stop 状态。 synchronized { if (stopped) { throw new IllegalStateException("RpcEnv has been stopped") } //新建 EndpointData 并存储到一个 ConcurrentMap 中。 if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) { throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name") } val data = endpoints.get(name) endpointRefs.put(data.endpoint, data.ref) //将 这个 EndpointData 加入到 receivers 队列中,此时 dispatcher 中的 threadpool 会去处理这个加进来的 EndpointData //处理过程是调用它的 inbox 的 process()方法。而后 inbox 会等待消息到来。 receivers.offer(data) // for the OnStart message } endpointRef } ...... }
Spark RPC服务端逻辑小结:咱们说明了Spark RPC服务端启动的逻辑流程,分为两个部分,第一个是RPC env,即NettyRpcEnv的建立过程,第二个则是RpcEndpoint注册到dispatcher的流程。
1. NettyRpcEnvFactory 建立 NettyRpcEnv
2. Dispatcher注册RpcEndpoint
依旧是以上一节 HelloWorld 的客户端为线索,咱们来逐层深刻在 RPC 中,客户端 HelloworldClient 的 asyncCall() 方法。
object HelloworldClient { ...... def asyncCall() = { val rpcConf = new RpcConf() val config = RpcEnvClientConfig(rpcConf, "hello-client") val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config) val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service") val future: Future[String] = endPointRef.ask[String](SayHi("neo")) future.onComplete { case scala.util.Success(value) => println(s"Got the result = $value") case scala.util.Failure(e) => println(s"Got error: $e") } Await.result(future, Duration.apply("30s")) rpcEnv.shutdown() } ...... }
建立Spark RPC客户端Env(即NettyRpcEnvFactory)部分和Spark RPC服务端是同样的,只是不会开启监听服务,这里就不详细展开。
咱们从这一句开始看,这也是Spark RPC客户端和服务端区别的地方所在。
val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
上面的的setupEndpointRef最终会去调用下面setupEndpointRef()这个方法,这个方法中又进行一次跳转,跳转去setupEndpointRefByURI这个方法中。须要注意的是这两个方法都是RpcEnv里面的,而RpcEnv是抽象类,它里面只实现部分方法,而NettyRpcEnv继承了它,实现了所有方法。
abstract class RpcEnv(conf: RpcConf) { ...... def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = { //会跳转去调用下面的方法 setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString) } def setupEndpointRefByURI(uri: String): RpcEndpointRef = { //其中 asyncSetupEndpointRefByURI() 返回的是 Future[RpcEndpointRef]。 这里就是阻塞,等待返回一个 RpcEndpointRef。 // defaultLookupTimeout.awaitResult 底层调用 Await.result 阻塞 直到结果返回或返回异常 defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri)) } ...... }
这里最主要的代码其实就一句,
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
这一段能够分为两部分,第一部分的defaultLookupTimeout.awaitResult其实底层是调用Await.result阻塞等待一个异步操做,直到结果返回。
而asyncSetupEndpointRefByURI(uri)则是根据给定的uri去返回一个RpcEndpointRef,它是在NettyRpcEnv中实现的:
class NettyRpcEnv( val conf: RpcConf, javaSerializerInstance: JavaSerializerInstance, host: String) extends RpcEnv(conf) { ...... def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = { //获取地址 val addr = RpcEndpointAddress(uri) //根据地址等信息新建一个 NettyRpcEndpointRef 。 val RpcendpointRef = new NettyRpcEndpointRef(conf, addr, this) //每一个新建的 RpcendpointRef 都有先有一个对应的verifier 去检查服务端存不存在对应的 Rpcendpoint 。 val verifier = new NettyRpcEndpointRef( conf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this) //向服务端发送请求判断是否存在对应的 Rpcendpoint。 verifier.ask[Boolean](RpcEndpointVerifier.createCheckExistence(endpointRef.name)).flatMap { find => if (find) { Future.successful(endpointRef) } else { Future.failed(new RpcEndpointNotFoundException(uri)) } }(ThreadUtils.sameThread) } ...... }
asyncSetupEndpointRefByURI()这个方法实现两个功能,第一个就是新建一个RpcEndpointRef。第二个是新建一个verifier,这个verifier的做用就是先给服务端发送一个请求判断是否存在RpcEndpointRef对应的RpcEndpoint。
这段代码中最重要的就是verifiter.ask[Boolean](...)了。若是有找到以后就会调用Future.successful这个方法,反之则会经过Future.failed抛出一个异常。
ask能够算是比较核心的一个方法,咱们能够到ask方法中去看看。
class NettyRpcEnv{ ...... private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = { val promise = Promise[Any]() val remoteAddr = message.receiver.address // def onFailure(e: Throwable): Unit = { // println("555"); if (!promise.tryFailure(e)) { log.warn(s"Ignored failure: $e") } } def onSuccess(reply: Any): Unit = reply match { case RpcFailure(e) => onFailure(e) case rpcReply => println("666"); if (!promise.trySuccess(rpcReply)) { log.warn(s"Ignored message: $reply") } } try { if (remoteAddr == address) { val p = Promise[Any]() p.future.onComplete { case Success(response) => onSuccess(response) case Failure(e) => onFailure(e) }(ThreadUtils.sameThread) dispatcher.postLocalMessage(message, p) } else { //跳转到这里执行 //封装一个 RpcOutboxMessage ,同时 onSuccess 方法也是在这里注册的。 val rpcMessage = RpcOutboxMessage(serialize(message), onFailure, (client, response) => onSuccess(deserialize[Any](client, response))) postToOutbox(message.receiver, rpcMessage) promise.future.onFailure { case _: TimeoutException => println("111");rpcMessage.onTimeout() // case _ => println("222"); }(ThreadUtils.sameThread) } val timeoutCancelable = timeoutScheduler.schedule(new Runnable { override def run(): Unit = { // println("333"); onFailure(new TimeoutException(s"Cannot receive any reply in ${timeout.duration}")) } }, timeout.duration.toNanos, TimeUnit.NANOSECONDS) //promise 对应的 future onComplete时会去调用,但当 successful 的时候,上面的 run 并不会被调用。 promise.future.onComplete { v => // println("4444"); timeoutCancelable.cancel(true) }(ThreadUtils.sameThread) } catch { case NonFatal(e) => onFailure(e) } promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread) } ...... }
这里涉及到使用一些scala多线程的高级用法,包括Promise和Future。若是想要对这些有更加深刻的了解,能够参考这篇文章。
这个函数的做用从名字中就能够看得出,其实就是将要发送的消息封装成一个RpcOutboxMessage,而后交给OutBox去发送,OutBox和前面所说的InBox对应,对应Actor模型中的MailBox(信箱)。用于发送和接收消息。
其中使用到了Future和Promise进行异步并发以及错误处理,好比当发送时间超时的时候Promise就会返回一个TimeoutException,而咱们就能够设置本身的onFailure函数去处理这些异常。
OK,注册完RpcEndpointRef后咱们即可以用它来向服务端发送消息了,而其实RpcEndpointRef发送消息仍是调用ask方法,就是上面的那个ask方法。上面也有介绍,本质上就是经过OutBox进行处理。
咱们来梳理一下RPC的客户端的发送流程。
客户端逻辑小结:客户端和服务端比较相似,都是须要建立一个NettyRpcEnv。不一样的是接下来客户端建立的是RpcEndpointRef,并用之向服务端对应的RpcEndpoint发送消息。
1.NettyRpcEnvFactory建立NettyRpcEnv
2. 建立RpcEndpointRef
3. RpcEndpointRef使用同步或者异步的方式发送请求。
OK,以上就是SparkRPC时序的源码分析。下一篇会将一个实际的例子,Spark的心跳机制和代码。喜欢的话就关注一波吧
推荐阅读 :
从分治算法到 MapReduce
Actor并发编程模型浅析
大数据存储的进化史 --从 RAID 到 Hadoop Hdfs
一个故事告诉你什么才是好的程序员