中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink執行引擎中流批一體的示例分析

發布時間:2021-12-31 10:53:39 來源:億速云 閱讀:229 作者:小新 欄目:大數據

這篇文章主要為大家展示了“Flink執行引擎中流批一體的示例分析”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“Flink執行引擎中流批一體的示例分析”這篇文章吧。


                       

一、背景


                     

Flink執行引擎中流批一體的示例分析


                     
隨著互聯網和移動互聯網的不斷發展,各行各業都積累海量的業務數據。  而企業為了改善用戶體驗,提升產品在市場上的競爭力,都采取了實時化方式來處理大數據。  社交媒體的實時大屏、電商的實時推薦、城市大腦的實時交通預測、金融行業的實時反欺詐,這些產品的成功都在說明大數據處理的實時化已經成為一個勢不可擋的潮流。  

Flink執行引擎中流批一體的示例分析

在實時化的大趨勢下,Flink 已經成為實時計算行業的事實標準。我們看到,不光是阿里巴巴,國內外各個領域的頭部廠商,都把 Flink 做為實時計算的技術底座,國內有字節跳動、騰訊、華為,國外有 Netflix、Uber 等等。

而業務實時化只是一個起點,Flink 的目標之一就是給用戶提供實時離線一體化的用戶體驗。其實很多用戶不僅需要實時的數據統計,為了確認運營或產品的策略的效果,用戶同時還需要和歷史(昨天,甚至是去年的同期)數據比較。而從用戶的角度來看,原有的流、批獨立方案存在一些痛點:

  • 人力成本比較高 。由于流和批是兩套系統,相同的邏輯需要兩個團隊開發兩遍。
  • 數據鏈路冗余 。在很多的場景下,流和批計算內容其實是一致,但是由于是兩套系統,所以相同邏輯還是需要運行兩遍,產生一定的資源浪費。
  • 數據口徑不一致 。這個是用戶遇到的最重要的問題。兩套系統、兩套算子,兩套 UDF,一定會產生不同程度的誤差,這些誤差給業務方帶來了非常大的困擾。這些誤差不是簡單依靠人力或者資源的投入就可以解決的。

Flink執行引擎中流批一體的示例分析

2020 年的雙十一,在實時洪峰到達 40 億的歷史新高的同時,Flink 團隊與 DT 團隊一起推出了基于 Flink 的全鏈路流批一體的數倉架構,很好解決了 Lambda 的架構所帶來的一系列問題:流批作業使用同一 SQL,使研發效率提升了 3~4 倍;一套引擎確保了數據口徑天然一致;流批作業在同一集群運行,削峰填谷大幅提升了資源效率。

Flink 流批一體的成功,離不開 Flink 開源社區的健康蓬勃發展。從 Apache 軟件基金會 2020 年度報告可以看出,在反映開源社區繁榮情況的三個關鍵指標上 Flink 都名列前茅:用戶郵件列表活躍度,Flink 排名第一;開發者提交次數 Flink 排名第二,Github 用戶訪問量排名第二。這些數據并不局限于大數據領域,而是 Apache 開源基金會下屬的所有項目。

Flink執行引擎中流批一體的示例分析

2020 年也是 Blink 反哺社區的第二年,這兩年我們把 Blink 在集團內積累的經驗逐步貢獻回社區,讓 Flink 成為真正意義上的流批一體平臺。我希望通過這篇文章給大家分享下這兩年 Flink 在執行引擎流批融合方面做了哪些工作。同時也希望 Flink 的老用戶和新朋友可以進一步了解 Flink 流批一體架構的“前世今生”。
 

二、流批一體的分層架構

Flink執行引擎中流批一體的示例分析

