您好,登錄后才能下訂單哦!
本篇內容主要講解“ Kafka順序消費線程模型的優化方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“ Kafka順序消費線程模型的優化方法是什么”吧!
各類消息中間件對順序消息實現的做法是將具有順序性的一類消息發往相同的主題分區中,只需要將這類消息設置相同的 Key 即可,而 Kafka 會在任意時刻保證一個消費組同時只能有一個消費者監聽消費,因此可在消費時按分區進行順序消費,保證每個分區的消息具備局部順序性。由于需要確保分區消息的順序性,并不能并發地消費消費,對消費的吞吐量會造成一定的影響。那么,如何在保證消息順序性的前提下,最大限度的提高消費者的消費能力?
本文將會對 Kafka 消費者拉取消息流程進行深度分析之后,對 Kafka 消費者順序消費線程模型進行一次實踐與優化。
在講實現 Kafka 順序消費線程模型之前,我們需要先深入分析 Kafka 消費者的消息拉取機制,只有當你對 Kafka 消費者拉取消息的整個流程有深入的了解之后,你才能夠很好地理解本次線程模型改造的方案。
我先給大家模擬一下消息拉取的實際現象,這里 max.poll.records = 500。
1、消息沒有堆積時:
可以發現,在消息沒有堆積時,消費者拉取時,如果某個分區沒有的消息不足 500 條,會從其他分區湊夠 500 條后再返回。
2、多個分區都有堆積時:
在消息有堆積時,可以發現每次返回的都是同一個分區的消息,但經過不斷 debug,消費者在拉取過程中并不是等某個分區消費完沒有堆積了,再拉取下一個分區的消息,而是不斷循環的拉取各個分區的消息,但是這個循環并不是說分區 p0 拉取完 500 條,后面一定會拉取分區 p1 的消息,很有可能后面還會拉取 p0 分區的消息,為了弄明白這種現象,我仔細閱讀了相關源碼。
org.apache.kafka.clients.consumer.KafkaConsumer#poll
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) { try {// poll for new data until the timeout expiresdo { // 客戶端拉取消息核心邏輯 final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer); if (!records.isEmpty()) {// 在返回數據之前, 發送下次的 fetch 請求, 避免用戶在下次獲取數據時線程阻塞if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { // 調用 ConsumerNetworkClient#poll 方法將 FetchRequest 發送出去。 client.pollNoWakeup(); }return this.interceptors.onConsume(new ConsumerRecords<>(records)); } } while (timer.notExpired());return ConsumerRecords.empty(); } finally { release(); } }
我們使用 Kafka consumer 進行消費的時候通常會給一個時間,比如:
consumer.poll(Duration.ofMillis(3000));
從以上代碼邏輯可以看出來,用戶給定的這個時間,目的是為了等待消息湊夠 max.poll.records 條消息后再返回,即使消息條數不夠 max.poll.records 消息,時間到了用戶給定的等待時間后,也會返回。
pollForFetches 方法是客戶端拉取消息核心邏輯,但并不是真正去 broker 中拉取,而是從緩存中去獲取消息。在 pollForFetches 拉取消息后,如果消息不為零,還會調用 fetcher.sendFetches() 與 client.pollNoWakeup(),調用這兩個方法究竟有什么用呢?
fetcher.sendFetches() 經過源碼閱讀后,得知該方法目的是為了構建拉取請求 FetchRequest 并進行發送,但是這里的發送并不是真正的發送,而是將 FetchRequest 請求對象存放在 unsend 緩存當中,然后會在 ConsumerNetworkClient#poll 方法調用時才會被真正地執行發送。
fetcher.sendFetches() 在構建 FetchRequest 前,會對當前可拉取分區進行篩選,而這個也是決定多分區拉取消息規律的核心,后面我會講到。
從 KafkaConsumer#poll 方法源碼可以看出來,其實 Kafka 消費者在拉取消息過程中,有兩條線程在工作,其中用戶主線程調用 pollForFetches 方法從緩存中獲取消息消費,在獲取消息后,會再調用 ConsumerNetworkClient#poll 方法從 Broker 發送拉取請求,然后將拉取到的消息緩存到本地,這里為什么在拉取完消息后,會主動調用 ConsumerNetworkClient#poll 方法呢?我想這里的目的是為了下次 poll 的時候可以立即從緩存中拉取消息。
pollForFetches 方法會調用 Fetcher#fetchedRecords 方法從緩存中獲取并解析消息:
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() { Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>(); int recordsRemaining = maxPollRecords; try {while (recordsRemaining > 0) { // 如果當前獲取消息的 PartitionRecords 為空,或者已經拉取完畢 // 則需要從 completedFetches 重新獲取 completedFetch 并解析成 PartitionRecords if (nextInLineRecords == null || nextInLineRecords.isFetched) {// 如果上一個分區緩存中的數據已經拉取完了,直接中斷本次循環拉取,并返回空的消息列表// 直至有緩存數據為止CompletedFetch completedFetch = completedFetches.peek();if (completedFetch == null) break;try { // CompletedFetch 即拉取消息的本地緩存數據 // 緩存數據中 CompletedFetch 解析成 PartitionRecords nextInLineRecords = parseCompletedFetch(completedFetch); } catch (Exception e) { // ...} completedFetches.poll(); } else {// 從分區緩存中獲取指定條數的消息List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);// ...fetched.put(partition, records); recordsRemaining -= records.size(); } } } } catch (KafkaException e) { // ...}return fetched; }
completedFetches 是拉取到的消息緩存,以上代碼邏輯就是圍繞著如何從 completedFetches 緩存中獲取消息的,從以上代碼邏輯可以看出:
maxPollRecords 為本次拉取的最大消息數量,該值可通過 max.poll.records 參數配置,默認為 500 條,該方法每次從 completedFetches 中取出一個 CompletedFetch 并解析成可以拉取的 PartitionRecords 對象,即方法中的 nextInLineRecords,請注意,PartitionRecords 中的消息數量可能大與 500 條,因此可能本次可能一次性從 PartitionRecords 獲取 500 條消息后即返回,如果 PartitionRecords 中消息數量不足 500 條,會從 completedFetches 緩存中取出下一個要拉取的分區消息,recordsRemaining 會記錄本次剩余還有多少消息沒拉取,通過循環不斷地從 completedFetches 緩存中取消息,直至 recordsRemaining 為 0。
以上代碼即可解釋為什么消息有堆積的情況下,每次拉取的消息很大概率是同一個分區的消息,因為緩存 CompletedFetch 緩存中的消息很大概率會多余每次拉取消息數量,Kafka 客戶端每次從 Broker 拉取的消息數據并不是通過 max.poll.records 決定的,該參數僅決定用戶每次從本地緩存中獲取多少條數據,真正決定從 Broker 拉取的消息數據量是通過 fetch.min.bytes、max.partition.fetch.bytes、fetch.max.bytes 等參數決定的。
我們再想一下,假設某個分區的消息一直都處于堆積狀態,Kafka 會每次都拉取這個分區直至將該分區消費完畢嗎?(根據假設,Kafka 消費者每次都會從這個分區拉取消息,并將消息存到分區關聯的 CompletedFetch 緩存中,根據以上代碼邏輯,nextInLineRecords 一直處于還沒拉取完的狀態,導致每次拉取都會從該分區中拉取消息。)
答案顯然不會,不信你打開 Kafka-manager 觀察每個分區的消費進度情況,每個分區都會有消費者在消費中。
那 Kafka 消費者是如何循環地拉取它監聽的分區呢?我們接著往下分析。
發送拉取請求邏輯:
org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches
public synchronized int sendFetches() { // 解析本次可拉取的分區 Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests(); for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {final Node fetchTarget = entry.getKey();final FetchSessionHandler.FetchRequestData data = entry.getValue();// 構建請求對象final FetchRequest.Builder request = FetchRequest.Builder .forConsumer(this.maxWaitMs, this.minBytes, data.toSend()) .isolationLevel(isolationLevel) .setMaxBytes(this.maxBytes) .metadata(data.metadata()) .toForget(data.toForget());// 發送請求,但不是真的發送,而是將請求保存在 unsent 中client.send(fetchTarget, request) .addListener(new RequestFutureListener<ClientResponse>() {@Overridepublic void onSuccess(ClientResponse resp) { synchronized (Fetcher.this) {// ... ...// 創建 CompletedFetch, 并緩存到 completedFetches 隊列中completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, resp.requestHeader().apiVersion())); } } } // ... ... }); } return fetchRequestMap.size(); }
以上代碼邏輯很好理解,在發送拉取請求前,先檢查哪些分區可拉取,接著為每個分區構建一個 FetchRequest 對象,FetchRequest 中的 minBytes 和 maxBytes,分別可通過 fetch.min.bytes 和 fetch.max.bytes 參數設置。這也是每次從 Broker 中拉取的消息不一定等于 max.poll.records 的原因。
prepareFetchRequests 方法會調用 Fetcher#fetchablePartitions 篩選可拉取的分區,我們來看下 Kafka 消費者是如何進行篩選的:
org.apache.kafka.clients.consumer.internals.Fetcher#fetchablePartitions
private List<TopicPartition> fetchablePartitions() { Set<TopicPartition> exclude = new HashSet<>(); List<TopicPartition> fetchable = subscriptions.fetchablePartitions(); if (nextInLineRecords != null && !nextInLineRecords.isFetched) { exclude.add(nextInLineRecords.partition); } for (CompletedFetch completedFetch : completedFetches) { exclude.add(completedFetch.partition); } fetchable.removeAll(exclude); return fetchable; }
nextInLineRecords 即我們上面提到的根據某個分區緩存 CompletedFetch 解析得到的,如果 nextInLineRecords 中的緩存還沒拉取完,則不從 broker 中拉取消息了,以及如果此時 completedFetches 緩存中存在該分區的緩存,也不進行拉取消息。
我們可以很清楚的得出結論:
當緩存中還存在中還存在某個分區的消息數據時,消費者不會繼續對該分區進行拉取請求,直到該分區的本地緩存被消費完,才會繼續發送拉取請求。
為了更加清晰的表達這段邏輯,我舉個例子并將整個流程用圖表達出來:
假設某消費者監聽三個分區,每個分區每次從 Broker 中拉取 4 條消息,用戶每次從本地緩存中獲取 2 條消息:
這種消費模型創建多個 KafkaConsumer 對象,每個線程維護一個 KafkaConsumer,從而實現線程隔離消費,由于每個分區同一時刻只能有一個消費者消費,所以這種消費模型天然支持順序消費。
但是缺點是無法提升單個分區的消費能力,如果一個主題分區數量很多,只能通過增加 KafkaConsumer 實例提高消費能力,這樣一來線程數量過多,導致項目 Socket 連接開銷巨大,項目中一般不用該線程模型去消費。
2、單 KafkaConsumer 實例 + 多 worker 線程
首先在初始化的時候,會對消費線程池進行初始化,具體是根據 threadsNumMax 的數量創建若干個單個線程的線程池,單個線程的線程池就是為了保證每個分區取模后拿到線程池是串行消費的,但這里創建 threadsNumMax 個線程池是不合理的,后面我會說到。
com.zto.consumer.KafkaConsumerProxy#submitRecords
以上就是目前 ZMS 順序消費的線程模型,用圖表示以上代碼邏輯:
在消息流量大的時候,順序消息消費時卻退化成單線程消費了。
經過對 ZMS 的消費線程模型以及對 Kafka 消費者拉取消息流程的深入了解之后,我想到了如下幾個方面對 ZMS 的消費線程模型進行優化:
1、細化消息順序粒度
之前的做法是將每個分區單獨一條線程消費,無法再繼續在分區之上增加消費能力,我們知道業務方發送順序消息時,會將同一類型具有順序性的消息給一個相同的 Key,以保證這類消息發送到同一個分區進行消費,從而達到消息順序消費的目的,而同一個分區會接收多種類型(即不同 Key)的消息,每次拉取的消息具有很大可能是不同類型的,那么我們就可以將同一個分區的消息,分配一個獨立的線程池,再利用消息 Key 進行取模放入對應的線程中消費,達到并發消費的目的,且不打亂消息的順序性。
2、細化位移提交粒度
由于 ZMS 目前是手動提交位移,目前每次拉取消息必須先消費完才能進行位移提交,既然已經對分區消息進行指定的線程池消費了,由于分區之間的位移先后提交不影響,那么我們可以將位移提交交給每個分區進行管理,這樣拉取主線程不必等到是否消費完才進行下一輪的消息拉取。
3、異步拉取與限流
異步拉取有個問題,就是如果節點消費跟不上,而拉取消息過多地保存在本地,很可能會造成內存溢出,因此我們需要對消息拉取進行限流,當本地消息緩存量達到一定量時,阻止消息拉取。
上面在分析 Kafka 消費者拉取消息流程時,我們知道消費者在發送拉取請求時,首先會判斷本地緩存中是否存在該分區的緩存,如果存在,則不發送拉取請求,但由于 ZMS 需要改造成異步拉取的形式,由于 Comsumer#poll 不再等待消息消費完再進行下一輪拉取,因此 Kafka 的本地緩存中幾乎不會存在數據了,導致 Kafka 每次都會發送拉取請求,相當于將 Kafka 的本地緩存放到 ZMS 中,因此我們需要 ZMS 層面上對消息拉取進行限流,Kafka 消費者有兩個方法可以設置訂閱的分區是否可以發送拉取請求:
// 暫停分區消費(即暫停該分區發送拉取消息請求)org.apache.kafka.clients.consumer.KafkaConsumer#pause// 恢復分區消費(即恢復該分區發送拉取消息請求)org.apache.kafka.clients.consumer.KafkaConsumer#resume
以上兩個方法,其實就是改變了消費者的訂閱分區的狀態值 paused,當 paused = true 時,暫停分區消費,當 paused = false 時,恢復分區消費,這個參數是在哪里使用到呢?上面在分析 Kafka 消費者拉取消息流程時我們有提到發送拉取請求之前,會對可拉取的分區進行篩選,其中一個條件即分區 paused = false:
org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#isFetchable
private boolean isFetchable() { return !paused && hasValidPosition(); }
由于 KafkaConsumer 是非線程安全的,如果我們在異步線程 KafkaConsumer 相關的類,會報如下錯誤:
KafkaConsumer is not safe for multi-threaded access
只需要確保 KafkaConsumer 相關方法在 KafkaConsumer#poll 方法線程中調用即可,具體做法可以設置一個線程安全上下文容器,異步線程操作 KafkaConsumer 相關方法是,只需要將具體的分區放到上下文容器即可,后續統一由 poll 線程執行。
因此我們只需要利用好這個特性,就可以實現拉取限流,消費者主線程的 Comsumer#poll 方法依然是異步不斷地從緩存中獲取消息,同時不會造成兩次 poll 之間的時間過大導致消費者被踢出消費組。
以上優化改造的核心是在不打亂消息順序的前提下利用消息 Key 盡可能地并發消費,但如果遇到分區中的消息都是相同 Key,并且在有一定的積壓下每次拉取都是同一個分區的消息時,以上模型可能沒有理想情況下的那么好。這時是否可以將 fetch.max.bytes 與 max.partition.fetch.bytes 參數設置小一點,讓每個分區的本地緩存都不足 500 條,這樣每次 poll 的消息列表都可以包含多個分區的消息了,但這樣又會導致 RPC 請求增多,這就需要針對業務消息大小,對這些參數進行調優。
以上線程模型,需要增加一個參數 orderlyConsumePartitionParallelism,用于設置分區消費并行度,假設某個消費組被分配 5 個分區進行消費,則每個分區默認啟動一條線程消費,一共 5 * 1 = 5 條消費線程,當 orderlyConsumePartitionParallelism = 3,則每個分區啟動 3 條線程消費,一共 5 * 3 = 15 條消費線程。orderlyConsumePartitionParallelism = 1 時,則說明該分區所有消息都處在順序(串行)消費;當 orderlyConsumePartitionParallelism > 1 時,則根據分區消息的 Key 進行取模分配線程消費,保證不了整個分區順序消費,但保證相同 Key 的消息順序消費。
注意,當 orderlyConsumePartitionParallelism > 1 時,分區消費線程的有效使用率取決于該分區消息的 Key:
1、如果該分區所有消息的 Key 都相同,則消費的 Key 取模都分配都同一條線程當中,并行度退化成 orderlyConsumePartitionParallelism = 1;
2、如果該分區相同 Key 的消息過于集中,會導致每次拉取都是相同 key 的一批消息,同樣并行度退化成 orderlyConsumePartitionParallelism = 1。
綜合對比:
優化前,ZMS 可保證整個分區消息的順序性,優化后可根據消息 Key 在分區的基礎上不打亂相同 Key 消息的順序性前提下進行并發消費,有效地提升了單分區的消費吞吐量;優化前,有很大的概率會退化成同一時刻單線程消費,優化后盡可能至少保證每個分區一條線程消費,情況好的時候每個分區可多條線程消費。
通過以上場景分析,該優化方案不是提高順序消費吞吐量的銀彈,它有很大的局限性,用戶在業務的實現上不能重度依賴順序消費去實現,以免影響業務性能上的需求。
到此,相信大家對“ Kafka順序消費線程模型的優化方法是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。