中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Giraph 源碼分析(五)—— 加載數據+同步總結

發布時間:2020-06-09 17:17:04 來源:網絡 閱讀:177 作者:數瀾 欄目:大數據

作者|白松

關于Giraph 共有九個章節,本文第五個章節。

環境:在單機上(機器名:giraphx)啟動了2個workers。

輸入:SSSP文件夾,里面有1.txt和2.txt兩個文件。

1、在Worker向Master匯報健康狀況后,就開始等待Master創建InputSplit。

方法:每個Worker通過檢某個Znode節點是否存在,同時在此Znode上設置Watcher。若不存在,就通過BSPEvent的waitForever()方法釋放當前線程的鎖,陷入等待狀態。一直等到master創建該znode。此步驟位于BSPServiceWorker類中的startSuperStep方法中,等待代碼如下:

Giraph 源碼分析(五)—— 加載數據+同步總結cdn.xitu.io/2019/8/8/16c6f1c19ae23057?w=558&h=454&f=png&s=237620">
2、Master調用createInputSplits()方法創建InputSplit。

Giraph 源碼分析(五)—— 加載數據+同步總結

在generateInputSplits()方法中,根據用戶設定的VertexInputFormat獲得InputSplits。代碼如下:

Giraph 源碼分析(五)—— 加載數據+同步總結

其中minSplitCountHint為創建split的最小數目,其值如下:

minSplitCountHint = Workers數目 * NUM_INPUT_THREADS

NUM_INPUT_THREADS表示 每個Input split loading的線程數目,默認值為1 。 經查證,在TextVertexValueInputFormat抽象類中的getSplits()方法中的minSplitCountHint參數被忽略。用戶輸入的VertexInputFormat繼承TextVertexValueInputFormat抽象類。

如果得到的splits.size小于minSplitCountHint,那么有些worker就沒被用上。

得到split信息后,要把這些信息寫到Zookeeper上,以便其他workers訪問。上面得到的split信息如下:

[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]

遍歷splits List,為每個split創建一個Znode,值為split的信息。如為split-0創建Znode,值為:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0

為split-1創建znode(如下),值為:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1

最后創建znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示所有splits都創建好了。

3、Master根據splits創建Partitions。首先確定partition的數目。

Giraph 源碼分析(五)—— 加載數據+同步總結

BSPServiceMaster中的MasterGraphPartitioner<I.V,E,M>對象默認為HashMasterPartitioner。它的createInitialPartitionOwners()方法如下:

Giraph 源碼分析(五)—— 加載數據+同步總結

上面代碼中是在工具類PartitionUtils計算Partition的數目,計算公式如下:

partitionCount=PARTITION_COUNT_MULTIPLIER availableWorkerInfos.size() availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,默認值為1 。

可見,partitionCount值為4(122)。創建的partitionOwnerList信息如下:

[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),

(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]

4、Master創建Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir,用于后面的exchange partition。

5、Master最后在assignPartitionOwners()方法中

把masterinfo,chosenWorkerInfoList,partitionOwners等信息寫入Znode中(作為Znode的data),該Znode的路徑為: /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。

Master調用barrierOnWorkerList()方法開始等待各個Worker完成數據加載。調用關系如下:

Giraph 源碼分析(五)—— 加載數據+同步總結

barrierOnWorkerList中創建znode,path=/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir 。然后檢查該znode的子節點數目是否等于workers的數目,若不等于,則線程陷入等待狀態。后面某個worker完成數據加載后,會創建子node(如 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1)來激活該線程繼續判斷。

6、當Master創建第5步的znode后,會激活worker。

每個worker從znode上讀出data,data包含masterInfo,WorkerInfoList和partitionOwnerList,然后各個worker開始加載數據。

把partitionOwnerList復制給BSPServiceWorker類中的workerGraphPartitioner(默認為HashWorkerPartitioner類型)對象的partitionOwnerList變量,后續每個頂點把根據vertexID通過workerGraphPartitioner對象獲取其對應的partitionOwner。

