您好,登錄后才能下訂單哦!
這篇文章主要介紹了Apache Kafka分區重分配的實現原理是什么的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Apache Kafka分區重分配的實現原理是什么文章都會有所收獲,下面我們一起來看看吧。
Kafka 是由 Apache 軟件基金會開發的一個開源流處理平臺,旨在提供一個統一的、高吞吐、低延遲的實時數據處理平臺。其持久化層本質上是一個“按照分布式事務日志架構的大規模發布/訂閱消息隊列”,這使它作為企業級基礎設施來處理流式數據非常有價值。
在 Kafka 中,用 topic 來對消息進行分類,每個進入到 Kafka 的信息都會被放到一個 topic 下,同時每個 topic 中的消息又可以分為若干 partition 以此來提高消息的處理效率。存儲消息數據的主機服務器被命名為 broker。通常為了保證數據的可靠性,數據是以多副本的形式保存在不同 broker 的不同磁盤上的。對于每一個 topic 的每一個 partition,如果多個副本之間完成了數據同步,保證了數據的一致性,則此時的多個副本所在的 broker 的集合稱為 Isr。同一時間,某個 topic 的某個 partition 的多個副本中僅有一個對外提供服務,此時對外提供服務的 broker 被認定為該 partition 的 leader,客戶端的請求都集中到 leader 上。
對于 2 副本 3 分區的 topic 其描述信息及存儲狀態如下所示:
test的描述信息: Topic:test PartitionCount:3 ReplicationFactor:2 Configs:min.insync.replic as=1 Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: test Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: test Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
test的副本分布
健康狀態的 Kafka 集群,對于每個 topic 的每個 partition,其 Isr 都應該等于預期的副本集合(后面均已 Replicas 表示),但在實際場景中,不可避免的存在磁盤/主機故障,或者 由于某些原因需要將部分 broker 節點下線的情況,此時就需要將故障/要下線的 broker 從 Replicas 中移除。對此 Kafka 提供了 kafka-reassign-partitions 工具來進行手動的分區副本遷移。
在 Kafka 的根路徑下,通過執行如下命令,來完成分區副本的重分配:
./bin/kafka‐reassign‐partitions.sh ‐‐zookeeper localhost:2181/kafka ‐‐reassignment‐json‐file reassign‐topic.json ‐‐execute
其中:reassign‐topic.json 文件指定了分區副本的分布情況,示例如下:
{ "version": 1, "partitions": [ { "topic": "test", "partition": 2, "replicas": [ 2, 1 ], "log_dirs": [ "any", "any" ] } }
文件中指明了將 topic=test,partition=2 的分區的兩副本分別移動到 brokerId=2 和 brokerId=1 的節點的任意磁盤路徑上。
下面將結合 2.0.0 版本的 Kafka 源碼簡單的介紹下 Kafka 分區副本重分配的流程和邏輯。
在開始之前先簡單介紹下在 Kafka 分區副本重分配中涉及到的兩個概念:ZooKeeper 和 Kafka Controller。
Kafka 的元數據,是存儲在 ZooKeeper 中的。Apache ZooKeeper 是一個提供高可靠性的分布式協調服務框架。它使用的數據模型類似于文件系統的樹形結構,根目錄也是以“/”開始。該結構上的每個節點被稱為 znode,用來保存一些元數據協調信息。同時 ZooKeeper 賦予客戶端監控 znode 變更的能力,即所謂的 Watch 通知功能。一旦 znode 節點被創建、刪除,子節點數量發生變化,或是 znode 所存的數據本身變更, ZooKeeper 會通過節點變更監聽器 (ChangeHandler) 的方式顯式通知客戶端以便客戶端 觸發對應的處理操作。
Kafka Controller 是 Apache Kafka 的核心組件,它的主要作用是在 Apache ZooKeeper 的幫助下管理和協調整個 Kafka 集群。集群中任意一臺 Broker 都能充當控制器的角色,但是,在運行過程中,只能有一個 Broker 成為控制器,行使其管理和協調的職責。
Kafka 的分區重分配就是在 client、broker 和 controller 的協同運行下完成的。即:
1. 客戶端發起分區重分配任務,在 ZooKeeper 中創建/admin/reassign_partitions 節點,然 后向涉及的 broker 發送 alterReplicaLogDirs 請求
2. controller 監測到 ZooKeeper 中/admin/reassign_partitions 的變化,觸發 Kafka 分區元 數據的變更維護操作
3. broker 接收到客戶端發送的 alterReplicaLogDirs 請求,根據具體任務內容在服務端實際完成分區副本移動
流程總結如下圖所示:
下面將針對這三部分分別展開介紹:
分區重分配任務是由客戶端發起的,其入口主類為 ReassignPartitionsCommand.scala 中,調用 executeAssignment 方法。客戶端的 executeAssignment 方法主要完成了如下操作:
1.解析 json 文件并進行相關校驗
•讀取 json 文件內容,校驗“partitions”的“version”,僅為 1 時,繼續執行副本重分 配
•校驗分區副本數和副本數據路徑數是否一致
•校驗 partition/replica 是否為空/重復
2.檢查待重分配的分區在集群中是否存在(根據 zk 中的/brokers/topics/${topic})
3.檢查確認所有目標 broker 均在線(zk 中/brokers/ids 的子 znode 列表)
4.檢查是否已存在分區副本重分配任務,如果已存在相關任務,則退出
5.將分區重分配任務記錄到 zk 中,即在 zk 中創建/admin/reassign_partitions,以便 controller 可以發現并協調 broker 進行相關操作
6.根據解析的 json 內容,逐個 topic 向相關的 broker 發送 alterReplicaLogDirs 請求
客戶端的處理邏輯可總結為如下流程圖:
在 controller 啟動時會創建 partitionReassignmentHandler,kafkaController 主線程回調 onControllerFailover 時,檢測到/admin/reassign_partitions 發生變化時,觸發分區副本重分配操作,在 maybeTriggerPartitionReassignment 中通過調用 onPartitionReassignment 真正執行分區副本重分配。在 onPartitionReassignment 中定 義了三個概念:
•RAR:指定的分區副本放置策略
•OAR:原始的分區副本放置策略
•AR:當前的分區副本放置策略
onPartitionReassignment 的執行過程可以總結為如下步驟:
檢查指定的分區副本是否處在 isr 中,如果不在則執行以下前 3 步,否則直接執行第 4 步
1.在 zk 中將 AR 更新為 RAR+OAR (/broker/topics/${topicName})
2.向所有副本(RAR+OAR)中發送 LeaderAndIsr 請求
3.將 RAR-OAR 的副本狀態置為 NewReplica,等待 NewReplica 中的數據與 leader 中的數據 完成同步
4.等待直到所有 RAR 中的副本完成與 leader 的同步
5.將所有 RAR 的副本置為 OnlineReplica 狀態
6.將 RAR 作為 AR
7.如果當前的 leader 不在 RAR 中,發送 LeaderAndIsr Request 從 RAR 中選出一個新的 leader;如果當前 leader 在 RAR 中,檢查 leader 狀態,如果 leader 健康則更新 LeaderEpoch,否則重新選擇 leader
8.將 OAR-RAR 的副本置為 Offline 狀態
9.將 OAR-RAR 的副本置為 NonExistentReplica 狀態(真實刪除對應的分區副本)
10.將 zk 中的 AR 置為 RAR(/brokers/topics/${topicName}數據格式:{"version":1,"partitions":{"0":[${brokerId}]}})
11.更新 zk 中/admin/reassign_partitions 的值,將完成遷移的分區刪除
12.同步所有 broker,更新元數據信息
邏輯流程圖如下:
底層數據跨路徑遷移,是由 broker 端完成的,broker 接收到客戶端發來的 ALTER_REPLICA_LOG_DIRS 請求后,調用 alterReplicaLogDirs 方法,相關流程如下:
1.確保目的路徑/待移動分區在線
2.如果當前分區副本的 log 路徑不存在給定的目的路徑并且 futureLogs(用于跨路徑數據遷移的中間過程)也不包含目的路徑,則在內存中記錄當前分區副本和目的 logDir,即標記那些需要進行遷移的分區副本路徑
3.對于需要移動的分區副本,目的 broker 的路徑中創建 future Log
4.停止當前 Log 的清理工作,等待 future Log 同步完再清理
5.創建 ReplicaAlterLogDirsThread,逐個 topic 逐個 partition 獲取 fetchOffset、 logStartOffset 、fetchSize 等數據構造 Fetch 請求
6.通過 ReplicaManager.fetchMessages 從分區副本 leader 獲取數據,完成數據同步
更詳細的處理流程如下圖所示:
關于“Apache Kafka分區重分配的實現原理是什么”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Apache Kafka分區重分配的實現原理是什么”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。