您好,登錄后才能下訂單哦!
本篇內容介紹了“KAFKA的ISR的伸縮過程是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
知識點總結
我們了解ISR列表是不斷伸縮的,在副本失效后及時踢出ISR列表,在副本趕上進度之后重新將副本加入到ISR列表中,后面我們就會按照這個思路來看下其中細節。
功能失效:節點宕機,在該節點上的副本都屬于功能失效副本。
同步失效:follower副本所在的broker因為帶寬或者負載等因素無法及時完成同步,導致被踢出ISR。
在0.9x版本之前,有一個控制參數:replica.lag.max.messages
默認值為4000,表示如果follower的消息個數落后leader個數4000,那么就會被踢出ISR列表;
我們可以想一下這種直接指定條數的方式是否合理呢?顯然是不合理的,原因入下:
高吞吐的場景:瞬間就幾萬條消息,可能follower就滯后個幾秒鐘就被判定為失效從而被踢出,可能導致ISR列表頻繁的變動,以及元數據的頻繁更新。
低吞吐的場景:可能一天就幾條消息,那可能follower都滯后好幾天了依舊存在于ISR中,那ISR不就失去意義了嗎?
因此0.9x版本開始,移除了該參數,取而代之的參數是replica.lag.time.max.ms
該參數默認值是10000ms,也就是10s。
也就是說如果follower在10s都沒能追上leader的LEO,就會被認定為失效,從而踢出IS列表。
我們知道了ISR是如何判定失效副本后,再來看下,到底是怎么把這個失效的副本踢出去的呢?
1、每個broker在啟動的時候都會啟動兩個定時任務:
isr-expiration:定時檢查當前broker上的eader對應的副本失效信息,也就是看當前Leader的ISR列表中是否存在失效副本,默認執行周期為replica.lag.time.max.ms / 2 = 5s
。
isr-change-propagation:定時檢查內存isrChangeSet中是否有新的變更數據,固定執行周期為2.5s
2、判斷副本失效:
isr-expiration任務會根據當前時間now,減去某follower的 lastCaughtUpTimeMs
,如果大于replica.lag.time.max.ms
值,則說明失效。
而lastCaughtUpTimeMs
這個值,在follower的LEO與leader的LEO相等時(Leader中維護了follower的LEO信息),被更新。
也就是說,只有當follower完全追上了Leader才更新,而不是每Fetch一次就更新。
關于為什么不是每次Fetch的時候就更新該值呢?
我們試想一下,如果leader的寫入速率遠大于follower的同步速率,可能leader已經寫了10w條數據了,follower由于網絡/負載為原因還在慢悠悠的同步,但是因為Fetch請求是正常發送的,就每次都更新lastCaughtUpTimeMs
值,從而認為該follower是有效的,那這不就導致leader和follower之間在這種場景下存在巨大的數據差異了嘛?從而影響數據的可靠性。
3、這個ISR變化的信息如何傳遞呢?
由leader所在的broker的isr-expiration
定時任務,去檢查失效副本和更新zk的/state節點數據,同時寫入isrChangeSet
。
isr-change-propagation
去檢查isrChangeSet是否有新增數據,如果有,則往zk中的/isr_change_notification
節點下創建子節點。
而Controller對這個節點有一個Watcher,如果發現新增了子節點,那么Controller就會重新從zk中獲取到最新的元數據,然后通知所有Broker更新元數據。
從上述過程中,我們還可以知道,實際上這個變更的數據會在內存中停留一段時間,假如這個時候我們對應的broker宕機了,那么不就是改了zk卻沒有讓其他broker更新元數據嗎?
其實不是,因為這種情況下,broker宕機會觸發controller在zk下的brokers/ids下對應的節點被刪除,因此Controller也會讓其他的broker更新元數據,所以無論如何都會更新。
最后我們來總結一下整個ISR剔除的過程:
每個leader在啟動的時候都會啟動兩個定時檢查任務,每隔一段時間檢查是否存在失效副本。
如果某個follower的lastCaughtUpTimeMs > 10s
那么就會被判定為失效副本
如果定時任務掃描到存在失效副本時,就會往zk的/state節點下更新最新的ISR列表數據,同時將變更數據寫入到內存中的isrChangeSet
中。
然后另外一個傳播任務會定時檢查isrChangeSet
是否存在需要變更的任務,如果感知到就往zk的/isr_change_notification
節點下創建子節點。
最終由Controller感知到節點的變化,然后從zk中獲取最新的元數據,然后通知所有的Broker更新元數據,完成整個ISR列表的數據更新。
在看完第五小節之后,第六小節就會顯得非常簡單,無非是需要知道什么時候一個副本會重新判定為同步副本呢? 那就是:當前失效follower的LEO等于leaderHW的時候,即被判斷可以重新加入ISR。
那么隨之而來的一個問題就是在哪兒去判斷followerLEO == leaderHW
的呢?
這里和上面的剔除ISR成員不一樣,并不是由定時任務去檢測的,而是在處理完Fetch請求的時候,如果判斷Fetch請求是follower發過來的的(replicaId >= 0),那么就會去看下當前這個follower的LEO是多少(其實就是Fetch請求帶過來的),是不是趕上了當前的leaderHW,如果是的那么就執行擴張ISR操作。
擴張ISR操作流程就和上面流程一樣了,先寫zk下的/state數據,然后寫isrChangeSet,最后由Controller感知到數據變化,更新集群元數據。
我們所需要記住的主要差別點在于,ISR列表的擴張是在Fetch請求的時候去判斷和執行的。
最后,我們用圖示來加深一點印象。
1、失效副本(圖源:《深入理解kafka》):
2、踢出ISR列表:
我們由上可知,ISR的伸縮是需要涉及到zk和Controller以及各個Broker的元數據更新的,因此如果太過頻繁會造成性能問題。
所以kafka在在判斷ISR伸縮之前,還會判斷兩個條件,以此來降低頻率:
上次ISR集合發生變化距離現在已經超過5s。
上一次寫入zk的時候,距離現在已經超過60s。
如果一個副本剛追上Leader加入到ISR,但是因為短時間內沒有追上LEO,5s之后又被檢查到是失效副本,不是又要被踢出去,要更新元數據,這樣就太頻繁了。 因此就有了上面兩個限制,就起碼給了多60s的讓新加入的follower去追上Leader的LEO。
“KAFKA的ISR的伸縮過程是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。