中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行Flink 1.11 Unaligned Checkpoint 解析

發布時間:2021-12-09 11:12:43 來源:億速云 閱讀:119 作者:柒染 欄目:大數據

今天就跟大家聊聊有關如何進行Flink 1.11 Unaligned Checkpoint 解析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

作為 Flink 最基礎也是最關鍵的容錯機制,Checkpoint 快照機制很好地保證了 Flink 應用從異常狀態恢復后的數據準確性。同時 Checkpoint 相關的 metrics 也是診斷 Flink 應用健康狀態最為重要的指標,成功且耗時較短的 Checkpoint 表明作業運行狀況良好,沒有異常或反壓。  然而,由于 Checkpoint 與反壓的耦合,反壓反過來也會作用于 Checkpoint,導致 Checkpoint 的種種問題。

針對于此,Flink 在 1.11 引入 Unaligned Checkpint 來解耦 Checkpoint 機制與反壓機制,優化高反壓情況下的 Checkpoint 表現。

當前 Checkpoint 機制簡述

相信不少讀者對 Flink Checkpoint 基于 Chandy-Lamport 算法的分布式快照已經比較熟悉,該節簡單回顧下算法的基礎邏輯,熟悉算法的讀者可放心跳過。

Chandy-Lamport 算法將分布式系統抽象成 DAG(暫時不考慮有閉環的圖),節點表示進程,邊表示兩個進程間通信的管道。分布式快照的目的是記錄下整個系統的狀態,即可以分為節點的狀態(進程的狀態)和邊的狀態(信道的狀態,即傳輸中的數據)。因為系統狀態是由輸入的消息序列驅動變化的,我們可以將輸入的消息序列分為多個較短的子序列,圖的每個節點或邊先后處理完某個子序列后,都會進入同一個穩定的全局統狀態。利用這個特性,系統的進程和信道在子序列的邊界點分別進行本地快照,即使各部分的快照時間點不同,最終也可以組合成一個有意義的全局快照。

如何進行Flink 1.11 Unaligned Checkpoint 解析

圖1. Checkpoint Barrier  

從實現上看,Flink 通過在 DAG 數據源定時向數據流注入名為 Barrier 的特殊元素,將連續的數據流切分為多個有限序列,對應多個 Checkpoint 周期。每當接收到 Barrier,算子進行本地的 Checkpoint 快照,并在完成后異步上傳本地快照,同時將 Barrier 以廣播方式發送至下游。當某個 Checkpoint 的所有 Barrier 到達 DAG 末端且所有算子完成快照,則標志著全局快照的成功。

如何進行Flink 1.11 Unaligned Checkpoint 解析

圖2. Barrier Alignment

在有多個輸入 Channel 的情況下,為了數據準確性,算子會等待所有流的 Barrier 都到達之后才會開始本地的快照,這種機制被稱為 Barrier 對齊。在對齊的過程中,算子只會繼續處理的來自未出現 Barrier Channel 的數據,而其余 Channel 的數據會被寫入輸入隊列,直至在隊列滿后被阻塞。當所有 Barrier 到達后,算子進行本地快照,輸出 Barrier 到下游并恢復正常處理。

比起其他分布式快照,該算法的優勢在于輔以 Copy-On-Write 技術的情況下不需要 “Stop The World” 影響應用吞吐量,同時基本不用持久化處理中的數據,只用保存進程的狀態信息,大大減小了快照的大小。

Checkpoint 與反壓的耦合

目前的 Checkpoint 算法在大多數情況下運行良好,然而當作業出現反壓時,阻塞式的 Barrier 對齊反而會加劇作業的反壓,甚至導致作業的不穩定。

首先, Chandy-Lamport 分布式快照的結束依賴于 Marker 的流動,而反壓則會限制 Marker 的流動,導致快照的完成時間變長甚至超時。無論是哪種情況,都會導致 Checkpoint 的時間點落后于實際數據流較多。這時作業的計算進度是沒有被持久化的,處于一個比較脆弱的狀態,如果作業出于異常被動重啟或者被用戶主動重啟,作業會回滾丟失一定的進度。如果 Checkpoint 連續超時且沒有很好的監控,回滾丟失的進度可能高達一天以上,對于實時業務這通常是不可接受的。更糟糕的是,回滾后的作業落后的 Lag 更大,通常帶來更大的反壓,形成一個惡性循環。

