history job的写入
1. org.apache.flink.runtime.jobmanager,Object JobManager
runJobManager中指定使用MemoryArchivist进行做业保存
startJobManagerActors中建立了进行做业保存的actor
此archive的actor会被传入jobmanager的actorweb
2. org.apache.flink.runtime.jobmanager,Class JobManager
handleMessage中接收到JobStatusChanged的msg以后会根据逻辑判断调用removeJob
接收到RemoveJob消息后,会调用removeJob
接收到RemoveCachedJob的时候,会调用removeJob
在SubmitJob的时候若是发现没有leader,会调用removeJob
3.MemoryArchivist
handleMessage中的 调用进行持久化的函数
archiveJsonFiles中的 传入路径path和执行图graph调用FsJobArchivist进行持久化apache
4.FsJobArchivist
archiveJob(Path rootPath, AccessExecutionGraph graph)
rootPath是配置的路径
graph是做业的执行图
archiveJob中首先调用WebMonitorUtils.getJsonArchivists()获取持久化的json类型,实际调用的是WebRuntimeMonitor.getJsonArchivists
目前的类型包括
new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),//joboverviewjson
new JobPlanHandler.JobPlanJsonArchivist(),//jobs/:jobid/plan
new JobConfigHandler.JobConfigJsonArchivist(),//jobs/:jobid/config
new JobExceptionsHandler.JobExceptionsJsonArchivist(),//jobs/:jobid/exceptions
new JobDetailsHandler.JobDetailsJsonArchivist(),//jobs/:jobid,//jobs/:jobid/vertices
new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(),//jobs/:jobid/accumulatorsrestful
new CheckpointStatsHandler.CheckpointStatsJsonArchivist(),//jobs/:jobid/checkpoints
new CheckpointConfigHandler.CheckpointConfigJsonArchivist(),//jobs/:jobid/checkpoints/config
new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(),//jobs/:jobid/checkpoints/details/:checkpointid
new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist(),//jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid函数
new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist(),//jobs/:jobid/vertices/:vertexid
new SubtasksTimesHandler.SubtasksTimesJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasktimes
new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist(),//jobs/:jobid/vertices/:vertexid/taskmanagers
new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/accumulators
new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasks/accumulatorsthis
new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum,//jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt,
new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist(),//jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators线程
上面全部的archivist都继承于JsonArchivist
其中只有一个接口 Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException
其从graph中获取相应的信息 组装成ArchivedJson,ArchivedJson的定义以下
public ArchivedJson(String path, String json) {
this.path = Preconditions.checkNotNull(path);
this.json = Preconditions.checkNotNull(json);
}
其中path指定存储的位置,json指定存储的内容rest
若是要新定义restful接口,则能够在上面增长JsonArchivist类型
若是只是要在已有的restful接口中增长字段,则能够修改上述的类型server
5.上述流程走完以后,每一个job会在hdfs上生成一个json文件,包含各类路径、指明对应的维度对象
History Job的读取
org.apache.flink.runtime.webmonitor.history
1.HistoryServer,负责历史做业的存储和展现,包含一个HistoryServerArchiveFetcher对象,此对象使用“刷新间隔,拉取路径,本地临时地址,”
2.HistoryServerArchiveFetcher根据指定的时间间隔,在单独的线程中调用JobArchiveFetcherTask获取的任务
3.JobArchiveFetcherTask是一个线程类,从指定的目录中不断的拉取数据,存入本地指定的路径;若是设置了每次拉取以后更新joboverview,则在拉取完毕以后进行joboverview的更新
4.org.apache.flink.runtime.history
调用FsJobArchivist中的Collection<ArchivedJson> getArchivedJsons(Path file)来获取数据,path指定存储的位置,返回该位置的全部Json数据
5.上述流程完毕以后,会在本地临时目录每一个job建立一个目录,目录中有不少子目录,分门别类的保存了各类的json文件
文件保存
从上述的过程当中,在jobmanager写入文件的时候,是不考虑频繁读取的,因此写成了一个大文件,也符合hdfs的要求,可是在history server的保存中,如上的在hdfs中的一个文件被安装路径和维度被拆成了不少个json文件,也是为了在UI上便于展现。