中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ一行代碼造成大量消息丟失該怎么解決

發布時間:2021-12-09 09:09:00 來源:億速云 閱讀:209 作者:柒染 欄目:大數據

本篇文章給大家分享的是有關RocketMQ一行代碼造成大量消息丟失該怎么解決,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。


1、問題現象


首先接到項目反饋使用 RocketMQ 會出現如下錯誤:

RocketMQ一行代碼造成大量消息丟失該怎么解決  

 
錯誤信息關鍵點:MQBrokerException:CODE:2 DESC:[TIMEOUT_CLEAN_QUEUE]broker busy,start flow control for a while,period in queue:205ms,size of queue:880。

由于項目組并沒有對消息發送失敗做任何補償,導致丟失消息發送失敗,故需要對這個問題進行深層次的探討,并加以解決。

 

2、問題分析


首先我們根據關鍵字:TIMEOUT_CLEAN_QUEUE 去 RocketMQ 中查詢,去探究在什么時候會拋出如上錯誤。根據全文搜索如下圖所示:

RocketMQ一行代碼造成大量消息丟失該怎么解決  

 
該方法是在 BrokerFastFailure 中定義的,通過名稱即可以看成其設計目的:Broker端快速失敗機制。

Broker 端快速失敗其原理圖如下:

RocketMQ一行代碼造成大量消息丟失該怎么解決  
  • 消息發送者向 Broker 發送消息寫入請求,Broker 端在接收到請求后會首先放入一個隊列中(SendThreadPoolQueue),默認容量為 10000。

  • Broker 會專門使用一個線程池(SendMessageExecutor)去從隊列中獲取任務并執行消息寫入請求,為了保證消息的順序處理,該線程池默認線程個數為1。

如果 Broker 端受到垃圾回收等等因素造成單條寫入數據發生抖動,單個 Broker 端積壓的請求太多從而得不到及時處理,會極大的造成客戶端消息發送的時間延長。

設想一下,如果由于 Broker 壓力增大,寫入一條消息需要500ms甚至超過1s,并且隊列中積壓了5000條消息,消息發送端的默認超時時間為3s,如果按照這樣的速度,這些請求在輪到 Broker 執行寫入請求時,客戶端已經將這個請求超時了,這樣不僅會造成大量的無效處理,還會導致客戶端發送超時。

故 RocketMQ 為了解決該問題,引入 Broker 端快速失敗機制,即開啟一個定時調度線程,每隔10毫秒去檢查隊列中的第一個排隊節點,如果該節點的排隊時間已經超過了 200ms,就會取消該隊列中所有已超過 200ms 的請求,立即向客戶端返回失敗,這樣客戶端能盡快進行重試,因為 Broker 都是集群部署,下次重試可以發送到其他 Broker 上,這樣能最大程度保證消息發送在默認 3s 的時間內經過重試機制,能有效避免某一臺 Broker 由于瞬時壓力大而造成的消息發送不可用,從而實現消息發送的高可用。

從 Broker 端快速失敗機制引入的初衷來看,快速失敗后會發起重試,除非同一時刻集群內所有的 Broker 都繁忙,不然消息會發送成功,用戶是不會感知這個錯誤的,那為什么用戶感知了呢?難道 TIMEOUT_ CLEAN _ QUEUE 錯誤,Broker 不重試?

為了解開這個謎團,接下來會采用源碼分析的手段去探究真相。接下來將以消息同步發送為例揭示其消息發送處理流程中的核心關鍵點。

MQ Client 消息發送端首先會利用網絡通道將請求發送到 Broker,然后接收到請求結果后并調用 processSendResponse 方法對響應結果進行解析,如下圖所示:

RocketMQ一行代碼造成大量消息丟失該怎么解決  
在這里返回的 code 為 RemotingSysResponseCode . SYSTEM_BUSY。

我們從 proccessSendResponse 方法中可以得知如果 code 為 SYSTEM_BUSY,該方法會拋出 MQBrokerException,響應 code 為 SYSTEM_BUSY,其錯誤描述為開頭部分的錯誤信息。

那我們沿著該方法的調用鏈路,可以找到其直接調用方:DefaultMQProducerImpl 的 sendKernelImpl,我們重點考慮如果底層方法拋出  MQBrokerException 該方法會如何處理。

其關鍵代碼如下圖所示:

RocketMQ一行代碼造成大量消息丟失該怎么解決  
可以看出在 sendKernelImpl 方法中首先會捕捉異常,先執行注冊的鉤子函數,即就算執行失敗,對應的消息發送后置鉤子函數也會執行,然后再原封不動的將該異常向上拋出。

sendKernelImpl 方法被 DefaultMQProducerImpl 的 sendDefaultImpl 方法調用,下面是其核心實現截圖:

RocketMQ一行代碼造成大量消息丟失該怎么解決  
從這里可以看出 RocketMQ 消息發送高可用設計一個非常關鍵的點,重試機制,其實現是在 for 循環中 使用 try catch 將 sendKernelImpl 方法包裹,就可以保證該方法拋出異常后能繼續重試。從上文可知,如果 SYSTEM_BUSY 會拋出 MQBrokerException,但發現只有上述幾個錯誤碼才會重試,因為如果不是上述錯誤碼,會繼續向外拋出異常,此時 for 循環會被中斷,即不會重試。

這里非常令人意外的是連 SYSTEM_ERROR 都會重試,卻沒有包含 SYSTEM_BUSY,顯然違背了快速失敗的設計初衷,故筆者斷定,這是 RocketMQ 的一個BUG,將 SYSTEM_BUSY 遺漏了,后續會提一個 PR,增加一行代碼,將 SYSTEM_BUSY 加上即可。

問題分析到這里,該問題應該就非常明了。

 

3、解決方案


如果大家在網上搜索 TIMEOUT_CLEAN_QUEUE 的解決方法,大家不約而同提出的解決方案是增加 waitTimeMillsInSendQueue 的值,該值默認為 200ms,例如將其設置為 1000s 等等,以前我是反對的,因為我的認知里 Broker 會重試,但現在發現 Broker 不會重試,所以我現在認為該 BUG未解決的情況下適當提高該值能有效的緩解。

但這是并不是好的解決方案,我會在近期向官方提交一個PR,將這個問題修復,建議大家在公司盡量對自己使用的版本進行修改,重新打一個包即可,因為這已經違背了 Broker 端快速失敗的設計初衷。

但在消息發送的業務方,盡量自己實現消息的重試機制,即不依賴 RocketMQ 本身提供的重試機制,因為受制于網絡等因素,消息發送不可能百分之百成功,建議大家在消息發送時捕獲一下異常,如果發送失敗,可以將消息存入數據庫,再結合定時任務對消息進行重試,盡最大程度保證消息不丟失。

以上就是RocketMQ一行代碼造成大量消息丟失該怎么解決,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

徐闻县| 鹤山市| 保靖县| 荔波县| 新昌县| 饶阳县| 中西区| 重庆市| 琼结县| 修武县| 安西县| 湘潭县| 白城市| 鹿泉市| 读书| 南阳市| 石家庄市| 信丰县| 贺州市| 浦县| 新兴县| 奈曼旗| 浦城县| 仙居县| 腾冲县| 丰顺县| 睢宁县| 阜平县| 山阳县| 玉林市| 阳泉市| 米林县| 绩溪县| 堆龙德庆县| 惠州市| 曲沃县| 哈巴河县| 华容县| 葵青区| 许昌市| 嘉峪关市|