您好,登錄后才能下訂單哦!
這篇文章主要介紹“Clojure的Map-Reduce怎么理解”,在日常操作中,相信很多人在Clojure的Map-Reduce怎么理解問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Clojure的Map-Reduce怎么理解”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
一種看起來和用起來跟 clojure.core 都很像的 map-reduce 語言
可以把 map-reduce 的查詢當成程序來寫,而不是當成腳本來寫
為單元測試和迭代部署提供強大的支持
注意:如果你對 Clojure 不是很熟悉,我們強烈推薦你試下這里,這里 或者 這里 的教程來了解一些 基礎。
如果你會 Clojure,你就已經會 PigPen 了
PigPen 的主要目標是要把語言帶出等式的行列。PigPen 的操作符設計的和 Clojure 里盡可能的相似,沒有特殊的用戶自定義函數(UDFs)。只需要定義函數(匿名的或者命名的),然后你就能像在 Clojure 程序里一樣使用它們。
這里有個常用的 word count 的例子:
(require '[pigpen.core :as pig]) (defn word-count [lines] (->> lines (pig/mapcat #(-> % first (clojure.string/lower-case) (clojure.string/replace #"[^\w\s]" "") (clojure.string/split #"\s+"))) (pig/group-by identity) (pig/map (fn [[word occurrences]] [word (count occurrences)]))))
這段代碼定義了一個函數,這個函數返回一個 PigPen 的查詢表達式。這個查詢接受一系列的行作為輸入,返回每個單詞出現的次數。你可以看到這只是一個 word count 的邏輯,并沒有設計到一些外部的東西,比如數據從哪里來的,會產生哪些輸出。
當然。PigPen 的查詢是寫成函數的組合——數據輸入、輸出。只需要寫一次,不需要到處復制、粘貼。
現在我們利用以上定義的 word-count 函數,加上 load 和 store 命令,組成一個 PigPen 的查詢:
(defn word-count-query [input output] (->> (pig/load-tsv input) (word-count) (pig/store-tsv output)))
這個函數返回查詢的 PigPen 表示,他自己不會做什么,我們需要從本地執行它,或者生成一個腳本(之后會講)。
利用 PigPen,你可以 mock 輸入數據來為你的查詢寫單元測試。再也不需要交叉著手指想象提交到 cluster 上后會發生什么,也不需要截出部分文件來測試輸入輸出。
Mock 數據真的很容易,通過 pig/return 和 pig/constantly,你可以在你的腳本里注入任意的數據作為起始點。
一個常用的模式是利用 pig/take 來從實際數據源中抽樣出幾行,用 pig/return 把結果包一層,就得到了 mock 數據。
(use 'clojure.test) (deftest test-word-count (let [data (pig/return [["The fox jumped over the dog."] ["The cow jumped over the moon."]])] (is (= (pig/dump (word-count data)) [["moon" 1] ["jumped" 2] ["dog" 1] ["over" 2] ["cow" 1] ["fox" 1] ["the" 4]]))))
pig/dump 操作符會在本地執行查詢。
向你的查詢傳參數很麻煩,所有函數范圍內的變量或者 let 的綁定在函數里都可用。
(defn reusable-fn [lower-bound data] (let [upper-bound (+ lower-bound 10)] (pig/filter (fn [x] (< lower-bound x upper-bound)) data)))
注意 lower-bound 和 upper-bound 在生成腳本的時候就有了,在 cluster 上執行函數的時候也能使用。
只要告訴 PigPen 哪里會把一個查詢寫成一個 Pig 腳本:
(pig/write-script "word-count.pig" (word-count-query "input.tsv" "output.tsv"))
這樣你就能得到一個可以提交到 cluster 上運行的 Pig 腳本。這個腳本會用到 pigpen.jar,這是一個加入所有依賴的 uberjar,所以要保證這個 jar 也一起被提交了。還可以把你的整個 project 打包成一個 uberjar 然后提交,提交之前記得先重命名。怎么打包成 uberjar 請參照教程。
之前看到,我們可以用 pig/dump 來本地運行查詢,返回 Clojure 數據:
=> (def data (pig/return [["The fox jumped over the dog."] ["The cow jumped over the moon."]])) #'pigpen-demo/data => (pig/dump (word-count data)) [["moon" 1] ["jumped" 2] ["dog" 1] ["over" 2] ["cow" 1] ["fox" 1] ["the" 4]]
如果你現在就像開始,請參照 getting started & tutorials。
Map-Reduce 對于處理單臺機器搞不定的數據是很有用,有了 PigPen,你可以像在本地處理數據一樣處理海量數據。Map-Reduce 通過把數據分散到可能成千上萬的集群節點來達到這一目的,這些節點每個都會處理少量的數據,所有的處理都是并行的,這樣完成一個任務就比單臺機器快得多。像 join 和 group 這樣的操作,需要多個節點數據集的協調,這種情況會通過公共的 join key 把數據分到同一個分區計算,join key 的同一個值會送到同一個指定的機器。一旦機器上得到了所有可能的值,就能做 join 的操作或者做其他有意思的事。
想看看 PigPen 怎么做 join 的話,就來看看 pig/cogroup 吧。cogroup 接受任意數量的數據集然后根據一個共同的 key 來分組。假設我們有這樣的數據:
foo: {:id 1, :a "abc"} {:id 1, :a "def"} {:id 2, :a "abc"} bar: [1 42] [2 37] [2 3.14] baz: {:my_id "1", :c [1 2 3]]}
如果想要根據 id 分組,可以這樣:
(pig/cogroup (foo by :id) (bar by first) (baz by #(-> % :my_id Long/valueOf)) (fn [id foos bars bazs] ...))
前三個參數是要 join 的數據集,每一個都會指定一個函數來從數據源中選出 key。最后的一個參數是一個函數,用來把分組結果結合起來。在我們的例子中,這個函數會被調用兩次:
[1 ({:id 1, :a "abc"}, {:id 1, :a "def"}) ([1 42]) ({:my_id "1", :c [1 2 3]]})] [2 ({:id 2, :a "abc"}) ([2 37] [2 3.14]) ()]
這把所有 id 為 1 的值和 id 為 2 的值結合在了一起。不同的鍵值被獨立的分配到不同的機器。默認情況下,key 可以不在數據源中出現,但是有選項可以指定必須出現。
Hadoop 提供了底層的接口做 map-reduce job,但即便如此還是有限制的,即一次只會運行一輪 map-reduce,沒有數據流和復雜查詢的概念。Pig 在 Hadoop 上抽象出一層,但到目前為止,它仍舊只是一門腳本語言,你還是需要用 UDF 來對數據做一些有意思的事情。PigPen 更進一步的做了抽象,把 map-reduce 做成了一門語言。
如果你剛接觸 map-reduce,我們推薦你看下這里。
**代碼重用。**我們希望能定義一段邏輯,然后通過穿參數把它用到不同的 job 里。
**代碼一體化。**我們不想在腳本和不同語言寫的 UDF。 之間換來換去,不想考慮不同數據類型在不同語言中的對應關系。
**組織好代碼。**我們想把代碼寫在多個文件里,想怎么組織怎么組織,不要被約束在文件所屬的 job 里。
**單元測試。**我們想讓我們的抽樣數據關聯上我們的單元測試,我們想讓我們的單元測試在不存取數據的情況下測試業務邏輯。
**快速迭代。**我們想能夠在任何時候注入 mock data,我們想在不用等 JVM 啟動的情況下測試一個查詢。
**只給想要命名的東西命名。**大部分 map-reduce 語言對中間結果要求命名和指定數據結構,這使得用 mock data 來測試單獨的 job 變得困難。我們想要在我們覺得合適的地方組織業務邏輯并命名,而不是受語言的指使。
我們受夠了寫腳本,我們想要寫程序。
注意:PigPen 不是 一個 Clojure 對 Pig 腳本的封裝,很有可能產生的腳本是人看不懂的。
PigPen 設計的和 Clojure 盡可能保持一致。Map-Reduce 是函數式編程,那為什么不利用一門已存在的強大的函數式編程語言呢?這樣不光學習曲線低,而且大多數概念也能更容易的應用到大數據上。
在 PigPen 中,查詢被當做 expression tree 處理,每個操作符都被表示需要的行為信息的 map,這些 map 可以嵌套在一起組成一個復雜查詢的樹形表式。每個命令包含了指向祖命令的引用。在執行的時候,查詢樹會被轉化成一個有向無環的查詢圖。這可以很容易的合并重復的命令,優化相關命令的順序,并且可以利用 debug 信息調試查詢。
去重 當我們把查詢表示成操作圖的時候,去重是一件很麻煩的事。Clojure 提供了值相等的操作,即如果連個對象的內容相同,它們就相等。如果兩個操作有相同的表示,那它們完全相同,所以在寫查詢的時候不用擔心重復的命令,它們在執行之前都會被優化。
舉個例子,假設我們有這樣兩個查詢:
(let [even-squares (->> (pig/load-clj "input.clj") (pig/map (fn [x] (* x x))) (pig/filter even?) (pig/store-clj "even-squares.clj")) odd-squares (->> (pig/load-clj "input.clj") (pig/map (fn [x] (* x x))) (pig/filter odd?) (pig/store-clj "odd-squares.clj"))] (pig/script even-squares odd-squares))
在這個查詢中,我們從一個文件加載數據,計算每個數的平方,然后分成偶數和奇數,操作圖看起來是這樣: 在此輸入圖片描述
這符合我們的查詢,但是做了很多額外的工作。我們加載了 input.clj
兩次,所有數的平方也都計算了兩次。這看上去可能沒有很多工作,但是當你對很多數據做這樣的事情,簡單的操作累加起來就很多。為了優化這個查詢,我們可以找出相同的操作。看第一眼發現我們計算平方的操作可能是一個候選,但是他們有不同的父節點,因此不能把他們合并在一起。但是我們可以把加載函數合并,因為他們沒有父節點,而且他們加載相同的文件。
現在我們的圖看起來是這樣:
現在我們值加載一次數據,這會省一些時間,但還是要計算兩次平方。因為我們現在只有一個加載的命令,我們的 map 操作現在相同,可以合并:
這樣我們就得到了一個優化過的查詢,每個操作都是唯一的。因為我們每次只會合并一個命令,我們不會修改查詢的邏輯。你可以很容易的生成查詢,而不用擔心重復的執行,PigPen 對重復的部分只會執行一次。
序列化 當我們用 Clojure 處理完數據以后,數據必須序列化成二進制字節,Pig 才能在集群的機器間傳數據。這對 PigPen 是一個很昂貴但是必須的過程。幸運的是一個腳本中經常有很多連續的操作可以合成一個操作,這對于不必要的序列化和反序列化節省了很多時間。例如,任意連續的 map,filter 和 mapcat 操作都可以被重寫成一個單獨的 mapcat 操作。
我們通過一些例子來說明:
在這個例子中,我們從一個序列化的值(藍色)4開始,對它反序列化(橙色),執行我們的 map 函數,然后再把它序列化。
現在我們來試一個稍微復雜一點的(更現實的)例子。在這個例子中,我們執行一個 map,一個 mapcat 和一個 filter 函數。
如果你以前沒用過 mapcat,我可以告訴你這是對一個值運行一個函數然后返回一串值的操作。那個序列會被 flatten,每個值都會傳給下一步使用。在 Clojure 里,那是 map 和 concat 聯合之后的結果,在 Scala 里,這叫做 flatMap,而在 C# 里叫 selectMany。
在下圖中,左邊的流程是我們優化之前的查詢,右邊的是優化之后的。和第一個例子一樣,我們同樣從 4 開始,計算平方,然后對這個值做減一的操作,返回本身和加一的操作。Pig 會得到這個值的集合然后做 flatten,使每個值都成為下一步的輸入。注意在和 Pig 交互的時候我們要序列化和反序列化。第三步,也就是最后一步對數據進行過濾,在這個例子中我們只保留奇數值。如圖所示,我們在任意兩步之間都序列化和反序列化數據。
右邊的圖顯示了優化后的結果。每個操作都返回了一個元素序列。map 操作返回一個只有單元素 16 的序列,mapcat 也一樣,過濾操作返回 0 元素或單元素的序列。通過是這些命令保持一致,我們可以很容易的把他們合并到一起。我們在一套命令中flattrn 了更多的值序列,但是在步驟之間沒有序列化的消耗。雖然卡起來更復雜,但是這個優化是每個步驟都執行的更快了。
交互式開發,測試,以及可調試性是 PigPen 的關鍵功能。如果你有一個一次運行好幾天的 job,那你最不想看到的是跑了十一個小時后冒出來一個 bug。PigPen 有個基于 rx 的本地運行模式。這可以讓我們對查詢寫單元測試。這樣我們可以更有把握的知道運行的時候不會掛掉,并且能返回期待的值。更牛逼的是這個功能可以讓我們進行交互式的開發。
通常情況下,我們剛開始會從數據源中選一些記錄來做單元測試。因為 PigPen 在 REPL 中返回數據,我們不需要額外構造測試數據。這樣,通過 REPL,我們可以根據需要對 mock 數據做 map,filter,join 和 reduce 的操作。每個步驟都可以驗證結果是不是我們想要的。這種方法相對于寫一長串腳本然后憑空想象能產生更可靠的數據。還有一個有用的地方是可以把復雜的查詢寫成幾個較小的函數單元。Map-reduce 查詢隨著數據源的量級可能產生劇烈的增加或減少。當你把腳本作為一個整體測試的時候,你可能要讀一大堆數據,最后產生一小撮數據。通過把查詢細化成較小的單元,你可以對讀 100 行,產生 2 行這樣子來測試一個單元,然后測試第二個單元的時候可以用這兩行作為模板來產生 100 多個數據。
調試模式對于解決異常很有用,啟用后會在正常輸出的同時,把腳本中每個操作的結果寫到磁盤上。這對于像 Hadoop 這樣的環境很有用,在這種情況下,你沒法單步跟蹤代碼,而且每個步驟都可能花好幾個小時。調試模式還可以可視化流程圖。這樣可以可視化的把執行計劃的和實際操作的輸出關聯起來。
要啟用調試模式,請參考 pig/write-script 和 pig/generate-script 的選項,這會在指定的目錄下寫額外的調試輸出。
啟用調試模式的例子:
(pig/write-script {:debug "/debug-output/"} "my-script.pig" my-pigpen-query)
要啟用可視化模式,可以看看 pig/show 和 pig/dump&show。
可視化的例子:
(pig/show my-pigpen-query) ;; Shows a graph of the query (pig/dump&show my-pigpen-query) ;; Shows a graph and runs it locally
PigPen 有個好用的功能是可以很容易的創建自己的操作符。例如,我們可以定義像求差集和交集這樣的集合和多集合的操作符,這些只是像 co-group
這樣的操作符的變體,但是如果能定義,測試它們,然后再也不去想這些邏輯怎么實現的,那就更好了。
這對更復雜的操作也是很有用的。對于集合數據我們有 sum
,avg
,min
,max
,sd
和 quantiles
這些可重用的統計操作符,還有 pivot
這樣的操作符可以把多維數據分組然后對每組計數。
這些操作本身都是簡單的操作,但是當你把它們從你的查詢中抽象出來之后,你的查詢也會變的簡單很多。這時候你可以花更多的時間去想怎么解決問題,而不是每次都重復寫基本的統計方法。
我們選擇 Pig 是因為我們不想把 Pig 已有的優化的邏輯重寫一遍,不考慮語言層面的東西的話,Pig 在移動大數據方面做得很好。我們的策略是利用 Pig 的 DataByteArray 二進制格式來移動序列化的 Clojure 數據。在大多數情況下,Pig 不需要知道數據的底層展現形式。Byte array 可以很快的做比較,這樣對于 join 和 group 操作,Pig 只需要簡單的比較序列化的二進制,如果序列化的輸出一致,在 Clojure 中值就相等。不過這對于數據排序不適用。二進制的排序其實沒什么用,而且和原始數據的排序結果也不一樣。要想排序,還得把數據轉化回去,而且只能對簡單類型排序。這也是 Pig 強加給 PigPen 的為數不多的一個缺陷。
我們在決定做 PigPen 之前也評估過其他語言。第一個要求就是那必須是一門編程語言,并不是一種腳本語言加上一堆 UDF。我們簡單看過 Scalding,它看上去很有前途,但是我們的團隊主要是用的 Clojure。 可以這么說,PigPen 對于 Clojure 就像是 Scalding 對于 Scala。Cascalog 是用 Clojure 寫 map-reduce 通常會用的語言,但是從過去的經驗來看,Cascalog 對于日常工作其實沒什么用,你需要學一套復雜的新語法和很多概念,通過變量名對齊來做隱式 join 也不是理想的方案,如果把操作符順序弄錯了會造成很大的性能問題,Cascalog 會 flatten 數據結果(這可能很浪費),而且組合查詢讓人感覺很別扭。
我們也考慮過對 PigPen 用一門宿主語言。這樣也能在 Hive 之上構建類似的抽象,但是對每個中間產物都定義 schema 跟 Clojure 的理念不符。而且 Hive 類似與 SQL,使得從功能性語言翻譯更難。像 SQL 和 Hive 這樣的關系模型語言與像 Clojure 和 Pig 這樣的功能性語言之間有著巨大的差。最后,最直接的解決辦法就是在 Pig 之上做一層抽象。
到此,關于“Clojure的Map-Reduce怎么理解”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。