總體來說,Flink 的核心引擎主要分為如下三層:

  • SDK 層 。Flink 的 SDK 主要有兩類,第一類是關系型 Relational SDK 也就是  SQL/Table,第二類是物理型 Physical SDK 也就是 DataStream。這兩類 SDK 都是流批統一,即不管是 SQL 還是 DataStream,用戶的業務邏輯只要開發一遍,就可以同時在流和批的兩種場景下使用;
  • 執行引擎層 。執行引擎提供了統一的 DAG,用來描述數據處理流程 Data Processing Pipeline(Logical Plan)。不管是流任務還是批任務,用戶的業務邏輯在執行前,都會先轉化為此 DAG 圖。執行引擎通過 Unified DAG Scheduler 把這個邏輯 DAG 轉化成在分布式環境下執行的Task。Task 之間通過 Shuffle 傳輸數據,我們通過 Pluggable Unified Shuffle 架構,同時支持流批兩種 Shuffle 方式;
  • 狀態存儲 。狀態存儲層負責存儲算子的狀態執行狀態。針對流作業有開源  RocksdbStatebackend、MemoryStatebackend,也有商業化的版本的GemniStateBackend;針對批作業我們在社區版本引入了 BatchStateBackend。
 
本文主要分享如下幾個方面的內容:

  1. 流批一體的 DataStream 介紹了如何通過流批一體的 DataStream 來解決 Flink SDK 當前面臨的挑戰;
  2. 流批一體的 DAG Scheduler 介紹了如何通過統一的 Pipeline Region 機制充分挖掘流式引擎的性能優勢;如何通過動態調整執行計劃的方式來改善引擎的易用性,提高系統的資源利用率;
  3. 流批一體的 Shuffle 架構介紹如何通過一套統一的 Shuffle 架構既可以滿足不同  Shuffle 在策略上的定制化需求,同時還能避免在共性需求上的重復開發;
  4. 流批一體的容錯策略介紹了如何通過統一的容錯策略既滿足批場景下容錯又可以提升流場景下的容錯效果。
 

三、流批一體 DataStream

SDK 分析以及面臨的挑戰


Flink執行引擎中流批一體的示例分析

如上圖所示,目前 Flink 提供的 SDK 主要有三類:

  1. Table/SQL 是一種 Relational 的高級 SDK,主要用在一些數據分析的場景中,既可以支持 Bounded 也可以支持 Unbounded 的輸入。由于 Table/SQL 是  Declarative 的,所以系統可以幫助用戶進行很多優化,例如根據用戶提供的Schema,可以進行 Filter Push Down 謂詞下推、按需反序列二進制數據等優化。目前 Table/SQL 可以支持 Batch 和 Streaming 兩種執行模式。[1]
  2. DataStream 屬于一種 Physical SDK。Relatinal SDK 功能雖然強大,但也存在一些局限:不支持對 State、Timer 的操作;由于 Optimizer 的升級,可能導致用相同的 SQL 在兩個版本中出現物理執行計劃不兼容的情況。而 DataStream SDK,既可以支持 State、Timer 維度 Low Level 的操作,同時由于 DataStream 是一種  Imperative SDK,所以對物理執行計劃有很好的“掌控力”,從而也不存在版本升級導致的不兼容。DataStream 目前在社區仍有很大用戶群,例如目前未 Closed 的 DataStream issue 依然有近 500 個左右。雖然 DataStream 即可以支持 Bounded  又可以支持 Unbounded Input 用 DataStream 寫的 Application,但是在 Flink-1.12 之前只支持 Streaming 的執行模式。
  3. DataSet 是一種僅支持 Bounded 輸入的 Physical SDK,會根據 Bounded 的特性對某些算子進行做一定的優化,但是不支持 EventTime 和 State 等操作。雖然  DataSet 是 Flink 提供最早的一種 SDK,但是隨著實時化和數據分析場景的不斷發展,相比于 DataStream 和 SQL,DataSet 在社區的影響力在逐步下降。
 
目前 Table/SQL 對于流批統一的場景支持已經比較成熟,但是對于 Phyiscal SDK 來說還面臨的一些挑戰,主要有兩個方面:

  1. 利用已有 Physical SDK 無法寫出一個真正生產可以用的流批一體的 Application。例如用戶寫一個程序用來處理 Kafka 中的實時數據,那么利用相同的程序來處理存儲在 OSS/S3/HDFS 上的歷史數據也是非常自然的事情。但是目前不管是 DataSet 還是 DataStream 都無法滿足用戶這個“簡單”的訴求。大家可能覺得奇怪,DataStream 不是既支持 Bounded 的 Input 又支持 Unbounded 的 Input,為什么還會有問題呢?其實“魔鬼藏在細節中”,我會在 Unified DataStream 這一節中會做進一的闡述。
  2. 學習和理解的成本比較高。隨著 Flink 不斷壯大,越來越多的新用戶加入 Flink 社區,但是對于這些新用戶來說就要學習兩種 Physical SDK。和其他引擎相比,用戶入門的學習成本是相對比較高的;兩種 SDK 在語義上有不同的地方,例如 DataStream 上有 Watermark、EventTime,而 DataSet 卻沒有,對于用戶來說,理解兩套機制的門檻也不小;由于這兩 SDK 還不兼容,一個新用戶一旦選擇錯誤,將會面臨很大的切換成本。
 

