中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

日處理數據量超10億:友信金服基于Flink構建實時用戶畫像系統的實踐

發布時間:2020-08-08 10:56:38 來源:ITPUB博客 閱讀:311 作者:芊寶寶最可愛 欄目:MySQL數據庫
導讀:當今生活節奏日益加快,企業面對不斷增加的海量信息,其信息篩選和處理效率低下的困擾與日俱增。由于用戶營銷不夠細化,企業 App 中許多不合時宜或不合偏好的消息推送很大程度上影響了用戶體驗,甚至引發了用戶流失。在此背景下,友信金服公司推行全域的數據體系戰略,通過打通和整合集團各個業務線數據,利用大數據、人工智能等技術構建統一的數據資產,如 ID-Mapping、用戶標簽等。友信金服用戶畫像項目正是以此為背景成立,旨在實現“數據驅動業務與運營”的集團戰略。目前該系統支持日處理數據量超 10 億,接入上百種合規數據源。

一、技術選型

傳統基于 Hadoop 生態的離線數據存儲計算方案已在業界大規模應用,但受制于離線計算的高時延性,越來越多的數據應用場景已從離線轉為實時。這里引用一張表格對目前主流的實時計算框架做個對比。

日處理數據量超10億:友信金服基于Flink構建實時用戶畫像系統的實踐

Apache Storm 的容錯機制需要對每條數據進行應答(ACK),因此其吞吐量備受影響,在數據大吞吐量的場景下會有問題,因此不適用此項目的需求。

Apache Spark 總體生態更為完善,且在機器學習的集成和應用性暫時領先,但 Spark 底層還是采用微批(Micro Batching)處理的形式。

Apache Flink 在流式計算上有明顯優勢:首先其流式計算屬于真正意義上的單條處理,即每一條數據都會觸發計算。在這一點上明顯與 Spark 的微批流式處理方式不同。其次,Flink 的容錯機制較為輕量,對吞吐量影響較小,使得 Flink 可以達到很高的吞吐量。最后 Flink 還擁有易用性高,部署簡單等優勢。相比之下我們最終決定采用基于 Flink 的架構方案。

二、用戶畫像業務架構

用戶畫像系統目前為集團線上業務提供用戶實時標簽數據服務。為此我們的服務需要打通多種數據源,對海量的數字信息進行實時不間斷的數據清洗、聚類、分析,從而將它們抽象成標簽,并最終為應用方提供高質量的標簽服務。在此背景下,我們設計用戶畫像系統的整體架構如下圖所示:

日處理數據量超10億:友信金服基于Flink構建實時用戶畫像系統的實踐

整體架構分為五層:

  1. 接入層:接入原始數據并對其進行處理,如 Kafka、Hive、文件等。
  2. 計算層:選用 Flink 作為實時計算框架,對實時數據進行清洗,關聯等操作。
  3. 存儲層:對清洗完成的數據進行數據存儲,我們對此進行了實時用戶畫像的模型分層與構建,將不同應用場景的數據分別存儲在如 Phoenix,HBase,HDFS,Kafka 等。
  4. 服務層:對外提供統一的數據查詢服務,支持從底層明細數據到聚合層數據的多維計算服務。
  5. 應用層:以統一查詢服務對各個業務線數據場景進行支撐。目前業務主要包含用戶興趣分、用戶質量分、用戶的事實信息等數據。

三、用戶畫像數據處理流程

在整體架構設計方案設計完成之后,我們針對數據也設計了詳盡的處理方案。在數據處理階段,鑒于 Kafka 高吞吐量、高穩定性的特點,我們的用戶畫像系統統一采用 Kafka 作為分布式發布訂閱消息系統。數據清洗階段利用 Flink 來實現用戶唯一性識別、行為數據的清洗等,去除冗余數據。這一過程支持交互計算和多種復雜算法,并支持數據實時 / 離線計算。目前我們數據處理流程迭代了兩版,具體方案如下:

1.0 版數據處理流程

數據接入、計算、存儲三層處理流程

整體數據來源包含兩種:

  1. 歷史數據:從外部數據源接入的海量歷史業務數據。接入后經過 ETL 處理,進入用戶畫像底層數據表。
  2. 實時數據:從外部數據源接入的實時業務數據,如用戶行為埋點數據,風控數據等。

