上节,咱们加载了job的描述属性 + 正确性验证 !node
==============================================================数据库
stop in azkaban.project.DirectoryFlowLoader.resolveDependenciesjson
==============================================================post
这个也很简单,好比有个Hello.job,World.jobui
指定了world依赖Hellospa
则最后在ip
private HashMap<String, Map<String, Edge>> nodeDependencies;ci
这种结构里存的是什么呢?get
HashMap<it
String, ---World
Map<
String, ---Hello
Edge ---<Hello,World>
>
>
是否是很简单,没啥可说的!
接下来是
// Create the flows.
buildFlowsFromDependencies();
到目前为止,咱们装载了点+校验正确性+边。有了点和边,其实就有了图,有了图,咱们就能够构建Flow了
这也正是咱们下面要作的事情!
======================================================================================
细节略
======================================================================================
最后就是生成报告,只要写的没问题,都经过!
而后就是数据库的操做,这是咱们须要关心的!
---
synchronized (project) {
int newVersion = projectLoader.getLatestProjectVersion(project) + 1;
Map<String, Flow> flows = loader.getFlowMap();
for (Flow flow : flows.values()) {
flow.setProjectId(project.getId());
flow.setVersion(newVersion);
}
logger.info("Uploading file to db " + archive.getName());
projectLoader.uploadProjectFile(project, newVersion, fileType, archive.getName(), archive,
uploader.getUserId());
logger.info("Uploading flow to db " + archive.getName());
projectLoader.uploadFlows(project, newVersion, flows.values());
logger.info("Changing project versions " + archive.getName());
projectLoader.changeProjectVersion(project, newVersion, uploader.getUserId());
project.setFlows(flows);
logger.info("Uploading Job properties");
projectLoader.uploadProjectProperties(project, new ArrayList<Props>(jobProps.values()));
logger.info("Uploading Props properties");
projectLoader.uploadProjectProperties(project, propProps);
}
logger.info("Uploaded project files. Cleaning up temp files.");
projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
"Uploaded project files zip " + archive.getName());
try {
FileUtils.deleteDirectory(file);
} catch (IOException e) {
file.deleteOnExit();
e.printStackTrace();
}
logger.info("Cleaning up old install files older than " + (project.getVersion() - projectVersionRetention));
projectLoader.cleanOlderProjectVersion(project.getId(), project.getVersion() - projectVersionRetention);
return reports;
}
首先是查询出最大版本号,+1最为最新值。
---
1)上传文件,分段,10M为1个单位,相关表---Project-files
2) project_versions
3)project_flows :记录根据file分析出来的拓扑图信息
好比一个可能的拓扑图以下:
json = "{"metadata":{},"project.id":9,"nodes":[{"layout":{"level":0},"propSource":null,"jobSource":"Hello.job","expectedRuntime":1,"id":"Hello","jobType":"command"},{"layout":{"level":1},"propSource":null,"jobSource":"World.job","expectedRuntime":1,"id":"World","jobType":"command"}],"edges":[{"source":"Hello","target":"World"}],"failure.email":[],"success.email":[],"id":"World","type":"flow","version":11,"mailCreator":"default","props":[],"layedout":false}"