Unified Physical SDK

 
Flink執行引擎中流批一體的示例分析

為了解決上述 Physical SDK 所面臨的挑戰,我們把 Unified DataStream SDK 作為  Flink 統一的 Physical SDK。這個部分主要解決兩個問題:

  1. 為什么選擇 DataStream 作為 Unified Physical SDK?

  2. Unified DataStream 比“老”的 DataStream 提供了哪些能力讓用戶可以寫出一個真正生產可以用的流批一體 Application?


為什么不是 Unified DataSet


為了解決學習和理解成本比較高的問題,最自然最簡單的方案就是從 DataStream 和  DataSet 中選擇一個作為 Flink 的唯一的 Physical SDK。那為什么我們選擇了  DataStream 而不是 DataSet 呢?主要有兩個原因:

  1. 用戶收益 。在前邊已經分析過,隨著 Flink 社區的發展,目前 DataSet 在社區的影響力逐漸下降。如果選擇使用 DataSet 作為 Unified Physical SDK,那么用戶之前在 DataStream 大量“投資”就會作廢。而選擇 DataStream,可以讓許多用戶的已有 DataStream “投資”得到額外的回報;
  2. 開發成本 。DataSet 過于古老,缺乏大量對于現代實時計算引擎基本概念的支持,例如 EventTime、Watermark、State、Unbounded Source 等。另外一個更深層的原因是現有 DataSet 算子的實現,在流的場景完全無法復用,例如 Join 等。而對于 DataStream 則不然,可以進行大量的復用。那么如何在流批兩種場景下復用  DataStream 的算子呢?
 

Unified DataStream


很多對 Flink 有一定了解的用戶可能會問:DataStream 是同時支持 Bounded/Unbounded 的輸入,為什么我們會說:用 DataStream 無法寫出一個真正生產可以用的流批一體 Application 呢?簡單來說,DataStream 原本主要設計目標是給 Unbounded 場景使用的,所以導致在 Bounded 的場景下在效率、可用性、易用性方面和傳統的批引擎還有一定距離。具體來說體現在如下兩個方面:

  • 效率

先給大家看一個例子,下邊是一個跑同樣規模的 WordCount 的作業,DataStream 和  DataSet 的性能對比圖。從這個例子可以看出,DataSet 的性能是 DataStream 將近 5  倍。

Flink執行引擎中流批一體的示例分析

很明顯,要讓 DataStream 在生產中既可以支持流的場景又要支持批的場景,就一定要大幅提高 DataStream 在 Bounded 場景下效率。那么為什么 DataStream 的效率要比  DataSet 的效率低呢?

前面我們已經提到,DataStream 原本主要設計目標是給  Unbounded 的場景下使用的,而 Unounded 場景下一個主要的特點就是亂序,也就是說任何一個 DataStream 的算子無法假設處理的 Record 是按照什么順序進行的,所以許多算子會用一個 K/V 存儲來緩存這些亂序的數據,等到合適的時候再從 K/V 存儲中取出這些數據進行處理并且進行輸出。一般情況下,算子對 K/V 存儲訪問涉及大量的序列化和反序列化,同時也會引發隨機磁盤 I/O;而在 DataSet 中,假設數據是有界的,也就是可以通過優化來避免隨機的磁盤 I/O 訪問,同時也對序列化和反序列化做了相關優化。這也是為什么用 DataSet 寫的 WorkerCount 要比用 DataStream 寫的 WordCount  快 5 倍主要原因。 

