JobTracker是整个MapReduce计算框架中的主服务,至关于集群的“管理者”,负责整个集群的做业控制和资源管理。本文对JobTracker的启动过程及心跳接收与应答两个主要功能进行分析。 node
函数offerService()会启动 JobTracker内部几个比较重要的后台服务进程,分别是expireTrackersThread、retireJobsThread、 expireLaunchingTaskThread和completedJobsStoreThread。相关代码以下: 数据结构
public class JobTracker { ... ... ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks(); Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks, "expireLaunchingTasks"); ... ... public void offerService() throws InterruptedException, IOException { ... ... // expireTrackersThread后台服务进程。 this.expireTrackersThread = new Thread(this.expireTrackers, "expireTrackers"); this.expireTrackersThread.start(); // retireJobsThread后台服务进程。 this.retireJobsThread = new Thread(this.retireJobs, "retireJobs"); this.retireJobsThread.start(); //expireLaunchingTaskThread后台服务进程。 expireLaunchingTaskThread.start(); // completedJobsStoreThread后台服务进程。 if (completedJobStatusStore.isActive()) { completedJobsStoreThread = new Thread(completedJobStatusStore, "completedjobsStore-housekeeper"); completedJobsStoreThread.start(); } ... ... } }
1) expireTrackersThread线程下面分别介绍这几个服务线程。 并发
该线程主要用于发现和清理死掉的 TaskTracker。每一个TaskTracker会周期性地经过心跳向JobTracker汇报信息,而JobTracker会记录每一个 TaskTracker最近的汇报心跳时间。若是某个TaskTracker在10分钟内未汇报心跳,则JobTracker认为它已死掉,并将经的相关 信息从数据结构trackToJobsToCleanup、trackerToTasksToCleanup、 trackerToMarkedTasksMap中清除,同时将正在运行的任务状态标注为KILLED_UNCLEAN。 app
2) retireJobsThread线程 框架
该线程主要用于清理长时间驻留在内存中的已经运行完成的做 业信息。JobTracker会将已经运行完成的做业信息存放到内存中,以便外部查询,但随着完成的做业愈来愈多,势必会占用JobTracker的大量 内存,为此,JobTracker经过该线程清理驻留在内存中较长时间的已经运行完成的做业信息。 函数
当一个做业知足以下条件一、2或者条件一、3时,将被从数据结构jobs转移到过时做业队列中。 高并发
条件1 做业已经运行完成,即运行状态为SUCCESSED、FAILED或KILLED。 oop
条件2 做业完成时间距如今已经超过24小时(可经过参数mapred.jobtracker.retirejob.interval配置)。 this
条件3 做业拥有者已经完成做业总数超过100(可经过参数mapred.jobtracker.completeuserjobs.maximum配置)个。 spa
过时做业被统一保存到过时队列中。当过时做业超过1000个(可经过参数mapred.job.tracker.retiredjobs.cache.size配置)时,将会从内存中完全删除。
3) expireLaunchingTaskThread线程
该线程用于发现已经被分配给某个TaskTracker但一直未汇报信息的任务。当JobTracker将某个任务分配给TaskTracker后,若是该任务在10分钟内未汇报进度,则JobTracker认为该任务分配失败,并将其状态标注为FAILED。
4) completedJobsStoreThread线程
该线程将已经运行完成的做业运行信息保存到HDFS上,并提供了一套存取这些信息的API。该线程可以解决如下两个问题。
n 用户没法获取好久以前的做业运行信息:前面提到线程retireJobsThread会清除长时间驻留在内存中的完成做业,这会致使用户没法查询好久以前某个做业的运行信息。
n JobTracker重启后做业运行信息丢失:当JobTracker因故障重启后,全部本来保存到内存中的做业信息将会所有丢失。
该线程经过保存做业运行日志的方式,使得用户能够查询任意时间提交的做业和还原做业的运行信息。
默认状况下,该线程不会启用,能够经过下表所示的几个参数配置并启用该线程。
配置参数 |
参数含义 |
mapred.job.tracker.persist.jobstatus.active |
是否启用该线程 |
mapred.job.tracker.persist.jobstatus.hours |
做业运行信息保存时间 |
mapred.job.tracker.persist.jobstatus.dir |
做业运行信息保存路径 |
在MapReduce中,JobTracker存在单点故障问题。若是它因异常退出后重启,那么全部正在运行的做业运行时信息将丢失。若是不采用适当的做业恢复机制对做业信息进行恢复,则全部做业需从新提交,且已经计算完成的任务需从新计算。这势必形成资源浪费。
为了解决JobTracker面临的单点故障问 题,Hadoop设计了做业恢复机制,过程以下:做业从提交到运行结束的整个过程当中,JobTracker会为一些关键事件记录日志(由 JobHistory类完成)。对于做业而言,关键事件包括做业提交、做业建立、做业开始运行、做业运行完成、做业运行失败、做业被杀死等;对于任务而 言,关键事件包括任务建立、任务开始运行、任务运行结束、任务运行失败、任务被杀死等。当JobTracker因故障重启后(重启过程当中,全部 TaskTracker仍然活着),若是管理员启用了做业恢复功能(将参数mapred.jobtracker.restart.recover置为 true),则JobTracker会检查是否存在须要恢复运行状态的做业,若是有,则经过日志恢复这些做业的运行状态(由 RecoveryManager类完成),并从新调度那些未运行完成的任务(包括产生部分结果的任务)。
心跳是沟通TaskTracker和JobTracker的桥梁,它其实是一个RPC函数。TaskTracker周期性地调用该函数汇报节点和任务状态信息,从而造成心跳。在Hadoop中,心跳主要有三个做用:
n 判断TaskTracker是否活着。
n 及时让JobTracker获取各个节点上的资源使用状况和任务运行状态。
n 为TaskTracker分配任务。
TaskTracker周期性地调用RPC函数heartbeat向JobTracker汇报信息和领取任务。该函数定义以下:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted,boolean initialContact,boolean acceptNewTasks, short responseId)
该函数的各个参数含义以下。
status |
该参数封装了TaskTracker上的各类状态信息。包括: String trackerName;//TaskTracker名称 String host;//TaskTracker主机名 int httpPort;//TaskTracker对外的HTTP端口号 int failures;//该TaskTracker上已经失败的任务总数 List<TaskStatus> taskReports;//正在运行的各个任务运行状态 volatile long lastSeen;//上次汇报心跳的时间 private int maxMapTasks;/*Map slot总数,即容许同时运行的Map Task总数,由参数mapred.tasktracker.map.tasks.maximum设定*/ private int maxReduceTasks;//Reduce slot总数 private TaskTrackerHealthStatus healthStatus;//TaskTracker健康状态 private ResourceStatus resStatus;//TaskTracker资源(内存,CPU等)信息 |
restarted |
表示TaskTracker是否刚刚从新启动。 |
initialContact |
表示TaskTracker是否初次链接JobTracker |
acceptNewTasks |
表示TaskTracker是否能够接收新任务,这一般取决于slot是否有剩余和节点健康状态等。 |
responseId |
表示心跳响应编号,用于防止重复发送心跳。每接收一次心跳后,该值加1。 |
该函数的返回值为一个HeartbeatResponse对象,该对象主要封装了JobTracker向TaskTracker下达的命令,具体以下:
class HeartbeatResponse implements Writable, Configurable { ... ... short responseId; // 心跳响应编号 int heartbeatInterval; // 下次心跳的发送间隔 TaskTrackerAction[] actions; // 来自JobTracker的命令,可能包括杀死做业等 Set<JobID> recoveredJobs = new HashSet<JobID>(); // 恢复完成的做业列表。 ... ... }
该函数的内部实现逻辑主要分为两个步骤:更新状态和下达命令。JobTracker首先将TaskTracker汇报的最新任务运行状态保存到相应数据结构中,而后根据这些状态信息和外界需求为其下达相应的命令。
函数heartbeat首先会更新TaskTracker/Job/Task的状态信息。相关代码以下:
接下来,跟踪进入函数processHeartbeat内部。该函数首先进行一系列异常状况检查,而后更新TaskTracker/Job/Task的状态信息。相关代码以下:
private synchronized boolean processHeartbeat( TaskTrackerStatus trackerStatus, boolean initialContact, long timeStamp) throws UnknownHostException { ... ... updateTaskStatuses(trackerStatus); // 更新Task状态信息 updateNodeHealthStatus(trackerStatus, timeStamp); // 更新节点健康状态 ... ... }
更新完状态信息后,JobTracker要为TaskTracker构造一个HeartbeatResponse对象做为心跳应答。该对象主要有两部份内容:下达给TaskTracker的命令和下次汇报心跳的时间间隔。下面分别对它们进行介绍:
1. 下达命令
JobTracker将下达给TaskTracker的命 令封装成TaskTrackerAction类,主要包括ReinitTrackerAction(从新初始化)、LauchTaskAction(运行 新任务)、KillTaskAction(杀死任务)、KillJobAction(杀死做业)和CommitTaskAction(提交任务)五种。下 面依次对这几个命令进行介绍。
1) ReinitTrackerAction
JobTracker收到TaskTracker发送过来 的心跳信息后,首先要进行一致性检查,若是发现异常状况,则会要求TaskTracker从新对本身进行初始化,以恢复到一致的状态。当出现如下两种不一 致状况时,JobTracker会向TaskTracker下达ReinitTrackerAction命令。
n 丢失上次心跳应答信息:JobTracker会保存向 每一个TaskTracker发送的最近心跳应答信息,若是JobTracker未刚刚重启且一个TaskTracker并不是初次链接 JobTracker(initialContact!=true),而最近的心跳应答信息丢失了,则这是一种不一致状态。
n 丢失TaskTracker状态信 息:JobTracker接收到任何一个心跳信息后,会将TaskTracker状态(封装在类TaskTrackerStatus中)信息保存起来。如 果一个TaskTracker非初次链接JobTracker但状态信息却不存在,则也是一种不一致状态。
相关代码以下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... return new HeartbeatResponse(responseId, new TaskTrackerAction[] {new ReinitTrackerAction()}); ... ... } }
2) LauchTaskAction
该类封装了TaskTracker新分配的任务。 TaskTracker接收到该命令后会启动一个子进程运行该任务。Hadoop将一个做业分解后的任务分红两大类:计算型任务和辅助型任务。其中,计算 型任务是处理实际数据的任务,包括Map Task和Reduce Task两种(对应TaskType类中的MAP和REDUCE两种类型),由专门的 任务调度器对它们进行调度;而辅助型任务则不会处理实际的数据,一般用于同步计算型任务或者清理磁盘上无用的目录,包括job-setup task、 job-cleanup task和task-cleanup task三种(对应TaskType类中的JOB_SETUP,JOB_CLEANUP和 TASK_CLEANUP三种类型),其中,job-setup task和job-cleanup task分别用做计算型任务开始运行同步标识和结束 运行同步标识,而task-cleanup task则用于清理失败的计算型任务已经写到磁盘上的部分结果,这种任务由JobTracker负责调度,且 运行优先级高于计算型任务。
若是一个正常(不在黑名单中)的TaskTracker尚 有空闲slot(acceptNewTasks为true),则JobTracker会为该TaskTracker分配新任务,任务选择顺序是:先辅助型 任务,再计算型任务。而对于辅助型任务,选择顺序依次为job-cleanup task、task-cleanup task和job- setup task,具体代码以下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus); // 若是没有辅助型任务,则选择计算型任务 if (tasks == null ) { // 由任务调度器选择一个或多个计算型任务 tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName)); } if (tasks != null) { for (Task task : tasks) { expireLaunchingTasks.addNewTask(task.getTaskID()); if(LOG.isDebugEnabled()) { LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID()); } // 将分配的任务封装成LauchTAskAction actions.add(new LaunchTaskAction(task)); } ... ... } ... ... }
3) KillTaskAction
该类封装了TaskTracker需杀死的任务。TaskTracker收到该命令后会杀掉对应任务、清理工做目录和释放slot。致使JobTracker向TaskTracker发送该命令的缘由有不少,主要包括如下几个场景:
n 用户使用命令“bin/hadoop job -kill-task”或者“bin/hadoop job -fail-task”杀死一个任务或者使一个任务失败。
n 启用推测执行机制后,同一份数据可能同时由两个Task Attempt处理。当其中一个Task Attempt执行成功后,另一个处理相同数据的Task Attempt将被杀掉。
n 某个做业运行失败,它的全部任务将被杀掉。
n TaskTracker在必定时间内未汇报心跳,则JobTracker认为其死掉,它上面的全部Task均被标注为死亡。
相关代码以下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... // Check for tasks to be killed List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName); if (killTasksList != null) { actions.addAll(killTasksList); } ... ... } }
4) KillJobAciton
该类封装了TaskTracker待清理的做业。TaskTracker接收到该命令后,会清理做业的临时目录。致使JobTracker向TaskTracker发送该命令的缘由有不少,主要包括如下几个场景:
n 用户使用命令“”或者“”杀死一个做业或者是使一个做业失败。
n 做业运行完成,通知TaskTracker清理该做业的工做目录。
n 做业运行失败,即同一个做业失败的Task数目超过必定比例。
相关代码以下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... // Check for jobs to be killed/cleanedup List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName); if (killJobsList != null) { actions.addAll(killJobsList); } ... ... } }
5) CommitTaskAction
该类封装了TaskTracker需提交的任务。为了防止 同一个TaskInProgress的两个同时运行的Task Attempt(好比打开推测执行功能,一个任务可能存在备份任务)同时打开一个文件或者 往一个文件中写数据而产生冲突,Hadoop让每一个Task Attempt写到单独一个文件(以TaskAttemptID命名,好比 attempt_201412031706_0008_r_000000_0)中。一般而言,Hadoop让每一个Task Attempt成功运行完成 后,再将运算结果转移到最终目录${mapred.output.dir}中。Hadoop将一个成功运行完成的Task Attempt结果文件从临时 目录“提高”至最终目录的过程,称为“任务提交”。当TaskInProgress中一个任务被提交后,其余任务将被杀死,同时意味着该 TaskInProgress运行完成。相关代码以下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... // Check for tasks whose outputs can be saved List<TaskTrackerAction> commitTasksList = getTasksToSave(status); if (commitTasksList != null) { actions.addAll(commitTasksList); } ... ... } }
TaskTracker心跳时间间隔大小应该适度,若是过小,则JobTracker须要处理高并发的心跳链接请求,必然产生不小的并发压力;若是太大,空闲的资源不能及时汇报给JobTracker(进而为之分配新的Task),形成资源空闲,进而下降系统吞吐率。2. 调整心跳间隔
TaskTracker汇报心跳的时间间隔并非一成不变 的,它会随着集群规模的动态调整(好比节点死掉或者用户动态添加新节点)而变化,以便可以合理利用JobTracker的并发处理能力。在 Hadoop MapReduce中,只有JobTracker知道某一时刻集群的规模,所以由JobTracker为每一个TaskTracker计算下 一次汇报心跳的时间间隔,并经过心跳机制告诉TaskTracker。
JobTracker容许用户经过参数配置心跳的时间间隔 加速比,即每增长mapred.heartbeats.in.second(默认是100,最小是1)个节点,心跳时间间隔增长 mapreduce.jobtracker.heartbeats.scaling.factor(默认是1,最小是0.01)秒。同时,为了防止用户参 数设置不合理而对JobTracker产生较大负载,JobTracker要示心跳时间间隔至少为3秒。具体计算方法以下:
public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol, RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol, JobTrackerMXBean { ... ... /** * Calculates next heartbeat interval using cluster size. * Heartbeat interval is incremented by 1 second for every 100 nodes by default. * @return next heartbeat interval. */ public int getNextHeartbeatInterval() { // get the no of task trackers int clusterSize = getClusterStatus().getTaskTrackers(); int heartbeatInterval = Math.max( (int)(1000 * HEARTBEATS_SCALING_FACTOR * Math.ceil((double)clusterSize /NUM_HEARTBEATS_IN_SECOND)), HEARTBEAT_INTERVAL_MIN) ; return heartbeatInterval; } public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... // calculate next heartbeat interval and put in heartbeat response int nextInterval = getNextHeartbeatInterval(); response.setHeartbeatInterval(nextInterval); ... ... } ... ... }