Livy探究(六) -- RPC的实现

Livy基于netty构建了一个RPC通讯层。本篇咱们来探究一下Livy的RPC层的实现细节。读者应当具有netty编程的基础知识。web

RPC相关的代码主要在rsc目录和org.apache.livy.rsc包中。apache

KryoMessageCodec

Kryo是一种对象序列化和反序列化工具。通讯双方须要互相发送消息,livy选择了Kryo做为消息的编解码器,并在netty框架中实现编码和解码接口:编程

class KryoMessageCodec extends ByteToMessageCodec<Object> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {...}
    
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) {...}
}

当请求消息到来时,netty首先会调用decode对消息进行解码;当消息要发送到对端的最后关头,netty会调用encode对消息进行编码。缓存

SaslHandler

livy的rpc通讯支持基于sasl的认证。因此在livy的rpc实现中,有一个叫SaslHandlerSimpleChannelInboundHandler。在正式通讯前,客户端和服务端须要通过一次认证的过程。这里不罗列代码,可是将认证的过程作一个分析。回顾一下第三篇中核心架构细节部分的时序图,一个session的建立过程为:livyServer启动一个RpcServer1和一个SparkSubmit(提交driver)。这时有个细节是,livyServer会生成一个clientId,记录在内存中,并把clientId经过配置文件传给driver。driver启动后要链接RpcServer1,就要带上这个clientId。livy经过SaslMessage消息来封装clientIdsession

static class SaslMessage {
  final String clientId;
  final byte[] payload;
  SaslMessage() {
    this(null, null);
  }
  SaslMessage(byte[] payload) {
    this(null, payload);
  }
  SaslMessage(String clientId, byte[] payload) {
    this.clientId = clientId;
    this.payload = payload;
  }
}

driver会先发送SaslMessage给RpcServer1,livyServer收到后,从本身内存中寻找是否存在SaslMessage.clientId,若是存在就算认证经过了。driver接下来才得以进一步发送其余消息。架构

因此,一个rpc信道的创建分为未认证阶段和认证完成阶段。livy是基于netty实现的通讯层,咱们知道netty是经过添加pipeline的方式添加处理环节的。在服务端完成bind,或者客户端完成connect后的pipeline是这样的:框架

image.png

客户端经过发送hello发起"认证"(认证的逻辑上面提到了)。认证完成后,SaslHandler会从pipeline中移除,并添加新的业务handler,称为RpcDispatcherRpcDispatcher根据功能不一样有不一样的实现。下面的代码片断中,SaslHandler将自身从netty的pipeline中移除:异步

abstract class SaslHandler extends SimpleChannelInboundHandler<Rpc.SaslMessage> {
...
    @Override
    protected final void channelRead0(ChannelHandlerContext ctx, Rpc.SaslMessage msg)
    throws Exception {
        LOG.debug("Handling SASL challenge message...");
        ...
        // If negotiation is complete, remove this handler from the pipeline, and register it with
        // the Kryo instance to handle encryption if needed. 
        ctx.channel().pipeline().remove(this);
        ...
    }
...
}

下面的代码片断,在netty中添加须要的RpcDispatcheride

void setDispatcher(RpcDispatcher dispatcher) {
  Utils.checkNotNull(dispatcher);
  Utils.checkState(this.dispatcher == null, "Dispatcher already set.");
  this.dispatcher = dispatcher;
  channel.pipeline().addLast("dispatcher", dispatcher);
  dispatcher.registerRpc(channel, this);
}

RpcDispatcher

RpcDispatcher顾名思义是一种处理请求的分发器,负责把请求分发给合适的处理函数处理。在livy中只要是从链路中收到的消息都由RpcDispatcher分发和处理。函数

消息分为CALLREPLYERROR三类,从源码的MessageHeader看得出来:

static enum MessageType {
  CALL,
  REPLY,
  ERROR;
}
static class MessageHeader {
  final long id;
 final MessageType type;
 MessageHeader() {
    this(-1, null);
 }
  MessageHeader(long id, MessageType type) {
    this.id = id;
 this.type = type;
 }
}

MessageHeader中包含请求id和请求type。

发起RPC请求一方,会将请求暂存在rpcCalls缓存中,应答方会返回REPLY或者ERROR。请求方的RpcDispatcher此时处理REPLYERROR的时候,从rpcCalls中找到匹配的Promise,并激活。下面的流程展现了这个过程:

image.png

上述利用Promise实现了一种典型的异步请求框架

对于CALL类型的消息,RpcDispatcher采用反射的方式,实现真正的分发动做,与许多web框架的作法十分类似。

image.png

在第五篇"解释器的实现"中,提到的ReplDriver就是一种RpcDispatcher,回顾一下其中的handle方法:

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplJobRequest): Int = {
    ...
}

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.CancelReplJobRequest): Unit = {
    ...
}

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplCompleteRequest): Array[String] = {
    ...
}

def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.GetReplJobResults): ReplJobResults = {
    ...
}

综上所述,经过反射,RpcDispatcher将消息分发给对应的handle方法处理。

在livy中包含以下几种RpcDispatcher

  • RSCDriver,处理通用Job类消息。在driver侧使用
  • ReplDriver,继承自RSCDriver,处理ReplJob类消息,在driver侧使用
  • RegistrationHandler,只处理RemoteDriverAddress消息。是livyServer在启动driver后,为了可以接收driver反向发送过来的RemoteDriverAddress

总结

本篇从源码角度,剖析了livy中rpc通讯的关键部分。livy采用kryo作编解码;在通讯初期采用sasl进行认证和握手;完成认证后,采用反射实现了一套请求分发机制。此外,livy大量采用netty框架提供的Promise,提供了一种异步RPC机制,也值得学习和借鉴。

相关文章
相关标签/搜索