知道到了原因,是不是要把所有的 DataStream 的算子,都重寫一遍就可以了呢?理論上沒問題,但是 DataStream 有大量的算子需要重寫,有些算子還比較復雜,例如與  Window 相關的一系列算子。可以想象到,如果都全部重寫,工程量是非常之巨大的。所以我們通過單 Key 的 BatchStateBackend 幾乎完全避免了對所有算子重寫,同時還得到了非常不錯的效果。

  • 一致性
對于 Flink 有一定了解的同學應該都知道,原來用 DataStream 寫的 Application 都采用  Streaming 的執行模式,在這種模式下是通過 Checkpoint 的方式保持端到端的 Exactly Once 的語義,具體來說一個作業的 Sink 只有當全圖的所有算子(包括 Sink 自己)都做完各自的 Snapshot 之后,Sink 才會把數據 Commit 到外部系統中,這是一個典型的依賴  Flink Checkpoint 機制的 2PC 協議。
 
而在 Bounded 的場景下雖然也可以采用 Streaming 的方式但是對于用戶來說可能會存在一些問題:

  1. 資源消耗大 : 使用 Streaming 方式,需要同時拿到所有的資源。在某些情況下,用戶可能沒有這么多資源;
  2. 容錯成本高 : 在 Bounded 場景下,為了效率一些算子可能無法支持 Snapshot 操作,一旦出錯可能需要重新執行整個作業。 
 
所以在 Bounded 場景下,用戶希望 Application 可以采用 Batch 執行模式,因為利用  Batch 執行的模式可以非常自然的解決上述兩個問題。在 Bounded 場景下支持 Batch 的執行模式是比較簡單的,但是卻引入了一個非常棘手的問題——利用已有的 Sink API  無法保證端到端的 Exactly Once 語義。這是由于 Bounded 場景下是沒有 Checkpoint  的,而原有 Sink 都是依賴 Checkpoint 保證端到端的 ExactlyOnce 的。同時我們不希望開發者針對 Sink 在不同模式下開發兩套不同的實現,因為這樣非常不利用 Flink 和其他生態的對接。

實際上,一個 Transactional 的 Sink 主要解決如下 4 個問題:

  1. What to commit?
  2. How to commit?
  3. Where to commit?
  4. When to commit?
 
而 Flink 應該讓 Sink 開發者提供 What to commit 和 How to commit,而系統應該根據不同的執行模式,選擇 Where to commit 和 When to commit 來保證端到端的 Exactly Once。最終我們提出了一個全新 Unified Sink API,從而讓開發者只開發一套 Sink 就可以同時運行在 Streaming 和 Batch 執行模式下。這里介紹的只是主要思路,在有限流的場景下如何保證 End to End 的一致性;如何對接 Hive、Iceberg 等外部生態,實際上還是存在一定挑戰。


四、流批一體 DAG Scheduler

Unified DAG Scheduler 要解決什么問題


原來 Flink 有兩種調度的模式:

  1. 一種是流的調度模式,在這種模式下,Scheduler 會申請到一個作業所需要的全部資源,然后同時調度這個作業的全部 Task,所有的 Task 之間采取 Pipeline 的方式進行通信。批作業也可以采取這種方式,并且在性能上也會有很大的提升。但是對于運行比較長的 Batch 作業來說來說,這種模式還是存在一定的問題:規模比較大的情況下,同時消耗的資源比較多,對于某些用戶來說,他可能沒有這么多的資源;容錯代價比較高,例如一旦發生錯誤,整個作業都需要重新運行。
  2. 一種是批的調度模式。這種模式和傳統的批引擎類似,所有 Task 都是可以獨立申請資源,Task 之間都是通過 Batch Shuffle 進行通訊。這種方式的好處是容錯代價比較小。但是這種運行方式也存在一些短板。例如,Task 之間的數據都是通過磁盤來進行交互,引發了大量的磁盤 IO。
 
