您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關Databricks如何使用Spark Streaming和Delta Lake對流式數據進行數據質量監控,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
小編主要對Databricks如何使用Spark Streaming和Delta Lake對流式數據進行數據質量監控的方法和架構進行了介紹,下面探討了一種數據管理架構,該架構可以在數據到達時,通過主動監控和分析來檢測流式數據中損壞或不良的數據,并且不會造成瓶頸。
構建流式數據分析和監控流程
在Databricks,我們看到客戶中不斷涌現出許多數據處理模式,這些新模式的產生推動了可能的極限,在速度和質量問題上也不例外。為了幫助解決這一矛盾,我們開始考慮使用正確的工具,不僅可以支持所需的數據速度,還可以提供可接受的數據質量水平。Structured Streaming和Delta Lake非常適合用于數據獲取和存儲層,因為他們能夠配合創造一個具有擴展性、容錯性和類實時的系統,并且具有exactly-once處理保證。
為企業數據質量分析找到可接受的工具要困難一些,特別是這個工具需要具有對數據質量指標的狀態匯總的能力。另外,還需要能夠對整個數據集進行檢查(例如檢測出多少比例的記錄為空值),這些都會隨著所提取的數據量的增加而增加計算成本。這對所有流式系統而言都是需要的,這一要求就排除了很多可用的工具。
在我們最初的解決方案中,我們選擇了Amazon的數據質量檢測工具Deequ,因為它能提供簡單而強大的API,有對數據質量指標進行狀態聚合的能力,以及對Scala的支持。將來,其他Spark原生的工具將提供額外的選擇。
我們通過在EC2實例上運行一個小型的Kafka producer來模擬數據流,該實例將模擬的股票交易信息寫入Kafka topic,并使用原生的Databricks連接器將這些數據導入到Delta Lake表當中。為了展示Spark Streaming中數據質量檢查的功能,我們選擇在整個流程中實現Deequ的不同功能:
根據歷史數據生成約束條件;
使用foreachBatch算子對到達的數據進行增量質量分析;
使用foreachBatch算子對到達的數據執行(較小的)單元測試,并將質量不佳的batch隔離到質量不佳記錄表中;
對于每個到達的batch,將最新的狀態指標寫入到Delta表當中;
對整個數據集定期執行(較大的)單元測試,并在MLFlow中跟蹤結果;
根據驗證結果發送通知(如通過電子郵件或Slack);
捕獲MLFlow中的指標以進行可視化和記錄。
我們結合了MLFlow來跟蹤一段時間內數據性能指標的質量、Delta表的版本迭代以及結合了一個用于通知和告警的Slack連接器。整個流程可以用如下的圖片進行表示:
由于Spark中具有統一的批處理/流式處理接口,因此我們能夠在這個流程的任何位置提取報告、告警和指標,作為實時更新或批處理快照。這對于設置觸發器或限制特別有用,因此,如果某個指標超過了閾值,則可以執行數據質量改善措施。還要注意的是,我們并沒有對初始到達的原始數據造成影響,這些數據將立即提交到我們的Delta表,這意味著我們不會限制數據輸入的速率。下游系統可以直接從該表中讀取數據,如果超過了上述任何觸發條件或質量閾值,則可能會中斷。此外,我們可以輕松地創建一個排除質量不佳記錄的view以提供一個干凈的表。
在一個較高的層次,執行我們的數據質量跟蹤和驗證的代碼如下所示:
spark.readStream
.table("trades_delta")
.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// reassign our current state to the previous next state
val stateStoreCurr = stateStoreNext
// run analysis on the current batch, aggregate with saved state
val metricsResult = AnalysisRunner.run(data=batchDF, ...)
// verify the validity of our current microbatch
val verificationResult = VerificationSuite()
.onData(batchDF)
.addCheck(...).run()
// if verification fails, write batch to bad records table
if (verificationResult.status != CheckStatus.Success) {...}
// write the current results into the metrics table
Metric_results.write
.format("delta")
.mode("overwrite")
.saveAsTable("deequ_metrics")
}
.start()
使用數據質量工具Deequ
在Databricks中使用Deequ是相對比較容易的事情,你需要首先定義一個analyzer,然后在dataframe上運行該analyzer。例如,我們可以跟蹤Deequ本地提供的幾個相關指標檢查,包括檢查數量和價格是否為非負數、原始IP地址是否不為空以及符號字段在所有事務中的唯一性。Deequ的StateProvider對象在流式數據配置中特別有用,它能允許用戶將我們指標的狀態保存在內存或磁盤中,并在以后匯總這些指標。這意味著每個處理的批次僅分析該批次中的數據記錄,而不會分析整個表。即使隨著數據大小的增長,這也可以使性能保持相對穩定,這在長時間運行的生產環境中很重要,因為生產環境需要在任意數量的數據上保持一致。
MLFlow還可以很好地跟蹤指標隨時間的演變,在我們的notebook中,我們跟蹤在foreachBatch代碼中分析的所有Deequ約束作為指標,并使用Delta的versionID和時間戳作為參數。在Databricks的notebook中,集成的MLFlow服務對于指標跟蹤特別方便。
通過使用Structured Streaming、Delta Lake和Deequ,我們能夠消除傳統情況下數據質量和速度之間的權衡,而專注于實現兩者的可接受水平。這里特別重要的是靈活性——不僅在如何處理不良記錄(隔離、報錯、告警等),而且在體系結構上(例如何時以及在何處執行檢查?)和生態上(如何使用我們的數據?)。開源技術(如Delta Lake、Structured Streaming和Deequ)是這種靈活性的關鍵。隨著技術的發展,能夠使用最新最、最強大的解決方案是提升其競爭優勢的驅動力。最重要的是,你的數據的速度和質量一定不能對立,而要保持一致,尤其是在流式數據處理越來越靠近核心業務運營時。很快,這將不會是一種選擇,而是一種期望和要求,我們正朝著這個未來方向一次一小步地不斷前進。
以上就是Databricks如何使用Spark Streaming和Delta Lake對流式數據進行數據質量監控,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。