Livy基于netty构建了一个RPC通讯层。本篇咱们来探究一下Livy的RPC层的实现细节。读者应当具有netty编程的基础知识。web
RPC相关的代码主要在rsc目录和org.apache.livy.rsc包中。apache
Kryo
是一种对象序列化和反序列化工具。通讯双方须要互相发送消息,livy选择了Kryo做为消息的编解码器,并在netty框架中实现编码和解码接口:编程
class KryoMessageCodec extends ByteToMessageCodec<Object> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {...} @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) {...} }
当请求消息到来时,netty首先会调用decode对消息进行解码;当消息要发送到对端的最后关头,netty会调用encode对消息进行编码。缓存
livy的rpc通讯支持基于sasl的认证。因此在livy的rpc实现中,有一个叫SaslHandler
的SimpleChannelInboundHandler
。在正式通讯前,客户端和服务端须要通过一次认证的过程。这里不罗列代码,可是将认证的过程作一个分析。回顾一下第三篇中核心架构细节部分的时序图,一个session的建立过程为:livyServer启动一个RpcServer1
和一个SparkSubmit(提交driver)。这时有个细节是,livyServer会生成一个clientId
,记录在内存中,并把clientId经过配置文件传给driver。driver启动后要链接RpcServer1
,就要带上这个clientId。livy经过SaslMessage
消息来封装clientId
:session
static class SaslMessage { final String clientId; final byte[] payload; SaslMessage() { this(null, null); } SaslMessage(byte[] payload) { this(null, payload); } SaslMessage(String clientId, byte[] payload) { this.clientId = clientId; this.payload = payload; } }
driver会先发送SaslMessage给RpcServer1
,livyServer收到后,从本身内存中寻找是否存在SaslMessage.clientId
,若是存在就算认证经过了。driver接下来才得以进一步发送其余消息。架构
因此,一个rpc信道的创建分为未认证阶段和认证完成阶段。livy是基于netty实现的通讯层,咱们知道netty是经过添加pipeline的方式添加处理环节的。在服务端完成bind,或者客户端完成connect后的pipeline是这样的:框架
客户端经过发送hello
发起"认证"(认证的逻辑上面提到了)。认证完成后,SaslHandler
会从pipeline中移除
,并添加新的业务handler
,称为RpcDispatcher
。RpcDispatcher
根据功能不一样有不一样的实现。下面的代码片断中,SaslHandler
将自身从netty的pipeline中移除:异步
abstract class SaslHandler extends SimpleChannelInboundHandler<Rpc.SaslMessage> { ... @Override protected final void channelRead0(ChannelHandlerContext ctx, Rpc.SaslMessage msg) throws Exception { LOG.debug("Handling SASL challenge message..."); ... // If negotiation is complete, remove this handler from the pipeline, and register it with // the Kryo instance to handle encryption if needed. ctx.channel().pipeline().remove(this); ... } ... }
下面的代码片断,在netty中添加须要的RpcDispatcher
:ide
void setDispatcher(RpcDispatcher dispatcher) { Utils.checkNotNull(dispatcher); Utils.checkState(this.dispatcher == null, "Dispatcher already set."); this.dispatcher = dispatcher; channel.pipeline().addLast("dispatcher", dispatcher); dispatcher.registerRpc(channel, this); }
RpcDispatcher
顾名思义是一种处理请求的分发器,负责把请求分发给合适的处理函数处理。在livy中只要是从链路中收到的消息都由RpcDispatcher
分发和处理。函数
消息分为CALL
,REPLY
,ERROR
三类,从源码的MessageHeader
看得出来:
static enum MessageType { CALL, REPLY, ERROR; } static class MessageHeader { final long id; final MessageType type; MessageHeader() { this(-1, null); } MessageHeader(long id, MessageType type) { this.id = id; this.type = type; } }
MessageHeader中包含请求id和请求type。
发起RPC请求一方,会将请求暂存在rpcCalls
缓存中,应答方会返回REPLY
或者ERROR
。请求方的RpcDispatcher
此时处理REPLY
,ERROR
的时候,从rpcCalls
中找到匹配的Promise
,并激活。下面的流程展现了这个过程:
上述利用Promise实现了一种典型的异步请求框架
对于CALL
类型的消息,RpcDispatcher
采用反射的方式,实现真正的分发动做,与许多web框架的作法十分类似。
在第五篇"解释器的实现"中,提到的ReplDriver
就是一种RpcDispatcher
,回顾一下其中的handle方法:
def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplJobRequest): Int = { ... } def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.CancelReplJobRequest): Unit = { ... } def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplCompleteRequest): Array[String] = { ... } def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.GetReplJobResults): ReplJobResults = { ... }
综上所述,经过反射,RpcDispatcher
将消息分发给对应的handle
方法处理。
在livy中包含以下几种RpcDispatcher
:
RSCDriver
,处理通用Job
类消息。在driver侧使用ReplDriver
,继承自RSCDriver
,处理ReplJob
类消息,在driver侧使用RegistrationHandler
,只处理RemoteDriverAddress
消息。是livyServer在启动driver后,为了可以接收driver反向发送过来的RemoteDriverAddress
。本篇从源码角度,剖析了livy中rpc通讯的关键部分。livy采用kryo作编解码;在通讯初期采用sasl进行认证和握手;完成认证后,采用反射实现了一套请求分发机制。此外,livy大量采用netty框架提供的Promise,提供了一种异步RPC机制,也值得学习和借鉴。