Giraph 源码分析(五)—— 加载数据+同步总结

做者|白松node

关于Giraph 共有九个章节,本文第五个章节。app

环境:在单机上(机器名:giraphx)启动了2个workers。ide

输入:SSSP文件夹,里面有1.txt和2.txt两个文件。工具

一、在Worker向Master汇报健康情况后,就开始等待Master建立InputSplit。oop

方法:每一个Worker经过检某个Znode节点是否存在,同时在此Znode上设置Watcher。若不存在,就经过BSPEvent的waitForever()方法释放当前线程的锁,陷入等待状态。一直等到master建立该znode。此步骤位于BSPServiceWorker类中的startSuperStep方法中,等待代码以下:源码分析

Giraph 源码分析(五)—— 加载数据+同步总结
二、Master调用createInputSplits()方法建立InputSplit。线程

Giraph 源码分析(五)—— 加载数据+同步总结

在generateInputSplits()方法中,根据用户设定的VertexInputFormat得到InputSplits。代码以下:3d

Giraph 源码分析(五)—— 加载数据+同步总结

其中minSplitCountHint为建立split的最小数目,其值以下:orm

minSplitCountHint = Workers数目 * NUM_INPUT_THREADS对象

NUM_INPUT_THREADS表示 每一个Input split loading的线程数目,默认值为1 。 经查证,在TextVertexValueInputFormat抽象类中的getSplits()方法中的minSplitCountHint参数被忽略。用户输入的VertexInputFormat继承TextVertexValueInputFormat抽象类。

若是获得的splits.size小于minSplitCountHint,那么有些worker就没被用上。

获得split信息后,要把这些信息写到Zookeeper上,以便其余workers访问。上面获得的split信息以下:

[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]

遍历splits List,为每一个split建立一个Znode,值为split的信息。如为split-0建立Znode,值为:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0

为split-1建立znode(以下),值为:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1

最后建立znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示全部splits都建立好了。

三、Master根据splits建立Partitions。首先肯定partition的数目。

Giraph 源码分析(五)—— 加载数据+同步总结

BSPServiceMaster中的MasterGraphPartitioner<I.V,E,M>对象默认为HashMasterPartitioner。它的createInitialPartitionOwners()方法以下:

Giraph 源码分析(五)—— 加载数据+同步总结

上面代码中是在工具类PartitionUtils计算Partition的数目,计算公式以下:

partitionCount=PARTITION_COUNT_MULTIPLIER availableWorkerInfos.size() availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,默认值为1 。

可见,partitionCount值为4(122)。建立的partitionOwnerList信息以下:

[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),

(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]

四、Master建立Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir,用于后面的exchange partition。

五、Master最后在assignPartitionOwners()方法中

把masterinfo,chosenWorkerInfoList,partitionOwners等信息写入Znode中(做为Znode的data),该Znode的路径为: /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。

Master调用barrierOnWorkerList()方法开始等待各个Worker完成数据加载。调用关系以下:

Giraph 源码分析(五)—— 加载数据+同步总结

barrierOnWorkerList中建立znode,path=/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir 。而后检查该znode的子节点数目是否等于workers的数目,若不等于,则线程陷入等待状态。后面某个worker完成数据加载后,会建立子node(如 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1)来激活该线程继续判断。

六、当Master建立第5步的znode后,会激活worker。

每一个worker从znode上读出data,data包含masterInfo,WorkerInfoList和partitionOwnerList,而后各个worker开始加载数据。

把partitionOwnerList复制给BSPServiceWorker类中的workerGraphPartitioner(默认为HashWorkerPartitioner类型)对象的partitionOwnerList变量,后续每一个顶点把根据vertexID经过workerGraphPartitioner对象获取其对应的partitionOwner。

Giraph 源码分析(五)—— 加载数据+同步总结

每一个Worker从znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir获取子节点,获得inputSplitPathList,内容以下:

[/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1,

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0]

而后每一个Worker建立N个InputsCallable线程读取数据。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中NUM_INPUT_THREADS默认值为1,maxInputSplitThread=(InputSplitSize-1/maxWorkers +1

那么,默认每一个worker就是建立一个线程来加载数据。

在InputSplitsHandler类中的reserveInputSplit()方法中,每一个worker都是遍历inputSplitPathList,经过建立znode来保留(标识要处理)的split。代码及注释以下:

Giraph 源码分析(五)—— 加载数据+同步总结

当用reserveInputSplit()方法获取某个znode后,loadSplitsCallable类的loadInputSplit方法就开始经过该znode获取其HDFS的路径信息,而后读入数据、重分布数据。

Giraph 源码分析(五)—— 加载数据+同步总结

Giraph 源码分析(五)—— 加载数据+同步总结

VertexInputSplitsCallable类的readInputSplit()方法以下:

Giraph 源码分析(五)—— 加载数据+同步总结

七、每一个worker加载完数据后,调用waitForOtherWorkers()方法等待其余workers都处理完split。

Giraph 源码分析(五)—— 加载数据+同步总结

策略以下,每一个worker在/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir目录下建立子节点,后面追加本身的worker信息,如worker一、worker2建立的子节点分别以下:

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_2

建立完后,而后等待master建立/_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone。

八、从第5步骤可知,若master发现/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir下的子节点数目等于workers的总数目,就会在coordinateInputSplits()方法中建立

_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone,告诉每一个worker,全部的worker都处理完了split。

九、最后就是就行全局同步。

master建立znode,path=/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir ,而后再调用barrierOnWorkerList方法检查该znode的子节点数目是否等于workers的数目,若不等于,则线程陷入等待状态。等待worker建立子节点来激活该线程继续判断。

每一个worker获取自身的Partition Stats,进入finishSuperStep方法中,等待全部的Request都被处理完;把自身的Aggregator信息发送给master;建立子节点,如/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir/giraphx_1,data为该worker的partitionStatsList和workerSentMessages统计量;

最后调用waitForOtherWorkers()方法等待master建立/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 节点。

master发现/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir的子节点数目等于workers数目后,根据/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir子节点上的data收集每一个worker发送的aggregator信息,汇总为globalStats。

Master若发现全局信息中(1)全部顶点都voteHalt且没有消息传递,或(2)达到最大迭代次数 时,设置 globalStats.setHaltComputation(true)。告诉works结束迭代。

master建立/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 节点,data为globalStats。告诉全部workers当前超级步结束。

每一个Worker检测到master建立/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 节点后,读出该znode的数据,即全局的统计信息。而后决定是否继续下一次迭代。

十、同步以后开始下一个超级步。

十一、master和workers同步过程总结。

(1)master建立znode A,而后检测A的子节点数目是否等于workers数目,不等于就陷入等待。某个worker建立一个子节点后,就会唤醒master进行检测一次。

(2)每一个worker进行本身的工做,完成后,建立A的子节点A1。而后等待master建立znode B。

(3)若master检测到A的子节点数目等于workers的数目时,建立Znode B

(4)master建立B 节点后,会激活各个worker。同步结束,各个worker就能够开始下一个超步。

本质是经过znode B来进行全局同步的。

相关文章
相关标签/搜索