Livy探究(七) -- 编程接口分析

在前面的篇章中,咱们把重点放在livy的REPL功能的展现和源码分析。这篇咱们探索一下livyUsing the Programmatic API功能。html

REPL不一样的是,Programmatic API提供了在一个“已经存在”的SparkContext上执行处理程序的机制。用户须要实现Job接口:apache

public interface Job<T> extends Serializable {
  T call(JobContext jc) throws Exception;
}

JobContext对象能够访问到SparkContextSQLContext,因此,用户不用关心SparkContext的建立。api

当用户编写了实现Job接口的类后,打包成jar包,就能够使用LivyClient上传jar包、提交Job了。session

这里作个补充 livyUrl是指 http://host:port/sessions/xxxx,即为对应session的RESTful地址。livy的例子文档中,没有写清楚的是 livyUrl是什么。

livy服务端,会在对应session的SparkContext上运行Job。框架

源码分析

客户端代码

LivyClientLivyClientFactory在livy源码中是接口,而实现是HttpClientFactorydom

/**
 * Factory for HTTP Livy clients. */
public final class HttpClientFactory implements LivyClientFactory {
    @Override
    public LivyClient createClient(URI uri, Properties config) {
        if (!"http".equals(uri.getScheme()) && !"https".equals(uri.getScheme())) {
            return null;
        }
        return new HttpClient(uri, new HttpConf(config));
    }
}

LivyClientBuilder使用ServiceLoader加载LivyClientFactory的实现类。ide

HttpClient会向以下几个接口地址请求:源码分析

  • /sessions/%d/upload-jar
  • /sessions/%d/add-jar
  • /sessions/%d/upload-file
  • /sessions/%d/add-file
  • /sessions/%d/submit-job
  • /sessions/%d/run-job

例如,当调用sumbit时,会请求/sessions/%d/submit-job。咱们重点看submit-job,最终会调用sendJobui

// command就是submit-job或run-job
private <T> JobHandleImpl<T> sendJob(final String command, Job<T> job) {
    final ByteBuffer serializedJob = serializer.serialize(job);
    JobHandleImpl<T> handle = new JobHandleImpl<T>(config, conn, sessionId, executor, serializer);
    handle.start(command, serializedJob);
    return handle;
}

这里看到,对job类的实例进行了序列化。这里用的序列化方式就是前面篇章中提到的kryo。序列化最终会把类转化成byte[]。稍后在服务端,能够看到会使用反序列化恢复Job类。spa

服务端代码

客户端的http请求首先会到达livyServer,入口主要在InteractiveSessionServlet

image.png

上面的代码接收到客户端请求,选择到对应的session,并调用session的submitJob

在第三篇中,咱们知道livyServer中的一个session本质上对应了一个正在运行的driver程序。而且session会链接到driver,建立出一条链路,进而与driver通讯。

上述submitJob最终会经过这条链路,向driver发送消息,那么发送的具体是什么消息呢?经过下面代码能够看到,其实发送的是BypassJobRequest,而且这里尚未对Job进行反序列化

String jobId = UUID.randomUUID().toString();
Object msg = new BypassJobRequest(jobId, jobType, BufferUtils.toByteArray(serializedJob), sync);
...
if (driverRpc.isSuccess()) {
  try {
    return driverRpc.get().call(msg, retType);
 } catch (Exception ie) {
    throw Utils.propagate(ie);
 }
}

根据这条线索,以及第六篇中关于RPC框架的结论。咱们在RSCDriver里面找到了对应的handle

image.png

进一步在BypassJob中看到,对序列化的job对象进行反序列化,并执行call的地方:

image.png

总结

本篇介绍了livyProgrammatic API,这部分官方文档目前比较少。所以,更多从源码角度作个分析,以便后续采坑。

下图总结了Job提交运行的基本流程:

image.png

  • Client端首先对Job对象采用kryo进行序列化,经过http接口调用到livyServer
  • livyServer封装请求为RPC消息BypassJobRequest,发送给对应的driver
  • driver侧对Job执行反序列化,并调用其call
相关文章
相关标签/搜索