接着上一篇来讲执行入口的分析,CliDriver
最终将用户指令command
提交给了Driver
的run
方法(针对经常使用查询语句而言),在这里用户的command
将会被编译,优化并生成MapReduce任务进行执行。因此Driver
也是Hive的核心,他扮演了一个将用户查询和MapReduce Task转换并执行的角色,下面咱们就看看Hive是如何一步一步操做的。node
在说run
方法以前,因为CliDriver
须要获得一个Driver
类的实例,因此首先看一下Driver
的构造方法。Driver
有三个构造函数,主要功能也就是设置类的实例变量HiveConf
。SessionState
前文已经有介绍,SessionState
返回了当前会话的一些信息,提取配置文件,初始化Driver
实例。apache
public Driver() { if (SessionState.get() != null) { conf = SessionState.get().getConf(); } }
下面就开始解析Driver
内部对用户命令command
的处理流程,首先是入口函数run
. run
函数经过调用runInternal
方法处理用户指令,在处理完成runInternal
以后,若是执行过程当中出现出错,还附加了对错误码和错误信息的处理,此处省略。segmentfault
public CommandProcessorResponse run(String command) throws CommandNeedRetryException { return run(command, false); } public CommandProcessorResponse run(String command, boolean alreadyCompiled) throws CommandNeedRetryException { CommandProcessorResponse cpr = runInternal(command, alreadyCompiled); ... }
runInternal
方法包含的主要操做有,处理preRunHook
(具体功能能够顾名思义哦),compile
, execute
, 处理postRunHook
以及构造CommandProcessorResponse
并返回。下面依次从代码的角度分析这几步的具体操做:并发
处理preRunHook
,首先根据配置文件和指令,构造用户Hook执行的上下文hookContext
,而后读取用户PreRunHook
配置指定的类(字符串), 此配置项对应于Hive配置文件当中的“hive.exec.driver.run.hooks”
一项,利用反射机制Class.forName
实例化PreRunHook
类实例(getHook
函数完成),依次执行各钩子的功能(preDriverRun
函数完成)。app
HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf, command); // Get all the driver run hooks and pre-execute them. List<HiveDriverRunHook> driverRunHooks; try{ driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, HiveDriverRunHook.class); for (HiveDriverRunHook driverRunHook : driverRunHooks) { driverRunHook.preDriverRun(hookContext); } }catch (Exception e) { errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); SQLState = ErrorMsg.findSQLState(e.getMessage()); downstreamError = e; console.printError(errorMessage + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); return createProcessorResponse(12); }
编译,直接调用complieInternal
函数编译用户指令,将指令翻译成MapReduce
任务。这一个过程涉及的内容比较多,也很重要,后面将单独用一篇文章说明编译优化的过程。这里借用网上的一幅图,帮助对compile
的功能有个总体的理解,参考文献: Hive实现原理.pdf。ide
在运行以前还有获取锁的操做,因为新版本添加了ACID
事务的支持,还设置了事务管理器等,目前还没详细的弄懂这块的处理逻辑和功能,先放一下,主要看下execute
函数执行了什么操做,也就是如何根据编译结果执行任务的。函数
首先是从编译获得的查询计划QueryPlan
里获取基本的查询ID,查询字串等信息,并在回话状态中把当前查询字串和查询计划插入到历史记录中。oop
String queryId = plan.getQueryId(); String queryStr = plan.getQueryStr(); if (SessionState.get() != null) { SessionState.get().getHiveHistory().startQuery(queryStr, conf.getVar(HiveConf.ConfVars.HIVEQUERYID)); SessionState.get().getHiveHistory().logPlanProgress(plan); }
与PreRunHook
相似,在执行任务以前,检查并执行用户设定的"hive.pre.exec.hooks"
,此处再也不详述。完成这部操做以后,向控制台简单的打印一些信息以后,就开始正式执行任务了。源码分析
DriverContextpost
建立执行上下文DriverContext,它记录的信息主要包括可执行的任务队列(Queue<Task> runnable), 正在运行的任务队列(Queue<TaskRunner> running), 当前启动的任务数curJobNo, statsTasks(Map<String, StatsTask>, what used for?)以及语义分析Semantic Analyzers依赖的Context对象等。
DriverContext driverCxt = new DriverContext(ctx); driverCxt.prepare(plan); public DriverContext(Context ctx) { this.runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>(); this.running = new LinkedBlockingQueue<TaskRunner>(); this.ctx = ctx; } public void prepare(QueryPlan plan) { // extract stats keys from StatsTask List<Task<?>> rootTasks = plan.getRootTasks(); NodeUtils.iterateTask(rootTasks, StatsTask.class, new Function<StatsTask>() { public void apply(StatsTask statsTask) { statsTasks.put(statsTask.getWork().getAggKey(), statsTask); } }); }
顺便提一下Context
对象,在Context
的源码注释当中提到, 每个查询都要对应一个Context
对象,不一样查询之间Context
对象是不可重用的, 执行完一个查询以后须要clear
对应的Context
对象(主要是语法分析用到的temp
文件目录),在Hive的实现中也是这么作的。回顾上一篇文章,从CliDriver
循环的读取用户指令,每读取到一条指令都要进行processLine
,processCmd
,processLocalCmd
的处理,而后提交给Driver
编译解析。Context
对象是在compile
函数中实例化的,也就说每一条查询都会建立一个Context
对象,当执行完一条查询从Driver
返回到processLocalCmd
中时,都会调用Driver
对象的close
函数对Context
进行清理(ctx.clear
),这样就保证了一条查询对应一个Context
对象。对于DriverContext
对象也是相似,在execute
函数中实例化,Driver
的close
函数中关闭(driverCtx.shutdown
),和Context
相比一个用来辅助语义分析,一个用来辅助任务执行。还有,咱们发如今processCmd
函数中经过CommandProcessorFactory
设置了Driver
类的实例对象,也就是每一条查询都须要一个Driver
对象进行处理,那这些Driver
对象之间是否能够共享呢?答案是确定的,在CommandProcessorFactory
中维持了一个HiveConf
到Driver
的Map,每次获取Driver
对象时都是根据conf对象来查找到的,若是不存在才从新建立一个Driver
对象,而HiveConf
对象又是在CliDriver
的run
方法中实例化的,与一个CliSessionState
对应,因此Driver
实例应该是与一个Cli的会话对应,同一个会话内部的查询共享一个Driver
实例。
Manage and run all tasks
扯得有点远,继续看Driver
对查询任务的执行,在实例化DriverContext
对象以后,就将查询计划plan中的任务放入到DriverContext
的runnable
队列中。
for (Task<? extends Serializable> tsk : plan.getRootTasks()) { assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty(); driverCxt.addToRunnable(tsk); }
下面就开始运行任务Task,整个任务的运行由一个循环控制,只要DriverContext
没有被关闭,而且runnable
和running
队列中还有任务就一直循环。为了方便描述,下文将一次对任务循环过程的每一步进行说明,这里只给出循环判断条件。
while (!destroyed && driverCxt.isRunning()) {} public synchronized boolean isRunning() { return !shutdown && (!running.isEmpty() || !runnable.isEmpty()); }
1. Put all the tasks into runnable queue
在循环内部,首先不停的从runnable
队列中抽取队首的任务,而后launch
该任务。
while (!destroyed && driverCxt.isRunning()) { // Launch upto maxthreads tasks Task<? extends Serializable> task; while ((task = driverCxt.getRunnable(maxthreads)) != null) { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId()); TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt); if (!runner.isRunning()) { break; } }
2. Launch a task
在launch一个任务的过程当中,根据任务类型(是否是MapReduceTask或者ConditialTask),作一些操做(don't know what used for),将DriverContext
当前已启动任务数curJobNo
加1,而后根据配置文件conf,查询计划plan,执行上下文cxt(DriverContext
),初始化一个任务,接着建立任务结果TaskResult
对象和任务执行对象TaskRunner
,将TaskRunner
放入DriverContext
的running
队列中,表示该任务正在运行。最后,根据配置文件指定的任务运行模式,便是否支持并行运行,启动任务。
private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName, String jobname, int jobs, DriverContext cxt) throws HiveException { if (SessionState.get() != null) { SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName()); } if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) { if (noName) { conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + tsk.getId() + ")"); } conf.set("mapreduce.workflow.node.name", tsk.getId()); Utilities.setWorkflowAdjacencies(conf, plan); cxt.incCurJobNo(1); console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs); } tsk.initialize(conf, plan, cxt); TaskResult tskRes = new TaskResult(); TaskRunner tskRun = new TaskRunner(tsk, tskRes); cxt.launching(tskRun); // Launch Task if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && (tsk.isMapRedTask() || (tsk instanceof MoveTask))) { // Launch it in the parallel mode, as a separate thread only for MR tasks //并发执行 if (LOG.isInfoEnabled()){ LOG.info("Starting task [" + tsk + "] in parallel"); } tskRun.setOperationLog(OperationLog.getCurrentOperationLog()); tskRun.start(); } else { if (LOG.isInfoEnabled()){ LOG.info("Starting task [" + tsk + "] in serial mode"); } //顺序执行 tskRun.runSequential(); } return tskRun; }
3. Poll a finished task
完成任务的启动以后,将调用DriverContext
的pollFinished
函数,查看任务是否执行完毕,若是有任务完成,则将该任务出队,并将已完成的任务添加到钩子上下文HookContext
中。
TaskRunner tskRun = driverCxt.pollFinished(); if (tskRun == null) { continue; } hookContext.addCompleteTask(tskRun); public synchronized TaskRunner pollFinished() throws InterruptedException { while (!shutdown) { Iterator<TaskRunner> it = running.iterator(); while (it.hasNext()) { TaskRunner runner = it.next(); if (runner != null && !runner.isRunning()) { it.remove(); return runner; } } wait(SLEEP_TIME); } return null; }
4. Handle the finished task
针对一个已完成的任务,首先获取任务的结果对象TaskResult
和退出状态, 若是任务非正常退出,则第一步先判断任务是否支持Retry
,若是支持,关闭当前DriverContext
,设置jobTracker
为初始状态,抛出CommandNeedRetry
异常,这个异常会在CliDriver
的processLocalCmd
中捕获,而后尝试从新处理该命令,参见上一篇文章的说明。若是任务不支持Retry
,则启动备份任务backupTask
(相似于回滚?),并添加到runnable
队列,在下次循环过程当中执行。若是没有backupTask
,则查找用户配置“hive.exec.failure.hooks”
,根据用户配置相应出错处理,并关闭DriverContext
, 返回退出码。
Task<? extends Serializable> tsk = tskRun.getTask(); TaskResult result = tskRun.getTaskResult(); int exitVal = result.getExitVal(); if (exitVal != 0) { if (tsk.ifRetryCmdWhenFail()) { driverCxt.shutdown(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); throw new CommandNeedRetryException(); } Task<? extends Serializable> backupTask = tsk.getAndInitBackupTask(); if (backupTask != null) { setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); console.printError(errorMessage); errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName(); console.printError(errorMessage); // add backup task to runnable if (DriverContext.isLaunchable(backupTask)) { driverCxt.addToRunnable(backupTask); } continue; } else { hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK); // Get all the failure execution hooks and execute them. for (Hook ofh : getHooks(HiveConf.ConfVars.ONFAILUREHOOKS)) { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName()); ((ExecuteWithHookContext) ofh).run(hookContext); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName()); } setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); SQLState = "08S01"; console.printError(errorMessage); driverCxt.shutdown(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); return exitVal; } }
5. Find children tasks
最后调用DriverContext
的finished
函数,对完成的任务进行处理(处理逻辑没看懂), 而后判断当前任务是否包含子任务,若是包含则依次将子任务添加到runnable
队列,下次循环中被启动执行。
driverCxt.finished(tskRun); if (tsk.getChildTasks() != for (Task<? extends Serializable> child : tsk.getChildTasks()) { if (DriverContext.isLaunchable(child)) { driverCxt.addToRunnable(child); } } }
6. Do something before return
当全部的任务都完成以后,若是发现DriverContext
已经被关闭,代表任务取消,打印信息并返回对应的状态码。最后清楚任务执行中不完整的输出,并加载执行用户指定的"hive.exec.post.hooks"
,完成对应的钩子功能。对于执行过程当中出现的异常,CommandNeedRetryException
将会直接向上抛出,其余Exception
,直接打印出错信息。不管是否发生异常,只要可以获取到任务执行过程当中的MapReduce状态信息,都将在finally语句块中打印。(限于篇幅,此处只给出部分代码,钩子的处理方式前文已经给出再也不详述,异常处理的部分,有兴趣的执行查看)
//判断DriverContext是否被关闭 if (driverCxt.isShutdown()) { SQLState = "HY008"; errorMessage = "FAILED: Operation cancelled"; console.printError(errorMessage); return 1000; } //删除不完整的输出 HashSet<WriteEntity> remOutputs = new HashSet<WriteEntity>(); for (WriteEntity output : plan.getOutputs()) { if (!output.isComplete()) { remOutputs.add(output); } } for (WriteEntity output : remOutputs) { plan.getOutputs().remove(output); }
最后的最后,若是全部的任务都正常执行完毕,这次查询完成,plan.setDone(),打印OK~
还没完~当execute
函数执行完成后,返回到runInternal
函数中,接着释放锁,与以前的PreRunHook
相对应,还须要加载相应用户自定义的PostRunHook
(代码再也不重复),最后才调用creatProcessorResponse
,建立响应对象CommandProcessorResponse
并返回。
private CommandProcessorResponse createProcessorResponse(int ret) { return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError); }
想更一进步的支持我,请扫描下方的二维码,你懂的~