根據不同業務的指標需求我們直接從集團數據倉庫抽取數據并落入 Kafka,或者直接從業務端以 CDC(Capture Data Change)的方式寫入 Kafka。在計算層,數據被導入到 Flink 中,通過 DataStream 生成 ID-Mapping、用戶標簽碎片等數據,然后將生成數據存入 JanusGraph(JanusGraph 是以 HBase 作為后端存儲的圖數據庫介質)與 Kafka,并由 Flink 消費落入 Kafka 的用戶標簽碎片數據,進行聚合生成最新的用戶標簽碎片(用戶標簽碎片是由用戶畫像系統獲取來自多種渠道的碎片化數據塊處理后生成的)。

日處理數據量超10億:友信金服基于Flink構建實時用戶畫像系統的實踐

數據服務層處理流程

服務層將存儲層存儲的用戶標簽碎片數據,通過 JanusGraph Spark On Yarn 模式,執行 TinkerPop OLAP 計算生成全量用戶 Yids 列表文件。Yid 是用戶畫像系統中定義的集團級用戶 ID 標識。結合 Yids 列表文件,在 Flink 中批量讀取 HBase 聚合成完整用戶畫像數據,生成 HDFS 文件,再通過 Flink 批量操作新生成的數據生成用戶評分預測標簽,將用戶評分預測標簽落入 Phoenix,之后數據便可通過統一數據服務接口進行獲取。下圖完整地展示了這一流程。

日處理數據量超10億:友信金服基于Flink構建實時用戶畫像系統的實踐

ID-Mapping 數據結構

為了實現用戶標簽的整合,用戶 ID 之間的強打通,我們將用戶 ID 標識看成圖的頂點、ID pair 關系看作圖的邊,比如已經識別瀏覽器 Cookie 的用戶使用手機號登陸了公司網站就形成了對應關系。這樣所有用戶 ID 標識就構成了一張大圖,其中每個小的連通子圖 / 連通分支就是一個用戶的全部標識 ID 信息。

ID-Mapping 數據由圖結構模型構建,圖節點包含 UserKey、Device、IdCard、Phone 等類型,分別表示用戶的業務 ID、設備 ID、身份證以及電話等信息。節點之間邊的生成規則是通過解析數據流中包含的節點信息,以一定的優先級順序進行節點之間的連接,從而生成節點之間的邊。比如,識別了用戶手機系統的 Android_ID,之后用戶使用郵箱登陸了公司 App,在系統中找到了業務線 UID 就形成了和關系的 ID pair,然后系統根據節點類型進行優先級排序,生成 Android_ID、mail、UID 的關系圖。數據圖結構模型如下圖所示:

日處理數據量超10億:友信金服基于Flink構建實時用戶畫像系統的實踐

Gephi

1.0 版本數據處理流程性能瓶頸

1.0 版本數據處理流程在系統初期較好地滿足了我們的日常需求,但隨著數據量的增長,該方案遇到了一些性能瓶頸:

  1. 首先,這版的數據處理使用了自研的 Java 程序來實現。隨著數據量上漲,自研 JAVA 程序由于數據量暴增導致 JVM 內存大小不可控,同時它的維護成本很高,因此我們決定在新版本中將處理邏輯全部遷移至 Flink 中。
  2. 其次,在生成用戶標簽過程中,ID-Mapping 出現很多大的連通子圖(如下圖所示)。這通常是因為用戶的行為數據比較隨機離散,導致部分節點間連接混亂。這不僅增加了數據的維護難度,也導致部分數據被“污染”。另外這類異常大的子圖會嚴重降低 JanusGraph 與 HBase 的查詢性能。

日處理數據量超10億:友信金服基于Flink構建實時用戶畫像系統的實踐

Gephi

  1. 最后,該版方案中數據經 Protocol Buffer(PB)序列化之后存入 HBase,這會導致合并 / 更新用戶畫像標簽碎片的次數過多,使得一個標簽需要讀取多次 JanusGraph 與 HBase,這無疑會加重 HBase 讀取壓力。此外,由于數據經過了 PB 序列化,使得其原始存儲格式不可讀,增加了排查問題的難度。

鑒于這些問題,我們提出了 2.0 版本的解決方案。在 2.0 版本中,我們通過利用 HBase 列式存儲、修改圖數據結構等優化方案嘗試解決以上三個問題。

2.0 版數據處理流程

版本流程優化點

如下圖所示,2.0 版本數據處理流程大部分承襲了 1.0 版本。新版本數據處理流程在以下幾個方面做了優化:

日處理數據量超10億:友信金服基于Flink構建實時用戶畫像系統的實踐