Giraph 源碼分析(五)—— 加載數據+同步總結

每個Worker從znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir獲取子節點,得到inputSplitPathList,內容如下:

[/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1,

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0]

然后每個Worker創建N個InputsCallable線程讀取數據。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中NUM_INPUT_THREADS默認值為1,maxInputSplitThread=(InputSplitSize-1/maxWorkers +1

那么,默認每個worker就是創建一個線程來加載數據。

在InputSplitsHandler類中的reserveInputSplit()方法中,每個worker都是遍歷inputSplitPathList,通過創建znode來保留(標識要處理)的split。代碼及注釋如下:

Giraph 源碼分析(五)—— 加載數據+同步總結

當用reserveInputSplit()方法獲取某個znode后,loadSplitsCallable類的loadInputSplit方法就開始通過該znode獲取其HDFS的路徑信息,然后讀入數據、重分布數據。

Giraph 源碼分析(五)—— 加載數據+同步總結

Giraph 源碼分析(五)—— 加載數據+同步總結

VertexInputSplitsCallable類的readInputSplit()方法如下:

Giraph 源碼分析(五)—— 加載數據+同步總結

7、每個worker加載完數據后,調用waitForOtherWorkers()方法等待其他workers都處理完split。

Giraph 源碼分析(五)—— 加載數據+同步總結

策略如下,每個worker在/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir目錄下創建子節點,后面追加自己的worker信息,如worker1、worker2創建的子節點分別如下:

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_2

創建完后,然后等待master創建/_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone。

8、從第5步驟可知,若master發現/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir下的子節點數目等于workers的總數目,就會在coordinateInputSplits()方法中創建

_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone,告訴每個worker,所有的worker都處理完了split。

9、最后就是就行全局同步。

master創建znode,path=/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir ,然后再調用barrierOnWorkerList方法檢查該znode的子節點數目是否等于workers的數目,若不等于,則線程陷入等待狀態。等待worker創建子節點來激活該線程繼續判斷。

每個worker獲取自身的Partition Stats,進入finishSuperStep方法中,等待所有的Request都被處理完;把自身的Aggregator信息發送給master;創建子節點,如/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir/giraphx_1,data為該worker的partitionStatsList和workerSentMessages統計量;

最后調用waitForOtherWorkers()方法等待master創建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點。

master發現/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir的子節點數目等于workers數目后,根據/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir子節點上的data收集每個worker發送的aggregator信息,匯總為globalStats。

Master若發現全局信息中(1)所有頂點都voteHalt且沒有消息傳遞,或(2)達到最大迭代次數 時,設置 globalStats.setHaltComputation(true)。告訴works結束迭代。

master創建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點,data為globalStats。告訴所有workers當前超級步結束。

每個Worker檢測到master創建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點后,讀出該znode的數據,即全局的統計信息。然后決定是否繼續下一次迭代。

10、同步之后開始下一個超級步。

11、master和workers同步過程總結。

(1)master創建znode A,然后檢測A的子節點數目是否等于workers數目,不等于就陷入等待。某個worker創建一個子節點后,就會喚醒master進行檢測一次。

(2)每個worker進行自己的工作,完成后,創建A的子節點A1。然后等待master創建znode B。

(3)若master檢測到A的子節點數目等于workers的數目時,創建Znode B

(4)master創建B 節點后,會激活各個worker。同步結束,各個worker就可以開始下一個超步。

本質是通過znode B來進行全局同步的。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

富民县| 武平县| 青田县| 宁陕县| 二连浩特市| 内乡县| 桃江县| 庆元县| 满城县| 大关县| 双峰县| 上饶县| 河津市| 英吉沙县| 通城县| 哈巴河县| 高密市| 泰宁县| 大新县| 玛沁县| 迭部县| 鄯善县| 辽阳县| 寿光市| 天等县| 长泰县| 高阳县| 新晃| 龙里县| 西畴县| 乐都县| 盐源县| 渭南市| 明溪县| 秀山| 监利县| 封丘县| 曲周县| 丹寨县| 柳林县| 连城县|