
1. Job执行在Linkis处在什么位置微信
Job执行在Linkis的架构中处于统一做业执行服务(Unified Job Execution Services)中,以下图所示:
websocket

2. Job处理流程架构

2.1 Job封装并发
1. Clinet(如前端)发起请求执行Job,Job请求信息以下:
异步
2. GateWay收到Job后将Job经过executeApplicationName和requestApplicationName转发给对应的Entrance。socket
3. Entrance接受到对应的Job后,会调用JobHistory的RPC对Job的元信息进行持久化,并对前端的请求进行解析对自定义变量和参数进行封装,封装为能够执行的Job。svg
2.2 Job的执行高并发
1. 拿到解析后的Job后,Entrance会先为用户去请求一个计算引擎,在请求前先去ResourceManager找到负载最低的EngineManager(引擎管理器);
2. 接着会向负载低的EM请求启动Engine,若是是JDBC和MLSQL没有EM的服务,这步会在Entrance里面进行;
3. EM接收到启动新的Engine后,会先向RM(资源管理器)为这个用户申请资源启动Engine,若是资源足够则准备启动引擎;
4. EM根据申请到的资源启动新的Engine,引擎启动成功后会向EM推送信息,EM接着推送给Entrance,至此引擎的申请流程结束;
5. Entrance请求到Engine后,首先为该Job申请锁定该Engine以防止其余Entrance和Job提交到该Engine;
6. Entrance锁定Engine成功后,会将Job发送给Engine执行,至此Job开始实际执行
这里须要说明的是若是用户存在一个可用空闲的引擎,则会跳过1,2,3,4 四个步骤;
2.3 Job信息推送
1. Job执行过程当中为了客户端的用户能够实时看到job的运行情况,Engine会将Job的状态/进度/日志经过RPC实时推送给Entrance;
2. Entrance收到Job信息进行持久化处理,若是是websocket则将信息直接推送给用户;
3. Job执行完成后,Engine会将最终状态和结果集推送给Entrance,Entrance收到后将结果保存到结果集路径,接着更新JobHistory中的状态信息和结果集信息;
4. 客户端经过状态接口判断Job的成功与否,若是成功则能够经过调用接口请求JobHistory拿到结果集信息。至此整个Job则执行完成。
3 Linkis的Job执行源码解读
第三章对Job的处理流程进行了一个简单讲解,本章主要讲解在这一流程中Entrance,EngineManager,Engine的代码调用流程。
3.1.Entrance处理流程
Entrance是Job执行的入口,当Job从GateWay转发后会先到EntranceRestfulApi或者EntranceWebSocketService分别对应Rest和WebSocket请求的逻辑处理。下面咱们从WebSocket请求进行讲解,Rest请求相似;
1. 前端的WebSocket请求经过Gateway转发到ServerSocket类,继承了WebSocketAdapter,ServerSocket会调用ControllerServer的onMessage的方法将消息投递到serverListenerEventBus消息总线,serverListenerEventBus接着会将消息给到EntranceWebSocketService进行处理处理部分代码以下:
2. 执行请求处理逻辑:EntranceWebSocketService的dealExecute方法会调用entranceServer.execute去执行Job,entranceServer接着会对job进行解析封装而后提交给调度器。
3. Job执行流程:调度拿到Job后会为这个Job生成一个groupName,而后经过groupName去ConsumerManager获取一个Consumer(分组消费器),并将该Job传递给这个FIFOUserConsumer ,FIFOUserConsumer 接着会循环去BlockingLoopQueue里面取Job进行消费,拿到Job后FIFOUserConsumer 会请求一个引擎,接着将该Job提交给线程池运行。
Job线程运行起来后首先会将该Job请求提交给远程Engine进行执行,并拿到响应,为了提高Job的性能,这里通常会返回AsynReturnExecuteResponse(异步返回的请求响应) 用于将Job的状态和信息都异步返回回来,该线程能够直接执行完下降线程开销
4. Job信息推送:Job信息推送为Engine经过RPC推送给Entrance,Entrance再经过WebSocket推送给用户,或者用户经过Rest请求。
3.2 EngineManager
引擎管理器EngineManager是用来管理引擎的,用于对引擎的生命周期进行管理,当Entrance中的Consumer发起askExecutor时会将PRC请求发送给EngineManagerReceiver,
EM接收到消息会先向RM判断该用户和Creator是否还有足够的资源启动引擎,资源判断经过后才会发起引擎启动
Engine的正常退出流程分为两种一、用户发起kill命令,经过EM 杀掉Engine 二、Engine空闲时间过长自行kill,默认一个小时
用户调用kill后会EM会调用:engineManager.getEngineManagerContext.getOrCreateEngineFactory.delete
3.3.Engine处理流程
Engine启动成功后就能够接受Entrance的job请求,执行并推送Job信息给到Entrance,Entrance的job请求会发送到EngineReceiver
engine的Job执行调度流程和Entrance的相同,除了实现类不同都是经过Scheduler--Consumer--Executor--Executor执行Job,这里须要说明的是Executor对应的是相应Engine的具体执行代码的实现类不是Engnie,这是与Entrance有区别的地方,执行流程也主要在Executor进行实现的:
Job信息经过JobDemo定时进行推送:
3.4 Linkis的Job架构归纳
上面从入口、引擎管理器、引擎介绍了一个Job的总体执行流程,Job执行的总体的调用链能够总结为下图:

4. 总结
本文从Job在Linkis 0.X版本中所处的位置进行引入,介绍了一个Job从提交到封装、运行、信息推送的整个流程和源码解析。在Linkis1.0版本中咱们对Job执行作了多个优化:
1. 任务标签化:灵活的经过标签作租户隔离,智能路由对应的引擎服务,并支持经过标签指定提交的Hadoop集群等;
2. 任务全栈化:支持在交互做业的基础上,支持流式、一次性批量等做业类型;
3. 任务解析策略化:借鉴Calcite思想,对任务进行编排优化,支持更多的计算策略,智能调度执行;
4. 服务简化:统一Entrance和EngineManager服务,底层计算存储引擎只须要实现引擎插件(EngineConnPlugin)就能够完成新引擎的实现,不在须要实现Entrance和EngineManager服务。
Linkis1.0 新的架构对多个模块进行了架构调整和优化,敬请期待,同时Linkis1.0有多个模块正在开发实现当中,欢迎社区各位大佬的加入。
本文从Job在Linkis中所处的位置进行引入,介绍了一个Job从提交到封装、运行、信息推送的整个流程和源码解析。后续将为你们带来Linkis更多模块的代码解析,敬请期待。

扫码关注咱们
微信号公众号 : WeDataSphere
GitHub:WeDataSphere
若是喜欢咱们的产品或文章,请给咱们的GitHub点上你宝贵的star和fork哦~~
WeDataSphere,BIG DATA MADE EASY.
用心作一个有温度的开源社区
~欢迎关注~
欢迎加入咱们的有奖征文活动哦,详见以下连接~
同时诚挚的但愿您点开“阅读原文”,在OSC开源投票中,为Linkis与DataSphere Studio投上您宝贵的一票哦~~
本文分享自微信公众号 - WeDataSphere(gh_273e85fce73b)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。