您好,登錄后才能下訂單哦!
這篇文章主要介紹了apache spark中怎么實現端對端的 exactly once,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
Exactly Once的實現
如果實時作業要實現端對端的 exactly once 則需要數據源、數據處理與數據存儲的三個階段都保證 exactly once 的語義。目前基 于Kafka Direct API加上Spark RDD 算子精確一次的保證能夠實現端對端的 exactly once 的語義。
在數據存儲階段一般實現 exactly once 需要保證存儲的過程是冪等操作或事務操作。很多系統本身就支持了冪等操作,比如相同數據寫 hdfs 同一個文件,這本身就是冪等操作,保證了多次操作最終獲取的值還是相同;HBase、ElasticSearch 與 redis 等都能夠實現冪等操作。對于關系型數據庫的操作一般都是能夠支持事務性操作。
官方在創建 DirectKafkaInputStream 時只需要輸入消費 Kafka 的 From Offset,然后其自行獲取本次消費的 End Offset,也就是當前最新的 Offset。保存的 Offset 是本批次的 End Offset,下次消費從上次的 End Offset 開始消費。
當程序宕機或重啟任務后,這其中存在一些問題。如果在數據處理完成前存儲 Offset,則可能存在作業處理數據失敗與作業宕機等情況,重啟后會無法追溯上次處理的數據導致數據出現丟失。如果在數據處理完成后存儲 Offset,但是存儲 Offset 過程中發生失敗或作業宕機等情況,則在重啟后會重復消費上次已經消費過的數據。
而且此時又無法保證重啟后消費的數據與宕機前的數據量相同數據相當,這又會引入另外一個問題,如果是基于聚合統計指標作更新操作,這會帶來無法判斷上次數據是否已經更新成功。
所以在 muise spark core 中我們加入了自己的實現用以保證 Exactly once 的語義。具體的實現是我們對 Spark 源碼進行了改造,保證在創建 DirectKafkaInputStream 可以同時輸入 From Offset 與 End Offset,并且我們在存儲 Kafka Offset 的時候保存了每個批次的起始Offset 與結束 Offset,具體格式如下:
如此做的用意在于能夠確保無論是宕機還是人為重啟,重啟后的第一個批次與重啟前的最后一個批次數據一模一樣。這樣的設計使得后面用戶在后面對于第一個批次的數據處理非常靈活可變,如果用戶直接忽略第一個批次的數據,那此時保證的是 at most once 的語義,因為我們無法獲知重啟前的最后一個批次數據操作是否有成功完成。
如果用戶依照原有邏輯處理第一個批次的數據,不對其做去重操作,那此時保證的是 at least once 的語義,最終結果中可能存在重復數據;最后如果用戶想要實現 exactly once,muise spark core 提供了根據topic、partition 與 offset 生成 UID 的功能。
只要確保兩個批次消費的 Offset 相同,則最終生成的 UID 也相同,用戶可以根據此 UID 作為判斷上個批次數據是否有存儲成功的依據。下面簡單的給出了重啟后第一個批次操作的行為。
002
Metrics系統
Musie spark core 基于 Spark 本身的 metrics 系統進行了改造,添加了許多定制的 metrics,并且向用戶暴露了 metrics 注冊接口,用戶可以非常方便的注冊自己的 metrics 并在程序中更新 metrics 的數值。最后所有的 metrics 會根據作業設定的批次間隔寫入 Graphite,基于公司定制的預警系統進行報警,前端可以通過 Grafana 展現各項 metrics 指標。
Muise spark core本身定制的metrics包含以下三種:
Fail 批次時間內 spark task 失敗次數超過4次便報警,用于監控程序的運行狀態。
Ack 批次時間內 spark streaming 處理的數據量小0便報警,用于監控程序是否在正常消費數據。
Lag 批次時間內數據消費延遲大于設定值便報警。
其中由于我們大部分作業開啟了 Back Pressure 功能,這就導致在Spark UI 中看到每個批次數據都能在正常時間內消費完成,然而可能此時 kafka 中已經積壓了大量數據,故每個批次我們都會計算當前消費時間與數據本身時間的一個平均差值,如果這個差值大于批次時間,說明本身數據消費就已經存在了延遲。
下圖展現了預警系統中,基于用戶自定義注冊的Metrics以及系統定制的Metrics進行預警。
003
容錯
其實在上面 Exactly Once 一章中已經詳細的描述了 muise spark core如何在程序宕機后能夠保證數據正確的處理。但是為了能夠讓 Spark Sreaming 能夠長時間穩定的運行在Yarn集群上,還需要添加許多配置,感興趣的朋友可以查看:Long running Spark Streaming Jobs on Yarn Cluster。
除了上述容錯保證之外,Muise Portal(后面會講)也提供了對 Spark Streaming 作業定時檢測的功能。目前每過5分鐘對當前所有數據庫中狀態標記為 Running 的 Spark Streaming 作業進行狀態檢測,通過Yarn提供的REST APIs可以根據每個作業的Application Id查詢作業在Yarn上的狀態,如果狀態處于非運行狀態,則會嘗試重啟作業。
感謝你能夠認真閱讀完這篇文章,希望小編分享的“apache spark中怎么實現端對端的 exactly once”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。