TaskTracker获取并执行map或reduce任务的过程(一)

  咱们知道TaskTracker在默认状况下,每一个3秒就行JobTracker发送一个心跳包,也就是在这个心跳包中包含对任务的请求。JobTracker返回给TaskTracker的心跳包中包含有各类action(任务),若是有知足在此TaskTracker上执行的任务的话,该任务也就包含在心跳包的响应中。在TaskTracker端有线程专门等待map或reduce任务,并从队列中取出执行。函数

1. TaskTracker发送心跳包

  TaskTracker是做为一个单独的JVM运行的,它启动之后一直处于offerService()函数中,每隔3秒就执行一次transmitHeartBeat函数,以下所示:ui

HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

  该函数具体代码为:this

  HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  ......
if (status == null) { synchronized (this) { status = new TaskTrackerStatus(taskTrackerName, localHostname, httpPort, cloneAndResetRunningTaskStatuses( sendCounters), failures, maxMapSlots, maxReduceSlots); } } // // 检查是否能够接受新的任务 // boolean askForNewTask; long localMinSpaceStart; synchronized (this) { askForNewTask = ((status.countOccupiedMapSlots() < maxMapSlots || status.countOccupiedReduceSlots() < maxReduceSlots) && acceptNewTasks); localMinSpaceStart = minSpaceStart; }
......
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited, askForNewTask, heartbeatResponseId); ...... return heartbeatResponse; }

  咱们从中能够看出,TaskTracker首先建立一个TaskTrackerStatus对象,其中包含有TaskTracker的各类信息,好比,map slot的数目,reducer slot槽的数目,TaskTracker所在的主机名等信息。而后,对TaskTracker的空闲的slot以及磁盘空间进行检查,若是知足相应的条件时,最终就会经过JobClient(为JobTracker的代理)将心跳信息发送给JobTracker,并获得JobTracker的响应HeartbeatResponse。以下所示,JobClient是InterTrackerProtocol的一个实例,而JobTracker实现了InterTrackerProtocol这个接口。spa

    this.jobClient = (InterTrackerProtocol) 
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        return RPC.waitForProxy(InterTrackerProtocol.class,
            InterTrackerProtocol.versionID,
            jobTrackAddr, fConf);
      }
    });

    那么,TaskTracker怎样经过JobTracker的代理与JobTracker进行通讯呢?它是经过RPC调用JobTracker的heartbeat(......)方法而实现的。线程

2. TaskTracker端获取任务

  TaskTracker接收到任务后,会将它们放入到相应的LinkedList中,LinkedList实现了List和Queue接口,它是基于链表实现的FIFO的队列。代理

heartbeatInterval = heartbeatResponse.getHeartbeatInterval();if (actions != null){ 
          for(TaskTrackerAction action: actions) {
            if (action instanceof LaunchTaskAction) {
              addToTaskQueue((LaunchTaskAction)action);
         ......
          }
        }
  ......

  private void addToTaskQueue(LaunchTaskAction action) {
    if (action.getTask().isMapTask()) {
      mapLauncher.addToTaskQueue(action);
    } else {
      reduceLauncher.addToTaskQueue(action);
    }
    }code

 

  TaskTracker启动的时候,建立了两个线程:mapLauncher和reduceLauncher,它们分别处理map任务和reduce任务,map任务有mapLauncher负责将其放入到LinkedList中,reduce任务有reducerLauncher负责将其放入到它维护的LinkedList中。orm

  public void addToTaskQueue(LaunchTaskAction action) {
      synchronized (tasksToLaunch) {
        TaskInProgress tip = registerTask(action, this);
        tasksToLaunch.add(tip);
        tasksToLaunch.notifyAll();
      }
    }

  mapLauncher或者是reducerLauncher根据接收到的action,建立对应的TaskTracker.TaskInProgress对象,并放入到队列中,唤醒等待的线程进行处理。 以下所示,该线程负责从taskToLaunch中获取task,当有空间的slot时,执行这个task。对象

  synchronized (tasksToLaunch) {
            while (tasksToLaunch.isEmpty()) {
              tasksToLaunch.wait();
            }
            //get the TIP
            tip = tasksToLaunch.remove(0);
            task = tip.getTask();
            LOG.info("Trying to launch : " + tip.getTask().getTaskID() + 
                     " which needs " + task.getNumSlotsRequired() + " slots");
          }
.....
          //获得空闲的slot后,启动这个task
          startNewTask(tip);

  这样,TaskTracker就获得了待处理的任务,具体如何执行请参考下一篇博客。blog

相关文章
相关标签/搜索