總的來說,有了這兩種調度方式是可以基本滿足流批一體的場景需求,但是也存在著很大的改進空間,具體來說體現在三個方面:

  1. 架構不一致、維護成本高 。調度的本質就是進行資源的分配,換句話說就是要解決  When to deploy which tasks to where 的問題。原有兩種調度模式,在資源分配的時機和粒度上都有一定的差異,最終導致了調度架構上無法完全統一,需要開發人員維護兩套邏輯。例如,流的調度模式,資源分配的粒度是整個物理執行計劃的全部 Task;批的調度模式,資源分配的粒度是單個任務,當 Scheduler 拿到一個資源的時候,就需要根據作業類型走兩套不同的處理邏輯;
  2. 性能 。傳統的批調度方式,雖然容錯代價比較小,但是引入大量的磁盤 I/O,并且性能也不是最佳,無法發揮出 Flink 流式引擎的優勢。實際上在資源相對充足的場景下,可以采取“流”的調度方式來運行 Batch 作業,從而避免額外的磁盤 I/O,提高作業的執行效率。尤其是在夜間,流作業可以釋放出一定資源,這就為批作業按照“Streaming”的方式運行提供了可能。
  3. 自適應 。目前兩種調度方式的物理執行計劃是靜態的,靜態生成物理執行計劃存在調優人力成本高、資源利用率低等問題。
 

基于 Pipeline Region 的統一調度


Flink執行引擎中流批一體的示例分析

為了既能發揮流引擎的優勢,同時避免全圖同時調度存在的一些短板,我們引入  Pipeline Region 的概念。Unified DAG Scheduler 允許在一個 DAG 圖中,Task 之間既可以通過 Pipeline 通訊,也可以通過 Blocking 方式進行通訊。這些由 Pipeline 的數據交換方式連接的 Task 被稱為一個 Pipeline Region。基于以上概念,Flink 引入 Pipeline Region 的概念,不管是流作業還是批作業,都是按照 Pipeline Region 粒度來申請資源和調度任務。細心的讀者可以發現,其實原有的兩種模式都是 Pipeline Region 調度的特例。

Flink執行引擎中流批一體的示例分析

即便可以資源上滿足“流”的調度模式,那么哪些任務可以采取“流”的方式調度呢?

有同學還是會擔心采取“流”的調度方式容錯代價會比較高,因為在“流”的調度方式下,一個 Task 發生錯誤,和他聯通的所有 Task 都會 Fail,然后重新運行。

在 Flink 中,不同 Task 之間有兩種連接方式[2],一種是 All-to-All 的連接方式,上游 Task 會和下游的所有的 Task 進行連接;一種是 PointWise 的鏈接方式,上游的 Task 只會和下游的部分 Task 進行連接。

如果一個作業的所有 Task 之間都是通過 All-to-All 方式進行連接,一旦采取“流”的調度方式,那么整個物理拓撲都需要同時被調度,那么確實存在  FailOver 代價比較高的問題[3]。但是在實際 Batch 作業的拓撲中,Task 之間不都是通過 All-to-All 的邊連接,Batch 作業中存在的大量 Task 通過 PointWise 的邊連接,通過“流”的方式調度由 PointWise 連接的 Task 連通圖,在減少作業的容錯成本的同時,可以提高作業的執行效率,如下圖所示在,在全量的 10T TPC-DS 測試中,開啟所有  PointWise 邊都采用 Pipeline 的鏈接方式就可以讓整性能有 20% 以上的性能提升。

Flink執行引擎中流批一體的示例分析

上述只是 Schduler 提供的劃分 Pipeline Region 的 4 種策略中的一種[4],實際上  Planner 可以根據實際運行場景,定制哪些 Task 之間采取 Pipeline 的傳輸方式,哪些  Task 之間采取 Batch 的傳輸方式方式。

自適應調度

 
調度的本質是給物理執行計劃進行資源分配的決策過程。Pipeline Region 解決了物理執行計劃確定之后,流作業和批作業可以統一按照 Pipeline Region 的粒度進行調度。對于批作業來說靜態生成物理執行計劃存在一些問題[5]:

  1. 配置人力成本高 。對于批作業來說,雖然理論上可以根據統計信息推斷出物理執行計劃中每個階段的并發度,但是由于存在大量的 UDF 或者統計信息的缺失等問題,導致靜態決策結果可能會出現嚴重不準確的情況;為了保障業務作業的 SLA,在大促期間,業務的同學需要根據大促的流量估計,手動調整高優批作業的并發度,由于業務變化快,一旦業務邏輯發生變化,又要不斷的重復這個過程。整個調優過程都需要業務的同學手動操作,人力成本比較高,即便這樣也可能會出現誤判的情況導致無法滿足用戶 SLA;
  2. 資源利用率低 。由于人工配置并發度成本比較高,所以不可能對所有的作業都手動配置并發度。對于中低優先級的作業,業務同學會選取一些默認值作為并發度,但是在大多數情況下這些默認值都偏大,造成資源的浪費;而且雖然高優先級的作業可以進行手工并發配置,由于配置方式比較繁瑣,所以大促過后,雖然流量已經下降但是業務方仍然會使用大促期間的配置,也造成大量的資源浪費現象;
  3. 穩定性差 。資源浪費的情況最終導致資源的超額申請現象。目前大多數批作業都是采取和流作業集群混跑的方式,具體來說申請的資源都是非保障資源,一旦資源緊張或者出現機器熱點,這些非保障資源都是優先被調整的對象。

