您好,登錄后才能下訂單哦!
這篇文章主要講解了“MapReduce編程模型是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“MapReduce編程模型是什么”吧!
MapReduce:大型集群上的簡單數據處理
摘要
MapReduce是一個設計模型,也是一個處理和產生海量數據的一個相關實現。用戶指定一個用于處理一個鍵值(key-value)對生成一組key/value對形式的中間結果的map函數,以及一個將中間結果鍵相同的鍵值對合并到一起的reduce函數。許多現實世界的任務都能滿足這個模型,如這篇文章所示。
使用這個功能形式實現的程序能夠在大量的普通機器上并行執行。這個運行程序的系統關心下面的這些細節:輸入數據的分區、一組機器上調度程序執行、處理機器失敗問題,以及管理所需的機器內部的通信。這使沒有任何并行處理和分布式系統經驗的程序員能夠利用這個大型分布式系統的資源。
我們的MapReduce實現運行在一個由普通機器組成的大規模集群上,具有很高的可擴展性:一個典型的MapReduce計算會在幾千臺機器上處理許多TB的數據。程序員們發現這個系統很容易使用:目前已經實現了幾百個MapReduce程序,在Google的集群上,每天有超過一千個的MapReduce工作在運行。
一、 介紹
在過去的5年中,本文作者和許多Google的程序員已經實現了數百個特定用途的計算程序,處理了海量的原始數據,包括抓取到的文檔、網頁請求日志等,計算各種衍生出來的數據,如反向索引、網頁文檔的圖形結構的各種表示、每個host下抓取到的頁面數量的總計、一個給定日期內的最頻繁查詢的集合等。大多數這種計算概念明確。然而,輸入數據通常都很大,并且計算必須分布到數百或數千臺機器上以確保在一個合理的時間內完成。如何并行計算、分布數據、處理錯誤等問題使這個起初很簡單的計算,由于增加了處理這些問題的很多代碼而變得十分復雜。
為了解決這個復雜問題,我們設計了一個新的抽象模型,它允許我們將想要執行的計算簡單的表示出來,而隱藏其中并行計算、容錯、數據分布和負載均衡等很麻煩的細節。我們的抽象概念是受最早出現在lisp和其它結構性語言中的map和reduce啟發的。我們認識到,大多數的計算包含對每個在輸入數據中的邏輯記錄執行一個map操作以獲取一組中間key/value對,然后對含有相同key的所有中間值執行一個reduce操作,以此適當的合并之前的衍生數據。由用戶指定map和reduce操作的功能模型允許我們能夠簡單的進行并行海量計算,并使用re-execution作為主要的容錯機制。
這項工作的最大貢獻是提供了一個簡單的、強大的接口,使我們能夠自動的進行并行和分布式的大規模計算,通過在由普通PC組成的大規模集群上實現高性能的接口來進行合并。
第二章描述了基本的編程模型,并給出了幾個例子。第三章描述了一個為我們的聚類計算環境定制的MapReduce接口實現。第四章描述了我們發現對程序模型很有用的幾個優化。第六章探索了MapReduce在Google內部的使用,包括我們在將它作為生產索引系統重寫的基礎的一些經驗。第七章討論了相關的和未來的工作。
二、 編程模型
這個計算輸入一個key/value對集合,產生一組輸出key/value對。MapReduce庫的用戶通過兩個函數來標識這個計算:Map和Reduce。
Map,由用戶編寫,接收一個輸入對,產生一組中間key/value對。MapReduce庫將具有相同中間key I的聚合到一起,然后將它們發送給Reduce函數。
Reduce,也是由用戶編寫的,接收中間key I和這個key的值的集合,將這些值合并起來,形成一個盡可能小的集合。通常,每個Reduce調用只產生0或1個輸出值。這些中間值經過一個迭代器(iterator)提供給用戶的reduce函數。這允許我們可以處理由于數據量過大而無法載入內存的值的鏈表。
2.1 例子
考慮一個海量文件集中的每個單詞出現次數的問題,用戶會寫出類似于下面的偽碼:
Map函數對每個單詞增加一個相應的出現次數(在這個例子中僅僅為“1”)。Reduce函數將一個指定單詞所有的計數加到一起。
此外,用戶使用輸入和輸出文件的名字、可選的調節參數編寫代碼,來填充一個mapreduce規格對象,然后調用MapReduce函數,并把這個對象傳給它。用戶的代碼與MapReduce庫(C++實現)連接到一起。。附錄A包含了這個例子的整個程序。
2.2 類型
盡管之前的偽代碼中使用了字符串格式的輸入和輸出,但是在概念上,用戶定義的map和reduce函數需要相關聯的類型:
map (k1, v1) --> list(k2, v2)
reduce (k2, list(v2)) --> list(v2)
也就是說,輸入的鍵和值和輸出的鍵和值來自不同的域。此外,中間結果的鍵和值與輸出的鍵和值有相同的域。
MapReduce的C++實現與用戶定義的函數使用字符串類型進行參數傳遞,將類型轉換的工作留給用戶的代碼來處理。
2.3 更多的例子
這里有幾個簡單有趣的程序,能夠使用MapReduce計算簡單的表示出來。
分布式字符串查找(Distributed Grep):map函數將匹配一個模式的行找出來。Reduce函數是一個恒等函數,只是將中間值拷貝到輸出上。
URL訪問頻率計數(Count of URL Access Frequency):map函數處理web頁面請求的日志,并輸出<URL, 1>。Reduce函數將相同URL的值累加到一起,生成一個<URL, total count>對。
翻轉網頁連接圖(Reverse Web-Link Graph):map函數為在一個名為source的頁面中指向目標(target)URL的每個鏈接輸出<target, source>對。Reduce函數將一個給定目標URL相關的所有源(source)URLs連接成一個鏈表,并生成對:<target, list(source)>。
主機關鍵向量指標(Term-Vector per Host):一個檢索詞向量將出現在一個文檔或是一組文檔中最重要的單詞概述為一個<word, frequency>對鏈表。Map函數為每個輸入文檔產生一個<hostname, term vector>(hostname來自文檔中的URL)。Reduce函數接收一個給定hostname的所有文檔檢索詞向量,它將這些向量累加到一起,將罕見的向量丟掉,然后生成一個最終的<hostname, term vector>對。
倒排索引(Inverted Index):map函數解析每個文檔,并生成一個<word, document ID>序列。Reduce函數接收一個給定單詞的所有鍵值對,所有的輸出對形成一個簡單的倒排索引。可以通過對計算的修改來保持對單詞位置的追蹤。
分布式排序(Distributed Sort):map函數將每個記錄的key抽取出來,并生成一個<key, record>對。Reduce函數不會改變任何的鍵值對。這個計算依賴了在4.1節提到的分區功能和4.2節提到的排序屬性。
三、 實現
MapReduce接口有很多不同的實現,需要根據環境來做出合適的選擇。比如,一個實現可能適用于一個小的共享內存機器,而另一個實現則適合一個大的NUMA多處理器機器,再另一個可能適合一個更大的網絡機器集合。
這一章主要描述了針對在Google內部廣泛使用的計算環境的一個實現:通過交換以太網將大量的普通PC連接到一起的集群。在我們的環境中:
(1) 機器通常是雙核x86處理器、運行Linux操作系統、有2-4G的內存。
(2) 使用普通的網絡硬件—通常是100Mb/s或者是1Gb/s的機器帶寬,但是平均值遠小于帶寬的一半。
(3) 由數百臺或者數千臺機器組成的集群,因此機器故障是很平常的事
(4) 存儲是由直接裝在不同機器上的便宜的IDE磁盤提供。一個內部的分布式文件系統用來管理存儲這些磁盤上的數據。文件系統在不可靠的硬件上使用副本機制提供了可用性和可靠性。
(5) 用戶將工作提交給一個調度系統,每個工作由一個任務集組成,通過調度者映射到集群中可用機器的集合上。
3.1 執行概述
通過自動的將輸入數據分區成M個分片,Map調用被分配到多臺機器上運行。數據的分片能夠在不同的機器上并行處理。使用分區函數(如,hash(key) mod R)將中間結果的key進行分區成R個分片,Reduce調用也被分配到多臺機器上運行。分區的數量(R)和分區函數是由用戶指定的。
獨立的工作機器的計數器值周期性的傳送到master(附在ping的響應上)master將從成功的map和reduce任務上獲取的計數器值進行匯總,當MapReduce操作完成時,將它們返回給用戶的代碼。當前的計數器值也被顯示在了master的狀態頁面上,使人們能夠看到當前計算的進度。當匯總計數器值時,master通過去掉同一個map或reduce任務的多次執行所造成的影響來防止重復計數。(重復執行可能會在我們使用備用任務和重新執行失敗的任務時出現。)
一些計數器的值是由MapReduce庫自動維護的,如已處理的輸入key/value對的數量和已生成的輸出key/value對的數量。
用戶發現計數器對檢查MapReduce操作的行為很有用處。例如,在一些MapReduce操作中,用戶代碼可能想要確保生成的輸出對的數量是否精確的等于已處理的輸入對的數量,或者已處理的德國的文檔數量在已處理的所有文檔數量中是否被容忍。
五、 性能
在這章中,我們測試兩個運行在一個大規模集群上的MapReduce計算的性能。一個計算在大約1TB的數據中進行特定的模式匹配,另一個計算對大約1TB的數據進行排序。
這兩個程序能夠代表實際中大量的由用戶編寫的MapReduce程序,一類程序將數據從一種表示方式轉換成另一種形式;另一類程序是從海里的數據集中抽取一小部分感興趣的數據。
5.1 集群配置
所有的程序運行在一個由將近1800臺機器組成的集群上。每個機器有兩個2GHz、支持超線程的Intel Xeon處理器、4GB的內存、兩個160GB的IDE磁盤和一個1Gbps的以太網鏈路,這些機器部署在一個兩層的樹狀交換網絡中,在根節點處有大約100-200Gbps的帶寬。所有的機器都采用相同的部署,因此任意兩個機器間的RTT都小于1ms。
在4GB內存里,有接近1-1.5GB用于運行在集群上的其它任務。程序在一個周末的下午開始執行,這時主機的CPU、磁盤和網絡基本都是空閑的。
5.2 字符串查找(Grep)
這個grep程序掃描了大概1010個100字節大小的記錄,查找出現概率相對較小的3個字符的模式(這個模式出現在92337個記錄中)。輸入被分割成接近64MB的片(M=15000),整個輸出被放到一個文件中(R=1)。
圖3:對于排序程序的不同執行過程隨時間的數據傳輸速率
圖3(a)顯示了排序程序的正常執行過程。左上方的圖顯示了輸入讀取的速率,這個速率峰值大約為13GB/s,因為所有的map任務執行完成,速率也在200秒前下降到了0。注意,這里的輸入速率比字符串查找的要小,這是因為排序程序的map任務花費了大約一半的處理時間和I/O帶寬將終結結果輸出到它們的本地磁盤上,字符串查找相應的中間結果輸出幾乎可以忽略。
左邊中間的圖顯示了數據通過網絡從map任務發往reduce任務的速率。這個緩慢的數據移動在第一個map任務完成時會盡快開始。圖中的第一個峰值是啟動了第一批大概1700個reduce任務(整個MapReduce被分配到大約1700臺機器上,每個機器每次最多只執行一個reduce任務)。這個計算執行大概300秒后,第一批reduce任務中的一些執行完成,我們開始執行剩下的reduce任務進行數據處理。所有的處理在計算開始后的大約600秒后完成。
左邊下方的圖顯示了reduce任務就愛那個排序后的數據寫到最終的輸出文件的速率。在第一個處理周期完成到寫入周期開始間有一個延遲,因為機器正在忙于對中間數據進行排序。寫入的速率會在2-4GB/s上持續一段時間。所有的寫操作會在計算開始后的大約850秒后完成。包括啟動的開銷,整個計算耗時891秒,這與TeraSort benchmark中的最好記錄1057秒相似。
一些事情需要注意:因為我們的位置優化策略,大多數數據從本地磁盤中讀取,繞開了網絡帶寬的顯示,所以輸入速率比處理速率和輸出速率要高。處理速率要高于輸出速率,因為輸出過程要將排序后的數據寫入到兩個拷貝中(為了可靠性和可用性,我們將數據寫入到兩個副本中)。我們將數據寫入兩個副本,因為我們的底層文件系統為了可靠性和可用性提供了相應的機制。如果底層文件系統使用容錯編碼(erasure coding)而不是復制,寫數據的網絡帶寬需求會降低。
5.4 備用任務的作用
在圖3(b)中,我們顯示了一個禁用備用任務的排序程序的執行過程。執行的流程與如3(a)中所顯示的相似,除了有一個很長的尾巴,在這期間幾乎沒有寫入行為發生。在960秒后,除了5個reduce任務的所有任務都執行完成。然而,這些落后者只到300秒后才執行完成。整個計算任務耗時1283秒,增加了大約44%的時間。
5.5 機器故障
在圖3(c)中,我們顯示了一個排序程序的執行過程,在計算過程開始都的幾分鐘后,我們故意kill掉了1746個工作進程中的200個。底層的調度者會迅速在這些機器上重啟新的工作進程(因為只有進程被殺掉,機器本身運行正常)。
工作進程死掉會出現負的輸入速率,因為一些之前已經完成的map工作消失了(因為香港的map工作進程被kill掉了),并且需要重新執行。這個map任務會相當快的重新執行。整個計算過程在933秒后完成,包括了啟動開銷(僅僅比普通情況多花費了5%的時間)。
六、 經驗
我們在2003年2月完成了MapReduce庫的第一個版本,并在2003年8月做了重大的改進,包括位置優化、任務在工作機器上的動態負載均衡執行等。從那時起,我們驚喜的發現,MapReduce庫能夠廣泛的用于我們工作中的各種問題。它已經被用于Google內部廣泛的領域,包括:
大規模機器學習問題
Google新聞和Froogle產品的集群問題
抽取數據用于公眾查詢的產品報告
從大量新應用和新產品的網頁中抽取特性(如,從大量的位置查詢頁面中抽取地理位置信息)
大規模圖形計算
表1: 2004年8月運行的MapReduce任務
在每個工作的最后,MapReduce庫統計了工作使用的計算資源。在表1中,我們看到一些2004年8月在Google內部運行的MapReduce工作的一些統計數據。
6.1 大規模索引
目前為止,MapReduce最重要的應用之一就是完成了對生產索引系統的重寫,它生成了用于Google網頁搜索服務的數據結構。索引系統的輸入數據是通過我們的爬取系統檢索到的海量文檔,存儲為就一個GFS文件集合。這些文件的原始內容還有超過20TB的數據。索引程序是一個包含了5-10個MapReduce操作的序列。使用MapReduce(代替了之前版本的索引系統中的adhoc分布式處理)有幾個優點:
索引程序代碼是一個簡單、短小、易于理解的代碼,因為容錯、分布式和并行處理都隱藏在了MapReduce庫中。比如,一個計算程序的大小由接近3800行的C++代碼減少到使用MapReduce的大約700行的代碼。
MapReduce庫性能非常好,以至于能夠將概念上不相關的計算分開,來代替將這些計算混合在一起進行,避免額外的數據處理。這會使索引程序易于改變。比如,對之前的索引系統做一個改動大概需要幾個月時間,而對新的系統則只需要幾天時間。
索引程序變得更易于操作,因為大多數由于機器故障、機器處理速度慢和網絡的瞬間阻塞等引起的問題都被MapReduce庫自動的處理掉,而無需人為的介入。
七、 相關工作
許多系統都提供了有限的程序模型,并且對自動的并行計算使用了限制。比如,一個結合函數可以在logN時間內在N個處理器上對一個包含N個元素的數組使用并行前綴計算,來獲取所有的前綴[6,9,13]。MapReduce被認為是這些模型中基于我們對大規模工作計算的經驗的簡化和精華。更為重要的是,我們提供了一個在數千個處理器上的容錯實現。相反的,大多數并行處理系統只在較小規模下實現,并將機器故障的處理細節交給了程序開發者。
Bulk Synchronous Programming和一些MPI源于提供了更高層次的抽象使它更易于讓開發者編寫并行程序。這些系統和MapReduce的一個關鍵不同點是MapReduce開發了一個有限的程序模型來自動的并行執行用戶的程序,并提供了透明的容錯機制。
我們的位置優化機制的靈感來自于移動磁盤技術,計算用于處理靠近本地磁盤的數據,減少數據在I/O子系統或網絡上傳輸的次數。我們的系統運行在掛載幾個磁盤的普通機器上,而不是在磁盤處理器上運行,但是一般方法是類似的。
我們的備用任務機制與Charlotte系統中采用的eager調度機制類似。簡單的Eager調度機制有一個缺點,如果一個給定的任務造成反復的失敗,整個計算將以失敗告終。我們通過跳過損壞計算路的機制,解決了這個問題的一些情況。
MapReduce實現依賴了內部集群管理系統,它負責在一個大規模的共享機器集合中分發和運行用戶的任務。盡管不是本篇文章的焦點,但是集群管理系統在本質上與像Condor的其它系統類似。
排序功能是MapReduce庫的一部分,與NOW-Sort中的操作類似。源機器(map工作進程)將將要排序的數據分區,并將其發送給R個Reduce工作進程中的一個。每個reduce工作進程在本地對這些數據進行排序(如果可能的話就在內存中進行)。當然NOW-Sort沒有使MapReduce庫能夠廣泛使用的用戶定義的Map和Reduce函數。
River提供了一個編程模型,處理進程通過在分布式隊列上發送數據來進行通信。像MapReduce一樣,即使在不均勻的硬件或系統顛簸的情況下,River系統依然試圖提供較好的平均性能。River系統通過小心的磁盤和網絡傳輸調度來平衡完成時間。通過限制編程模型,MapReduce框架能夠將問題分解成很多細顆粒的任務,這些任務在可用的工作進程上動態的調度,以至于越快的工作進程處理越多的任務。這個受限制的編程模型也允許我們在工作將要結束時調度冗余的任務進行處理,這樣可以減少不均勻情況下的完成時間。
BAD-FS與MapReduce有完全不同的編程模型,不像MapReduce,它是用于在廣域網下執行工作的。然而,它們有兩個基本相似點。(1)兩個系統都使用了重新執行的方式來處理因故障而丟失的數據。(2)兩個系統都本地有限調度原則來減少網絡鏈路上發送數據的次數。
TASCC是一個用于簡化結構的高可用性的網絡服務。像MapReduce一樣,它依靠重新執行作為一個容錯機制。
感謝各位的閱讀,以上就是“MapReduce編程模型是什么”的內容了,經過本文的學習后,相信大家對MapReduce編程模型是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。