TaskTracker执行map或reduce任务的过程(二)

  上次说到,当MapLauncher或ReduceLancher(用于执行任务的线程,它们扩展自TaskLauncher),从它们所维护的LinkedList也即队列中获取到TaskInProgress,而且TaskTracker有空闲的slot时,该线程就调用了TaskTracker的startNewTask(tip)方法,以下所示: spa

 public void run() {
      while (!Thread.interrupted()) {
        try {
          TaskInProgress tip;
          Task task;
          synchronized (tasksToLaunch) {
            while (tasksToLaunch.isEmpty()) {
              tasksToLaunch.wait();//当队列为空时呗阻塞,知道有新的tip到来才会被唤醒
            }
            //get the TIP
            tip = tasksToLaunch.remove(0);
            task = tip.getTask();
      ......//当有空闲的slot时执行启动一个任务
          startNewTask(tip);
      ......
      }
    }

  接下了来就让咱们看下startNewTask(tip)的神秘面纱吧,因为在其内部经过实习Runnable建立了一个线程,咱们只需分析线程体的run方法便可,关键代码以下,为便于说明,给3个核心语句分别标识为**1,**2:
线程

public void run() {
        try {
          RunningJob rjob = localizeJob(tip);        //**1
          tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString()); 
          // task本地化已经完成,此刻若是rjob.jobConf或者rjob.ugi为空的话,会抛出异常
      launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob); //**2
......

} }

  **1的源码以下,code

    Task t = tip.getTask();
    JobID jobId = t.getJobID();
    RunningJob rjob = addTaskToJob(jobId, tip);
    InetSocketAddress ttAddr = getTaskTrackerReportAddress();

  从中咱们能够看出,首先建立了一个该任务所属的RunningJob,并把它放入到一个该TaskTracker所维护的TreeMap<jobId,RunningJob>中,同时在RunningJob中记录将要执行的task,也即把tip放入到RunningJob.tasks(一个HashSet<TaskInProgress>)中。由此,咱们能够知道,每一个TaskTracker都维护者一个TreeMap用以记录它正在执行的哪一个做业的哪些任务(map、reduce任务)。xml

  接下来localizeJob(tip)要作的就是调用initializeJob(t, rjob, ttAddr)初始化工做目录,并下载相应的job.xml以及job.jar(TaskController负责)文件,TaskController最后调用RunJar.unJar()将包解压到相应的工做目录,,至此初始化工做完成,调用launchTaskForJob开始执行Task。blog

  **2的核心代码为:队列

 protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,RunningJob rjob) throws IOException {
    synchronized (tip) {
      jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
                  localStorage.getDirsString());
      tip.setJobConf(jobConf);
      tip.setUGI(rjob.ugi);
      tip.launchTask(rjob);
    }
  }

  由此看出,它主要是调用TaskTracker.TaskInProgress的launchTask()方法,在该方法中它建立了一个TaskRunner线程,并启这个线程执行这个task,其run方法核心代码以下:进程

public final void run() {
    //设置工做目录
final File workDir = new File(new Path(localdirs[rand.nextInt(localdirs.length)], TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(), taskid.toString(), t.isTaskCleanupTask())).toString());
......

// 设置环境变量 List<String> classPaths = getClassPaths(conf, workDir,taskDistributedCacheManager); .......

    //启动Task子进程 launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir); } }

  未完待续...... ip

相关文章
相关标签/搜索