其次,Barrier 對齊本身可能成為一個反壓的源頭,影響上游算子的效率,而這在某些情況下是不必要的。比如典型的情況是一個的作業讀取多個 Source,分別進行不同的聚合計算,然后將計算完的結果分別寫入不同的 Sink。通常來說,這些不同的 Sink 會復用公共的算子以減少重復計算,但并不希望不同 Source 間相互影響。

如何進行Flink 1.11 Unaligned Checkpoint 解析圖3. Barrier Alignment 阻塞上游 Task


假設一個作業要分別統計 A 和 B 兩個業務線的以天為粒度指標,同時還需要統計所有業務線以周為單位的指標,拓撲如上圖所示。如果 B 業務線某天的業務量突漲,使得 Checkpoint Barrier 有延遲,那么會導致公用的 Window Aggregate 進行 Barrier 對齊,進而阻塞業務 A 的 FlatMap,最終令業務 A 的計算也出現延遲。

當然這種情況可以通過拆分作業等方式優化,但難免引入更多開發維護成本,而且更重要的是這本來就符合 Flink 用戶常規的開發思路,應該在框架內盡量減小出現用戶意料之外的行為的可能性。

Unaligned Checkpoint

為了解決這個問題,Flink 在 1.11 版本引入了 Unaligned Checkpoint 的特性。要理解 Unaligned Checkpoint 的原理,首先需要了解 Chandy-Lamport 論文中對于 Marker 處理規則的描述:

如何進行Flink 1.11 Unaligned Checkpoint 解析

圖4. Chandy-Lamport Marker 處理


其中關鍵是 if q has not recorded its state,也就是接收到 Marker 時算子是否已經進行過本地快照。一直以來 Flink 的 Aligned Checkpoint 通過 Barrier 對齊,將本地快照延遲至所有 Barrier 到達,因而這個條件是永真的,從而巧妙地避免了對算子輸入隊列的狀態進行快照,但代價是比較不可控的 Checkpoint 時長和吞吐量的降低。實際上這和 Chandy-Lamport 算法是有一定出入的。

舉個例子,假設我們對兩個數據流進行 equal-join,輸出匹配上的元素。按照 Flink Aligned Checkpoint 的方式,系統的狀態變化如下(圖中不同顏色的元素代表屬于不同的 Checkpoint 周期):

如何進行Flink 1.11 Unaligned Checkpoint 解析

圖5. Aligned Checkpoint 狀態變化

  • 圖 a: 輸入 Channel 1 存在 3 個元素,其中 2 在 Barrier 前面;Channel 2 存在 4 個元素,其中 2、9、7在 Barrier 前面。
  • 圖 b: 算子分別讀取 Channel 一個元素,輸出 2。隨后接收到 Channel 1 的 Barrier,停止處理 Channel 1 后續的數據,只處理 Channel 2 的數據。
  • 圖 c: 算子再消費 2 個自 Channel 2 的元素,接收到 Barrier,開始本地快照并輸出 Barrier。

對于相同的情況,Chandy-Lamport 算法的狀態變化如下:

如何進行Flink 1.11 Unaligned Checkpoint 解析

圖6. Chandy-Lamport 狀態變化

  • 圖 a: 同上。
  • 圖 b: 算子分別處理兩個 Channel 一個元素,輸出結果 2。此后接收到 Channel 1 的 Barrier,算子開始本地快照記錄自己的狀態,并輸出 Barrier。
  • 圖 c: 算子繼續正常處理兩個 Channel 的輸入,輸出 9。特別的地方是 Channel 2 后續元素會被保存下來,直到 Channel 2 的 Barrier 出現(即 Channel 2 的 9 和 7)。保存的數據會作為 Channel 的狀態成為快照的一部分。

兩者的差異主要可以總結為兩點:

  1. 快照的觸發是在接收到第一個 Barrier 時還是在接收到最后一個 Barrier 時。
  2. 是否需要阻塞已經接收到 Barrier 的 Channel 的計算。

從這兩點來看,新的 Unaligned Checkpoint 將快照的觸發改為第一個 Barrier 且取消阻塞 Channel 的計算,算法上與 Chandy-Lamport 基本一致,同時在實現細節方面結合 Flink 的定位做了幾個改進。

首先,不同于 Chandy-Lamport 模型的只需要考慮算子輸入 Channel 的狀態,Flink 的算子有輸入和輸出兩種 Channel,在快照時兩者的狀態都需要被考慮。

