在前面的篇章中,咱们把重点放在livy的REPL
功能的展现和源码分析。这篇咱们探索一下livy
Using the Programmatic API功能。html
与REPL
不一样的是,Programmatic API
提供了在一个“已经存在”的SparkContext上执行处理程序的机制。用户须要实现Job
接口:apache
public interface Job<T> extends Serializable { T call(JobContext jc) throws Exception; }
JobContext
对象能够访问到SparkContext
和SQLContext
,因此,用户不用关心SparkContext
的建立。api
当用户编写了实现Job
接口的类后,打包成jar包,就能够使用LivyClient
上传jar包、提交Job了。session
这里作个补充livyUrl
是指http://host:port/sessions/xxxx
,即为对应session的RESTful地址。livy的例子文档中,没有写清楚的是livyUrl
是什么。
livy服务端,会在对应session的SparkContext上运行Job。框架
LivyClient
和LivyClientFactory
在livy源码中是接口,而实现是HttpClientFactory
:dom
/** * Factory for HTTP Livy clients. */ public final class HttpClientFactory implements LivyClientFactory { @Override public LivyClient createClient(URI uri, Properties config) { if (!"http".equals(uri.getScheme()) && !"https".equals(uri.getScheme())) { return null; } return new HttpClient(uri, new HttpConf(config)); } }
而LivyClientBuilder
使用ServiceLoader加载LivyClientFactory
的实现类。ide
HttpClient会向以下几个接口地址请求:源码分析
例如,当调用sumbit时,会请求/sessions/%d/submit-job
。咱们重点看submit-job
,最终会调用sendJob
:ui
// command就是submit-job或run-job private <T> JobHandleImpl<T> sendJob(final String command, Job<T> job) { final ByteBuffer serializedJob = serializer.serialize(job); JobHandleImpl<T> handle = new JobHandleImpl<T>(config, conn, sessionId, executor, serializer); handle.start(command, serializedJob); return handle; }
这里看到,对job类的实例进行了序列化
。这里用的序列化方式就是前面篇章中提到的kryo
。序列化最终会把类转化成byte[]
。稍后在服务端,能够看到会使用反序列化恢复Job类。spa
客户端的http请求首先会到达livyServer,入口主要在InteractiveSessionServlet
:
上面的代码接收到客户端请求,选择到对应的session,并调用session的submitJob
。
在第三篇中,咱们知道livyServer中的一个session本质上对应了一个正在运行的driver程序。而且session会链接到driver,建立出一条链路,进而与driver通讯。
上述submitJob最终会经过这条链路,向driver发送消息,那么发送的具体是什么消息呢?经过下面代码能够看到,其实发送的是BypassJobRequest
,而且这里尚未对Job进行反序列化
:
String jobId = UUID.randomUUID().toString(); Object msg = new BypassJobRequest(jobId, jobType, BufferUtils.toByteArray(serializedJob), sync); ... if (driverRpc.isSuccess()) { try { return driverRpc.get().call(msg, retType); } catch (Exception ie) { throw Utils.propagate(ie); } }
根据这条线索,以及第六篇中关于RPC框架的结论。咱们在RSCDriver
里面找到了对应的handle
:
进一步在BypassJob
中看到,对序列化的job对象进行反序列化,并执行call
的地方:
本篇介绍了livy
的Programmatic API,这部分官方文档目前比较少。所以,更多从源码角度作个分析,以便后续采坑。
下图总结了Job提交运行的基本流程:
BypassJobRequest
,发送给对应的drivercall