JobTracker同时具有了资源管理和做业控制两个功能,制约了hadoop集群扩展性前端
资源利用率低,mr1采用了基于槽位的资源分配模型,槽位slot是一种细粒度的资源划分单位,一个任务task不会用完槽位对应的资源,其余任务也没法使用这些空闲资源。,hadoop将槽位分为map slit和 reduce slot,不容许他们之间共享资源。java
负责资源管理和做业控制:linux
统一资源管理和调度的平台典型表明shi yarn (yet another Resource Negotiator)apache
yarn实际上采用的是拉式通讯模型。后端
对于maptask 它的生命周期为scheduled->assigned->completedapi
对于redice task,它的生命周期为pending->scheduled-assigned->completed,数组
Yarn上运行mapreduce须要解决两个关键问题,如何肯定reduce task 启动时机以及如何完成shuffle功能。缓存
mapreduce通讯协议类关系图安全
JobSubmissionProtocol是Cient与HJobTracker之间的通讯协议,经过该协议查看做业运行状态。框架
/**
* jobName 做业id,client能够为做业得到惟一的id
* jobsubmitdir为做业文件所在的目录,hdfs上的一个目录,ts是该做业分哦诶到的秘钥或者是安全令牌
*
* @author user
*
*/
public interface JobClient {
// 做业提交
public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts) throws IOException;
// 做业控制
// 修改做业优先级setJobPriority函数;杀死一个做业killJob,杀死一个任务killTask
// 查看系统状态和做业运行状态
// 当前集群状态slot总数,全部正在运行的task数目
public ClusterStatus getClusterStatus(boolean detailed) throws IOException;
// 得到某个做业的运行状态
public JobStatus getJobStatus(JobID jobid) throws IOException;
//得到全部做业运行状态
public JobStatus[] getAllJobs() throws IOException;
}
是taskTracker和JobTracker之间的通讯协议,向jobTracker汇报所在节点的资源使用状况和任务的运行状况,接收并执行jobTracker发送的命令。
heartbeat周期性地被调用,造成了jobTracker和TaskTracker之间的心跳
//输入TaskTrackerStatus封装所在节点资源使用状况和任务的运行状况
//输出HeartbeatResponse包含一个TaskTrakcerAtion类型的数组,包含了jobtracker向taskTracker传达的各类命令
HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact,boolean acceptNewTasks, boolean acceptNewTasks, short responseId) throws IOException;
TaskUmbilicalProtocol 通讯协议
Task和taskTracker之间的通讯协议,经过该协议汇报本身的运行状态或者出错信息。
//周期性调用方法
boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, JvmContext jbmContext) throws Exception;
boolean ping(TaskAttemptID taskId, JvmContext jbmContext) throws IOException;
//按需调用方法
//task初始化,收到启动命令,LaunchTaskAction,子程序去调用getTask()方法领取对应的task.
//task运行中 reportDiagnosticInfo、fsError/fatalError分别汇报出现的Exception/FSEror/Throwble异常和错误,对于ReduceTask提供shuffleError汇报shuffle阶段出现的 错误
//reprotNextRecordRange getMapCompletionEvent从TaskTracker得到一已经完成的map task列表
//task运行完成 commitPending,canCommit ,done
hadoop白名单和黑名单, bin/hadoop mradmin –refreshNodes
步骤:
1 用户提交做业
2 将做业配置信息,将运行须要的文件上传到jobTracker文件系统中
3JobClient调用RPC接口向JobTracker提交做业
4收到做业后后告知TakScheduler,对做业进行初始化、
DisbutedCache负责在hdfs文件系统中文件的上传和下载。
jobClient中文件的split:
InputSplit org.apache.hadoop.maperduce.split
JobSplit ,JobSplitWriter
SplitMetaInfoReader
做业提交到JobTracker
JobClient调用rpc方法submitJob将祖业提交到JobTracker端,JobTracker的submitJon中,会有如下操做:
1为每一个做业建立jobInProgress对象
2 检查用户是否有指定队列的做业提交权限
3 做业配置的内存使用量是否合理 map task,reduce task使用的内存配置
4.通知TaskScheduler初始化做业,按必定的策略去初始化做业
// 为每一个做业建立jobInProgress对象,在做业运行时一直存在,跟踪正在运行做业的运行状态和进度
private ConcurrentHashMap<String, Object> jobs;
private Schedulable taskScheduler;
private synchronized JobStatus addJob(JobID jobId, JobInProgress job) throws Exception {
synchronized (jobs) {
synchronized (taskScheduler) {
jobs.put(job.getProfile().getJobID(), job);
for(JobInProgressListener listener:jobInProgressListeners){
listener.jobAdded(job);
}
}
}
}
public synchronized void start() throws IOException{
super.start();
//do otherthing
}
public static JobTracker startTracker(JobConf conf,String identifier) throws Exception{
//do something
result=new JobTracker(conf,indetifier);
result.taskScheduler.setTaskTackerManager(result);
//do something
}
taskScheduler调度器远程调用JobTracker中的initJob()对新做业进行初始化,主要工做是构造mao task和reduce task 并对他们进行初始化。
TaskInProgress对象
setupTask
map task
reduce task: map task 达到必定的数目才开始reduce task
cleanup task 删除运行过程当中的一些临时目录(好比说_temporay目录),一旦该任务运行成功后,做业由Running变为succeeded状态
每个做业运行时都会占用一个slot
Hadoop DistributedCache分发文档到taskTracker节点org.apache.hadoop.filecache
void addCacheArchive(URI uri, Configuration conf);
void setCacheArchives(URI[] archives, Configuration conf);
void addCacheFile(URI uri, Configuration conf);
void setCacheFile(URI[] files, Configuration conf);
其中的一些方法
小结:做业的提交和初始化,涉及到三个重要的组件,JobClinet,jobTracker,TaskScheduler
JobInProgress TaskInProgress,完成后表示整个做业就完成了。
JObTarcker启动过程:
ACLsManager类,权限管理类 队列权限 和做业权限
其中的各类线程
expireTracjersThread 清理
retireJonsThread
expireLaunchingTaskThread
completedJonsStireThread
JobTracker为关键事件记录日志,包括做业提交,做业建立,做业开始运行,做业运行完成,做业运行失败,做业被杀死,经过日志恢复这些做业的运行状态。
JoobTracker和TaskTracker之间是拉模型,
心跳:判断taskTracker是否活着
及时让JObTracker得到各个节点上的资源使用状况和任务运行状态
为TaskTracjer分配任务
jobTracker会赋予每一个做业一个惟一的ID,id由三部分组成:做业前缀字符串,JobTracker启动时间和做业提交顺序,各部分经过+_+链接起来组成一个完整的做业ID :job ,201208071506,009(jobTracker运行以来的第9个做业)。
jobinprogress 做业静态信息,已经肯定好的属性信息,
做业动态信息
taskinprogress
taskAttempt
jobTracker容错机制
IFile存储格式:支持行压缩的存储格式
排序:map task 和reduce task都会有排序
map task 中会将结果暂时放到缓存区中,是环形的,当缓存区使用率达到必定的阈值后,再对缓存区的数据进行一次排序,将有序的数据以IFile文件的形式写到磁盘,完成后,会将全部文件进行一次合并,成为一个有序的大文件。
Reduce task 从远程每一个maptask拷贝相应的数据,先放到内存,达到阈值后再合并生成更大的文件,若是内存中文件大小和数目超过必定的阈值,就会将数据写到磁盘,当全部数据拷贝完成后,对全部数据进行一次合并。
java和hadoop中计数器的实现
基于枚举类型和计数器类型的计数器api
public abstract void incrCounter(Enum<?> key,long amount);
public abstract void incrCounter(String group,String counter,long amount);
HadoopPipes::TaskContext::Counter*mapCounter;//定义
mapCOunter=context.getCounter(“counterGroup”,”mapCounter”)//注册
cotnext.incrementCounter(mapCounter,1);//使用
单向缓存区,双向缓存区,环形缓存区
key是排序的关键字,一般须要交给RawConparator排序,要求排序关键字在内存咋红必须连续存储
经过内存复制解决不连续的问题,复制到前端,复制到后端,可能key或者value太大,以致于整个缓存区都不能容纳它,抛出异常,并将该记录单独输出到一个文件中
是有SpillThread线程完成。是kvbuffer的消费者
spillLock.lock();
while(true){
spollDone.signal();
while(kvstart=kvend){
spillReady.await();
}
spillLock.unlock();
sortAndSpill();
spillLock.lock();
if(bufend<bufindex&&bufindex<bufstart){
bufvoid=kvbuffer.length;
}
vstart=kvend;
bufstart=bufend;
}
spiilLock.unlock();
当全部数据完成拷贝后,再对全部数据进行一次排序,并将key相同的记录分组依次交给reduce()程序处理。
从几个角度来看:
管理员角度:
硬件
linux操做系统参数调优
JVM参数调优
从用户角度进行调优
合理使用DIstributedCache
当应用程序须要使用外部文件时,获得外部文件的方法有两种,一种是与jar包一块儿放到客户端,看成业提交时传到hdfs的某个目录下,而后经过DistributedCache分发到各个节点上,另外一个方法是将外部文件直接放到hdfs上,第二种方法更高效。
跳过坏记录
提升做业优先级