Flink執行引擎中流批一體的示例分析

為了解決靜態生成物理執行存在這些問題,我們為批作業引入了自適應調度功能[6],和原來的靜態物理執行計劃相比,利用這個特性可以大幅提高用戶資源利用率。 Adaptive Scheduler 可以根據一個 JobVertex 的上游 JobVertex 的執行情況,動態決定當前 JobVertex 的并發度。在未來,我們也可以根據上游 JobVertex 產出的數據,動態決定下游采用什么樣的算子。

五、流批一體的 Shuffle 架構

Flink 是一個流批一體的平臺,因此引擎對于不同的執行模式要分別提供 Streaming 和Batch 兩種類型的 Shuffle。雖然 Streaming Shuffle 和 Batch Shuffle 在具體的策略上存在一定的差異,但是本質上都是為了對數據進行重新劃分(re-partition),因此不同的  Shuffle 之間還存在一定的共性。所以我們的目標是提供一套統一的 Shuffle 架構,既可以滿足不同 Shuffle 在策略上的定制,同時還能避免在共性需求上進行重復開發。

總體來說,Shuffle 架構可以劃分成如下圖所示的四個層次。流和批的 Shuffle 需求在各層上有一定差異,也有大量的共性,下邊我做了一些簡要分析。

Flink執行引擎中流批一體的示例分析

流批 Shuffle 之間的差異


大家都知道,批作業和流作業對 Shuffle 的需求是有差異的,具體可以體現在如下 3 個方面:

  1. Shuffle 數據的生命周期 。流作業的 Shuffle 數據和 Task 的生命周期基本是一致的;而批作業的 Shuffle 數據和 Task 生命周期是解耦的;
  2. Shuffle 數據的存儲介質 。因為流作業的 Shuffle 數據生命周期比較短,所以可以把流作業的 Shuffle 數據存儲在內存中;而批作業的 Shuffle 數據生命周期有一定的不確定性,所以需要把批作業的 Shuffle 數據存儲在磁盤中;
  3. Shuffle 部署方式 [7]。把 Shuffle 服務和計算節點部署在一起,對流作業來說這種部署方式是有優勢的,因為這樣會減少不必要網絡開銷,從而減少 Latency。但對于批作業來說,這種部署方式在資源利用率、性能、穩定性上都存在一定的問題。[8]

流批 Shuffle 之間的共性


批作業和流作業的 Shuffle 有差異也有共性,共性主要體現在:

  1. 數據的 Meta 管理 。所謂 Shuffle Meta 是指邏輯數據劃分到數據物理位置的映射。不管是流還是批的場景,在正常情況下都需要從 Meta 中找出自己的讀取或者寫入數據的物理位置;在異常情況下,為了減少容錯代價,通常也會對 Shuffle Meta 數據進行持久化;
  2. 數據傳輸 。從邏輯上講,流作業和批作業的 Shuffle 都是為了對數據進行重新劃分(re-partition/re-distribution)。在分布式系統中,對數據的重新劃分都涉及到跨線程、進程、機器的數據傳輸。

流批一體的 Shuffle 架構

 
Flink執行引擎中流批一體的示例分析
 
Unified Shuffle 架構抽象出三個組件[9]: Shuffle Master、Shuffle Reader、Shuffle Writer。Flink通過和這三個組件交互完成算子間的數據的重新劃分。通過這三個組件可以滿足不同Shuffle插件在具體策略上的差異:

  1. Shuffle Master 資源申請和資源釋放。也就是說插件需要通知框架 How to request/release resource。而由 Flink 來決定 When to call it;
  2. Shuffle Writer 上游的算子利用 Writer 把數據寫入 Shuffle Service——Streaming Shuffle 會把數據寫入內存;External/Remote Batch Shuffle 可以把數據寫入到外部存儲中;
  3. Shuffle Reader 下游的算子可以通過 Reader 讀取 Shuffle 數據;
 