其次,無論在 Chandy-Lamport 還是 Flink Aligned Checkpoint 算法中,Barrier 都必須遵循其在數據流中的位置,算子需要等待 Barrier 被實際處理才開始快照。而 Unaligned Checkpoint 改變了這個設定,允許算子優先攝入并優先輸出 Barrier。如此一來,第一個到達 Barrier 會在算子的緩存數據隊列(包括輸入 Channel 和輸出 Channel)中往前跳躍一段距離,而被”插隊”的數據和其他輸入 Channel 在其 Barrier 之前的數據會被寫入快照中(圖中黃色部分)。

如何進行Flink 1.11 Unaligned Checkpoint 解析

圖7. Barrier 越過數據

這樣的主要好處是,如果本身算子的處理就是瓶頸,Chandy-Lamport 的 Barrier 仍會被阻塞,但 Unaligned Checkpoint 則可以在 Barrier 進入輸入 Channel 就馬上開始快照。這可以從很大程度上加快 Barrier 流經整個 DAG 的速度,從而降低 Checkpoint 整體時長。

回到之前的例子,用 Unaligned Checkpoint 來實現,狀態變化如下:

如何進行Flink 1.11 Unaligned Checkpoint 解析圖8. Unaligned-Checkpoint 狀態變化


  • 圖 a: 輸入 Channel 1 存在 3 個元素,其中 2 在 Barrier 前面;Channel 2 存在 4 個元素,其中 2、9、7在 Barrier 前面。輸出 Channel 已存在結果數據 1。
  • 圖 b: 算子優先處理輸入 Channel 1 的 Barrier,開始本地快照記錄自己的狀態,并將 Barrier 插到輸出 Channel 末端。
  • 圖 c: 算子繼續正常處理兩個 Channel 的輸入,輸出 2、9。同時算子會將 Barrier 越過的數據(即輸入 Channel 1 的 2 和輸出 Channel 的 1)寫入 Checkpoint,并將輸入 Channel 2 后續早于 Barrier 的數據(即 2、9、7)持續寫入 Checkpoint。

比起 Aligned Checkpoint 中不同 Checkpoint 周期的數據以算子快照為界限分隔得很清晰,Unaligned Checkpoint 進行快照和輸出 Barrier 時,部分本屬于當前 Checkpoint 的輸入數據還未計算(因此未反映到當前算子狀態中),而部分屬于當前 Checkpoint 的輸出數據卻落到 Barrier 之后(因此未反映到下游算子的狀態中)。

這也正是 Unaligned 的含義: 不同 Checkpoint 周期的數據沒有對齊,包括不同輸入 Channel 之間的不對齊,以及輸入和輸出間的不對齊。而這部分不對齊的數據會被快照記錄下來,以在恢復狀態時重放。換句話說,從 Checkpoint 恢復時,不對齊的數據并不能由 Source 端重放的數據計算得出,同時也沒有反映到算子狀態中,但因為它們會被 Checkpoint 恢復到對應 Channel 中,所以依然能提供只計算一次的準確結果。

當然,Unaligned Checkpoint 并不是百分百優于 Aligned Checkpoint,它會帶來的已知問題就有:

  1. 由于要持久化緩存數據,State Size 會有比較大的增長,磁盤負載會加重。
  2. 隨著 State Size 增長,作業恢復時間可能增長,運維管理難度增加。

目前看來,Unaligned Checkpoint 更適合容易產生高反壓同時又比較重要的復雜作業。對于像數據 ETL 同步等簡單作業,更輕量級的 Aligned Checkpoint 顯然是更好的選擇。
Flink 1.11 的 Unaligned Checkpoint 主要解決在高反壓情況下作業難以完成 Checkpoint 的問題,同時它以磁盤資源為代價,避免了 Checkpoint 可能帶來的阻塞,有利于提升 Flink 的資源利用率。

看完上述內容,你們對如何進行Flink 1.11 Unaligned Checkpoint 解析有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

汤原县| 六枝特区| 游戏| 缙云县| 北安市| 台北县| 兴宁市| 嘉善县| 临漳县| 滕州市| 孟连| 溧阳市| 南开区| 驻马店市| 工布江达县| 东兰县| 刚察县| 逊克县| 新泰市| 淮阳县| 甘孜县| 措美县| 德令哈市| 天峻县| 乐至县| 河西区| 互助| 永福县| 邵东县| 日照市| 大埔区| 福贡县| 民县| 高阳县| 威远县| 高邮市| 布尔津县| 开江县| 瑞安市| 盐源县| 连山|