2.0 版本數據處理流程

  1. 歷史數據的離線補錄方式由 JAVA 服務變更為使用 Flink 實現。
  2. 優化用戶畫像圖數據結構模型,主要是對邊的連接方式進行了修改。之前我們會判斷節點的類型并根據預設的優先級順序將多個節點進行連接,新方案則采用以 UserKey 為中心的連接方式。做此修改后,之前的大的連通子圖(圖 6)優化為下面的小的連通子圖(圖 8),同時解決了數據污染問題,保證了數據準確性。另外,1.0 版本中一條數據需要平均讀取十多次 HBase 的情況也得到極大緩解。采用新方案之后平均一條數據只需讀取三次 HBase,從而降低 HBase 六七倍的讀取壓力(此處優化是數據計算層優化)。

日處理數據量超10億:友信金服基于Flink構建實時用戶畫像系統的實踐

Gephi

  1. 舊版本是用 Protocol Buffer 作為用戶畫像數據的存儲對象,生成用戶畫像數據后作為一個列整體存入 HBase。新版本使用 Map 存儲用戶畫像標簽數據,Map 的每對 KV 都是單獨的標簽,KV 在存入 HBase 后也是單獨的列。新版本存儲模式利用 HBase 做列的擴展與合并,直接生成完整用戶畫像數據,去掉 Flink 合并 / 更新用戶畫像標簽過程,優化數據加工流程。使用此方案后,存入 HBase 的標簽數據具備了即席查詢功能。數據具備即席查詢是指在 HBase 中可用特定條件直接查看指定標簽數據詳情的功能,它是數據治理可以實現校驗數據質量、數據生命周期、數據安全等功能的基礎條件。
  2. 在數據服務層,我們利用 Flink 批量讀取 HBase 的 Hive 外部表生成用戶質量分等數據,之后將其存入 Phoenix。相比于舊方案中 Spark 全量讀 HBase 導致其讀壓力過大,從而會出現集群節點宕機的問題,新方案能夠有效地降低 HBase 的讀取壓力。經過我們線上驗證,新方案對 HBase 的讀負載下降了數十倍(此處優化與 2 優化不同,屬于服務層優化)。

四、問題

目前,線上部署的用戶畫像系統中的數據絕大部分是來自于 Kafka 的實時數據。隨著數據量越來越多,系統的壓力也越來越大,以至于出現了 Flink 背壓與 Checkpoint 超時等問題,導致 Flink 提交 Kafka 位移失敗,從而影響了數據一致性。這些線上出現的問題讓我們開始關注 Flink 的可靠性、穩定性以及性能。針對這些問題,我們進行了詳細的分析并結合自身的業務特點,探索并實踐出了一些相應的解決方案。

CheckPointing 流程分析與性能優化方案

CheckPointing 流程分析

下圖展示了 Flink 中 checkpointing 執行流程圖:

日處理數據量超10億:友信金服基于Flink構建實時用戶畫像系統的實踐

Flink 中 checkpointing 執行流程

  1. Coordinator 向所有 Source 節點發出 Barrier。
  2. Task 從輸入中收到所有 Barrier 后,將自己的狀態寫入持久化存儲中,并向自己的下游繼續傳遞 Barrier。
  3. 當 Task 完成狀態持久化之后將存儲后的狀態地址通知到 Coordinator。
  4. 當 Coordinator 匯總所有 Task 的狀態,并將這些數據的存放路徑寫入持久化存儲中,完成 CheckPointing。

性能優化方案

通過以上流程分析,我們通過三種方式來提高 Checkpointing 性能。這些方案分別是:

  1. 選擇合適的 Checkpoint 存儲方式
  2. 合理增加算子(Task)并行度
  3. 縮短算子鏈(Operator Chains)長度

選擇合適的 Checkpoint 存儲方式

CheckPoint 存儲方式有 MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。由官方文檔可知,不同 StateBackend 之間的性能以及安全性是有很大差異的。通常情況下,MemoryStateBackend 適合應用于測試環境,線上環境則最好選擇 RocksDBStateBackend。

這有兩個原因:首先,RocksDBStateBackend 是外部存儲,其他兩種 Checkpoint 存儲方式都是 JVM 堆存儲。受限于 JVM 堆內存的大小,Checkpoint 狀態大小以及安全性可能會受到一定的制約;其次,RocksDBStateBackend 支持增量檢查點。增量檢查點機制(Incremental Checkpoints)僅僅記錄對先前完成的檢查點的更改,而不是生成完整的狀態。與完整檢查點相比,增量檢查點可以顯著縮短 checkpointing 時間,但代價是需要更長的恢復時間。

合理增加算子(Task)并行度

Checkpointing 需要對每個 Task 進行數據狀態采集。單個 Task 狀態數據越多則 Checkpointing 越慢。所以我們可以通過增加 Task 并行度,減少單個 Task 狀態數據的數量來達到縮短 CheckPointing 時間的效果。