同時,我們也為流批 Shuffle 的共性——Meta 管理、數據傳輸、服務部署[10]——提供了架構層面的支持,從而避免對復雜組件的重復開發。高效穩定的數據傳輸,是分布式系統最復雜的子系統之一,例如在傳輸中都要解決上下游反壓、數據壓縮、內存零拷貝等問題,在新的架構中只要開發一遍,就可以同時在流和批兩種場景下共同使用,大大減少了開發和維護的成本。


六、流批一體的容錯策略

 

Flink 原有容錯策略是以檢查點為前提的,具體來說無論是單個 Task 出現失敗還是JobMaster 失敗,Flink 都會按照最近的檢查點重啟整個作業。雖然這種策略存在一定的優化空間,但是總的來說對于流的場景是基本是接受的。目前,Flink Batch 運行模式下不會開啟檢查點[11],這也意味一旦出現任何錯誤,整個作業都要從頭執行。

雖然原有策略在理論上可以保證最終一定會產出正確的結果,但是明顯大多數客戶都無法接受這種容錯策略所付出的代價。為了解決這些問題,我們分別對 Task 和 JM 的容錯都做了相應的改進。

Pipeline Region Failover


雖然在 Batch 執行模式下沒有定時的 Checkpoint,但是在 Batch 執行模式下,Flink允許 Task 之間通過 Blocking Shuffle 進行通信。對于讀取 Blocking Shuffle 的 Task 發生失敗之后,由于 Blocking Shuffle 中存儲了這個 Task 所需要的全部數據,所以只需要重啟這個 Task 以及通過 Pipeline Shuffle 與其相連的全部下游任務即可,而不需要重啟整個作業。

總的來說,Pipeline Region Failover 策略和 Scheduler 在進行正常調度的時候一樣,都是把一個 DAG 拆分成由若干 Pipeline shuffle 連接的一些 Pipeline Region,每當一個 Task 發生 FailOver 的時候,只會重啟這個 Task 所在的 Region 即可。

Flink執行引擎中流批一體的示例分析

JM Failover


JM 是一個作業的控制中心,包含了作業的各種執行狀態。Flink 利用這些狀態對任務進行調度和部署。一旦 JM 發生錯誤之后,這些狀態將會全部丟失。如果沒有這些信息,即便所有的工作節點都沒有發生故障,新 JM 仍然無法繼續調度原來的作業。例如,由于任務的結束信息都已丟失,一個任務結束之后,新 JM 無法判斷現有的狀態是否滿足調度下游任務的條件——所有的輸入數據都已經產生。

從上邊的分析可以看出,JM Failover 的關鍵就是如何讓一個 JM“恢復記憶”。在 VVR[12] 中我們通過基于 Operation Log 機制恢復 JM 的關鍵狀態。

Flink執行引擎中流批一體的示例分析

細心的同學可能已經發現了,雖然這兩個改進的出發點是為了批的場景,但是實際上對于流的作業容也同樣有效。上邊只是簡要的介紹了兩種容錯策略的思路,實際上還有很多值得思考的內容。例如 Blocking 上游數據丟失了我們應該如何處理?JM 中有哪些關鍵的狀態需要恢復?
以上是“Flink執行引擎中流批一體的示例分析”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

长岭县| 庆城县| 义马市| 呼图壁县| 离岛区| 商河县| 雷波县| 子长县| 阿拉尔市| 鄂尔多斯市| 桦南县| 五常市| 木里| 沾益县| 平塘县| 陆河县| 福清市| 永济市| 南江县| 登封市| 阿克苏市| 都兰县| 林甸县| 泾川县| 乌兰察布市| 鄂伦春自治旗| 芷江| 伊春市| 光泽县| 南涧| 宁海县| 驻马店市| 石景山区| 高邮市| 西平县| 吴江市| 南漳县| 寻乌县| 丹阳市| 宁河县| 宜宾市|