hadoop mapreduce过程分析学习

mapreduce框架学习

第一代mapreduce局限性

扩展性差:

JobTracker同时具有了资源管理和做业控制两个功能,制约了hadoop集群扩展性前端

资源利用率低,mr1采用了基于槽位的资源分配模型,槽位slot是一种细粒度的资源划分单位,一个任务task不会用完槽位对应的资源,其余任务也没法使用这些空闲资源。,hadoop将槽位分为map slit和 reduce slot,不容许他们之间共享资源。java

不支持多种计算框架,包括内存计算框架,流式框架,迭代计算框架

 

 

JobTracker不够灵活,

 

负责资源管理和做业控制:linux

 

统一资源管理和调度的平台典型表明shi yarn (yet another Resource Negotiator)apache

 

RM和AM,

 

yarn实际上采用的是拉式通讯模型。后端

对于maptask 它的生命周期为scheduled->assigned->completedapi

对于redice task,它的生命周期为pending->scheduled-assigned->completed,数组

 

Yarn上运行mapreduce须要解决两个关键问题,如何肯定reduce task 启动时机以及如何完成shuffle功能。缓存

 

 

 

 

 

 

 

 

 

 

mapreduce通讯协议类关系图安全

 

 

 

JobSubmissionProtocol是Cient与HJobTracker之间的通讯协议,经过该协议查看做业运行状态。框架

/**

 * jobName 做业id,client能够为做业得到惟一的id

 * jobsubmitdir为做业文件所在的目录,hdfs上的一个目录,ts是该做业分哦诶到的秘钥或者是安全令牌

 *

 * @author user

 *

 */

public interface JobClient {

         // 做业提交

         public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts) throws IOException;

 

         // 做业控制

         // 修改做业优先级setJobPriority函数;杀死一个做业killJob,杀死一个任务killTask

         // 查看系统状态和做业运行状态

         // 当前集群状态slot总数,全部正在运行的task数目

         public ClusterStatus getClusterStatus(boolean detailed) throws IOException;

 

         // 得到某个做业的运行状态

         public JobStatus getJobStatus(JobID jobid) throws IOException;

         //得到全部做业运行状态

         public JobStatus[] getAllJobs() throws IOException;

 

}

 

InterTarckerProtocol通讯协议

是taskTracker和JobTracker之间的通讯协议,向jobTracker汇报所在节点的资源使用状况和任务的运行状况,接收并执行jobTracker发送的命令。

heartbeat周期性地被调用,造成了jobTracker和TaskTracker之间的心跳

//输入TaskTrackerStatus封装所在节点资源使用状况和任务的运行状况

         //输出HeartbeatResponse包含一个TaskTrakcerAtion类型的数组,包含了jobtracker向taskTracker传达的各类命令

         HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact,boolean acceptNewTasks, boolean acceptNewTasks, short responseId) throws IOException;

 

TaskUmbilicalProtocol 通讯协议

Task和taskTracker之间的通讯协议,经过该协议汇报本身的运行状态或者出错信息。

 

//周期性调用方法

         boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, JvmContext jbmContext) throws Exception;

 

         boolean ping(TaskAttemptID taskId, JvmContext jbmContext) throws IOException;

         //按需调用方法

        

         //task初始化,收到启动命令,LaunchTaskAction,子程序去调用getTask()方法领取对应的task.

        

        

         //task运行中 reportDiagnosticInfo、fsError/fatalError分别汇报出现的Exception/FSEror/Throwble异常和错误,对于ReduceTask提供shuffleError汇报shuffle阶段出现的 错误

         //reprotNextRecordRange getMapCompletionEvent从TaskTracker得到一已经完成的map task列表

         //task运行完成 commitPending,canCommit ,done

hadoop白名单和黑名单, bin/hadoop mradmin –refreshNodes

 

 

 

 

做业提交和初始化

步骤:

1 用户提交做业

2 将做业配置信息,将运行须要的文件上传到jobTracker文件系统中

3JobClient调用RPC接口向JobTracker提交做业

4收到做业后后告知TakScheduler,对做业进行初始化、

 

 

 

DisbutedCache负责在hdfs文件系统中文件的上传和下载。

 

jobClient中文件的split:

InputSplit org.apache.hadoop.maperduce.split

 JobSplit ,JobSplitWriter

 SplitMetaInfoReader

 

做业提交到JobTracker

 

 

JobClient调用rpc方法submitJob将祖业提交到JobTracker端,JobTracker的submitJon中,会有如下操做:

1为每一个做业建立jobInProgress对象

2 检查用户是否有指定队列的做业提交权限

3 做业配置的内存使用量是否合理  map task,reduce task使用的内存配置

4.通知TaskScheduler初始化做业,按必定的策略去初始化做业

 

 

// 为每一个做业建立jobInProgress对象,在做业运行时一直存在,跟踪正在运行做业的运行状态和进度

         private ConcurrentHashMap<String, Object> jobs;

         private Schedulable taskScheduler;

 

         private synchronized JobStatus addJob(JobID jobId, JobInProgress job) throws Exception {

                   synchronized (jobs) {

                            synchronized (taskScheduler) {

                                     jobs.put(job.getProfile().getJobID(), job);

                                     for(JobInProgressListener listener:jobInProgressListeners){

                                               listener.jobAdded(job);

                                     }

                            }

                   }

         }

         public synchronized void start() throws IOException{

                   super.start();

                   //do otherthing

                  

         }

         public static JobTracker startTracker(JobConf conf,String identifier) throws Exception{

                   //do something

                   result=new JobTracker(conf,indetifier);

                   result.taskScheduler.setTaskTackerManager(result);

                   //do something

         }

 