縮短算子鏈(Operator Chains)長度

Flink 算子鏈(Operator Chains)越長,Task 也會越多,相應的狀態數據也就更多,Checkpointing 也會越慢。通過縮短算子鏈長度,可以減少 Task 數量,從而減少系統中的狀態數據總量,間接的達到優化 Checkpointing 的目的。下面展示了 Flink 算子鏈的合并規則:

  1. 上下游的并行度一致
  2. 下游節點的入度為 1
  3. 上下游節點都在同一個 Slot Group 中
  4. 下游節點的 Chain 策略為 ALWAYS
  5. 上游節點的 Chain 策略為 ALWAYS 或 HEAD
  6. 兩個節點間數據分區方式是 Forward
  7. 用戶沒有禁用 Chain

基于以上這些規則,我們在代碼層面上合并了相關度較大的一些 Task,使得平均的操作算子鏈長度至少縮短了 60%~70%。

Flink 背壓產生過程分析及解決方案

背壓產生過程分析

在 Flink 運行過程中,每一個操作算子都會消費一個中間 / 過渡狀態的流,并對它們進行轉換,然后生產一個新的流。這種機制可以類比為:Flink 使用阻塞隊列作為有界的緩沖區。跟 Java 里阻塞隊列一樣,一旦隊列達到容量上限,處理速度較慢的消費者會阻塞生產者向隊列發送新的消息或事件。下圖展示了 Flink 中兩個操作算子之間的數據傳輸以及如何感知到背壓的:

日處理數據量超10億:友信金服基于Flink構建實時用戶畫像系統的實踐

首先,Source 中的事件進入 Flink 并被操作算子 1 處理且被序列化到 Buffer 中,然后操作算子 2 從這個 Buffer 中讀出該事件。當操作算子 2 處理能力不足的時候,操作算子 1 中的數據便無法放入 Buffer,從而形成背壓。背壓出現的原因可能有以下兩點:

  1. 下游算子處理能力不足;
  2. 數據發生了傾斜。

背壓解決方案

實踐中我們通過以下方式解決背壓問題。首先,縮短算子鏈會合理的合并算子,節省出資源。其次縮短算子鏈也會減少 Task(線程)之間的切換、消息的序列化 / 反序列化以及數據在緩沖區的交換次數,進而提高系統的整體吞吐量。最后,根據數據特性將不需要或者暫不需要的數據進行過濾,然后根據業務需求將數據分別處理,比如有些數據源需要實時的處理,有些數據是可以延遲的,最后通過使用 keyBy 關鍵字,控制 Flink 時間窗口大小,在上游算子處理邏輯中盡量合并更多數據來達到降低下游算子的處理壓力。

優化結果

經過以上優化,在每天億級數據量下,用戶畫像可以做到實時信息實時處理并無持續背壓,Checkpointing 平均時長穩定在 1 秒以內。

五、未來工作的思考和展望

端到端的實時流處理

目前用戶畫像部分數據都是從 Hive 數據倉庫拿到的,數據倉庫本身是 T+1 模式,數據延時性較大,所以為了提高數據實時性,端到端的實時流處理很有必要。

端到端是指一端采集原始數據,另一端以報表 / 標簽 / 接口的方式對這些對數進行呈現與應用,連接兩端的是中間實時流。在后續的工作中,我們計劃將現有的非實時數據源全部切換到實時數據源,統一經過 Kafka 和 Flink 處理后再導入到 Phoenix/JanusGraph/HBase。強制所有數據源數據進入 Kafka 的一個好處在于它能夠提高整體流程的穩定性與可用性:首先 Kafka 作為下游系統的緩沖,可以避免下游系統的異常影響實時流的計算,起到“削峰填谷”的作用;其次,Flink 自 1.4 版本開始正式支持與 Kafka 的端到端精確一次處理語義,在一致性方面上更有保證。

日處理數據量超10億:友信金服基于Flink構建實時用戶畫像系統的實踐

原文鏈接

本文為阿里云原創內容,未經允許不得轉載。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

太仓市| 平凉市| 静乐县| 尼木县| 类乌齐县| 扎兰屯市| 咸丰县| 皋兰县| 夹江县| 什邡市| 滨海县| 天峻县| 诸城市| 栾城县| 瑞丽市| 定兴县| 乐安县| 美姑县| 报价| 莱西市| 民和| 五原县| 洞口县| 丹棱县| 镇雄县| 福贡县| 柳州市| 资兴市| 龙门县| 景德镇市| 五大连池市| 大余县| 南汇区| 永济市| 嘉义县| 开阳县| 榆树市| 都兰县| 三门县| 泾川县| 金湖县|