做业初始化过程详解

taskScheduler调度器远程调用JobTracker中的initJob()对新做业进行初始化,主要工做是构造mao task和reduce task 并对他们进行初始化。

 

TaskInProgress对象

setupTask

map task

reduce task: map task 达到必定的数目才开始reduce task

cleanup task 删除运行过程当中的一些临时目录(好比说_temporay目录),一旦该任务运行成功后,做业由Running变为succeeded状态

每个做业运行时都会占用一个slot

 

Hadoop DistributedCache分发文档到taskTracker节点org.apache.hadoop.filecache

         void addCacheArchive(URI uri, Configuration conf);

 

         void setCacheArchives(URI[] archives, Configuration conf);

 

         void addCacheFile(URI uri, Configuration conf);

        

         void setCacheFile(URI[] files, Configuration conf);

其中的一些方法

 

 

小结:做业的提交和初始化,涉及到三个重要的组件,JobClinet,jobTracker,TaskScheduler

 

 

JobTracker内部实现

 

 

JobInProgress TaskInProgress,完成后表示整个做业就完成了。

JObTarcker启动过程:

 

ACLsManager类,权限管理类 队列权限 和做业权限

 

其中的各类线程

expireTracjersThread 清理

retireJonsThread

expireLaunchingTaskThread

completedJonsStireThread

JobTracker为关键事件记录日志,包括做业提交,做业建立,做业开始运行,做业运行完成,做业运行失败,做业被杀死,经过日志恢复这些做业的运行状态。

 

JoobTracker和TaskTracker之间是拉模型,

心跳:判断taskTracker是否活着

及时让JObTracker得到各个节点上的资源使用状况和任务运行状态

为TaskTracjer分配任务

jobTracker会赋予每一个做业一个惟一的ID,id由三部分组成:做业前缀字符串,JobTracker启动时间和做业提交顺序,各部分经过+_+链接起来组成一个完整的做业ID :job ,201208071506,009(jobTracker运行以来的第9个做业)。

 

jobinprogress 做业静态信息,已经肯定好的属性信息,

做业动态信息

 

 taskinprogress

 

 taskAttempt

 

 

 

 

 

 

 

 

jobTracker容错机制

 

 

Task运行过程分析

IFile存储格式:支持行压缩的存储格式

排序:map task 和reduce task都会有排序

map task 中会将结果暂时放到缓存区中,是环形的,当缓存区使用率达到必定的阈值后,再对缓存区的数据进行一次排序,将有序的数据以IFile文件的形式写到磁盘,完成后,会将全部文件进行一次合并,成为一个有序的大文件。

Reduce task 从远程每一个maptask拷贝相应的数据,先放到内存,达到阈值后再合并生成更大的文件,若是内存中文件大小和数目超过必定的阈值,就会将数据写到磁盘,当全部数据拷贝完成后,对全部数据进行一次合并。

 

 

 

 

计数器

java和hadoop中计数器的实现

基于枚举类型和计数器类型的计数器api

public abstract void incrCounter(Enum<?> key,long amount);

public abstract void incrCounter(String group,String counter,long amount);

HadoopPipes::TaskContext::Counter*mapCounter;//定义

mapCOunter=context.getCounter(“counterGroup”,”mapCounter”)//注册

cotnext.incrementCounter(mapCounter,1);//使用

MapTask

单向缓存区,双向缓存区,环形缓存区

key是排序的关键字,一般须要交给RawConparator排序,要求排序关键字在内存咋红必须连续存储

经过内存复制解决不连续的问题,复制到前端,复制到后端,可能key或者value太大,以致于整个缓存区都不能容纳它,抛出异常,并将该记录单独输出到一个文件中

Spill溢写过程

是有SpillThread线程完成。是kvbuffer的消费者

 

spillLock.lock();

         while(true){

                   spollDone.signal();

                   while(kvstart=kvend){

                            spillReady.await();

                   }

                   spillLock.unlock();

                   sortAndSpill();

                   spillLock.lock();

                   if(bufend<bufindex&&bufindex<bufstart){

                            bufvoid=kvbuffer.length;

                   }

                   vstart=kvend;

                   bufstart=bufend;

                 

         }

         spiilLock.unlock();

 

当全部数据完成拷贝后,再对全部数据进行一次排序,并将key相同的记录分组依次交给reduce()程序处理。

Hadoop性能调优

从几个角度来看:

管理员角度:

硬件

linux操做系统参数调优

JVM参数调优

 

从用户角度进行调优

合理使用DIstributedCache

当应用程序须要使用外部文件时,获得外部文件的方法有两种,一种是与jar包一块儿放到客户端,看成业提交时传到hdfs的某个目录下,而后经过DistributedCache分发到各个节点上,另外一个方法是将外部文件直接放到hdfs上,第二种方法更高效。

跳过坏记录

提升做业优先级

 

 

HADOOP安全机制

sasl

相关文章
相关标签/搜索