您好,登錄后才能下訂單哦!
這篇文章主要介紹“Java RabbitMQ消息隊列常見問題實例分析”,在日常操作中,相信很多人在Java RabbitMQ消息隊列常見問題實例分析問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Java RabbitMQ消息隊列常見問題實例分析”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
消息堆積的產生場景:
生產者產生的消息速度大于消費者消費的速度。解決:增加消費者的數量或速度。
沒有消費者進行消費的時候。解決:死信隊列、設置消息有效期。相當于對我們的消息設置有效期,在規定的時間內如果沒有消費的話,自動過期,過期的時候會執行客戶端回調監聽的方法將消息存放到數據庫表記錄,后期實現補償。
1、生產者使用消息確認機制保證消息百分之百能夠將消息投遞到MQ成功。
2、MQ服務器端應該將消息持久化到硬盤
3、消費者使用手動ack機制確認消息消費成功
如果MQ服務器容量滿了怎么辦?
使用死信隊列將消息存到數據庫中去,后期補償消費。
RabbitMQ死信隊列俗稱,備胎隊列;消息中間件因為某種原因拒收該消息后,可以轉移到死信隊列中存放,死信隊列也可以有交換機和路由key等。
產生背景:
消息投遞到MQ中存放 消息已經過期
隊列達到最大的長度 (隊列容器已經滿了)生產者拒絕接收消息
消費者消費多次消息失敗,就會轉移存放到死信隊列中
代碼案例:
maven依賴
<dependencies> <!-- springboot-web組件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot對amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies>
yml配置
server: # 服務啟動端口配置 port: 8081 servlet: # 應用訪問路徑 context-path: / spring: #增加application.druid.yml 的配置文件 # profiles: # active: rabbitmq rabbitmq: ####連接地址 host: www.kaicostudy.com ####端口號 port: 5672 ####賬號 username: kaico ####密碼 password: kaico ### 地址 virtual-host: /kaicoStudy ###模擬演示死信隊列 kaico: dlx: exchange: kaico_order_dlx_exchange queue: kaico_order_dlx_queue routingKey: kaico.order.dlx ###備胎交換機 order: exchange: kaico_order_exchange queue: kaico_order_queue routingKey: kaico.order
隊列配置類
@Configuration public class DeadLetterMQConfig { /** * 訂單交換機 */ @Value("${kaico.order.exchange}") private String orderExchange; /** * 訂單隊列 */ @Value("${kaico.order.queue}") private String orderQueue; /** * 訂單路由key */ @Value("${kaico.order.routingKey}") private String orderRoutingKey; /** * 死信交換機 */ @Value("${kaico.dlx.exchange}") private String dlxExchange; /** * 死信隊列 */ @Value("${kaico.dlx.queue}") private String dlxQueue; /** * 死信路由 */ @Value("${kaico.dlx.routingKey}") private String dlxRoutingKey; /** * 聲明死信交換機 * * @return DirectExchange */ @Bean public DirectExchange dlxExchange() { return new DirectExchange(dlxExchange); } /** * 聲明死信隊列 * * @return Queue */ @Bean public Queue dlxQueue() { return new Queue(dlxQueue); } /** * 聲明訂單業務交換機 * * @return DirectExchange */ @Bean public DirectExchange orderExchange() { return new DirectExchange(orderExchange); } /** * 綁定死信隊列到死信交換機 * * @return Binding */ @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()) .to(dlxExchange()) .with(dlxRoutingKey); } /** * 聲明訂單隊列,并且綁定死信隊列 * * @return Queue */ @Bean public Queue orderQueue() { // 訂單隊列綁定我們的死信交換機 Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", dlxExchange); arguments.put("x-dead-letter-routing-key", dlxRoutingKey); return new Queue(orderQueue, true, false, false, arguments); } /** * 綁定訂單隊列到訂單交換機 * * @return Binding */ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(orderExchange()) .with(orderRoutingKey); } }
死信隊列消費者
@Component public class OrderDlxConsumer { /** * 死信隊列監聽隊列回調的方法 * @param msg */ @RabbitListener(queues = "kaico_order_dlx_queue") public void orderDlxConsumer(String msg) { System.out.println("死信隊列消費訂單消息" + msg); } }
普通隊列消費者
@Component public class OrderConsumer { /** * 監聽隊列回調的方法 * * @param msg */ @RabbitListener(queues = "kaico_order_queue") public void orderConsumer(String msg) { System.out.println("正常訂單消費者消息msg:" + msg); } }
后臺隊列管理頁面如下:
部署方式:死信隊列不能夠和正常隊列存在同一個服務器中,應該分服務器存放。
訂單30分鐘未支付,系統自動超時關閉的實現方案。
基于任務調度實現,效率是非常低。
基于redis過期key實現,key失效時會回調客戶端一個方法。
用戶下單的時候,生成一個令牌(有效期)30分鐘,存放到我們redis;缺點:非常冗余,會在表中存放一個冗余字段。
基于mq的延遲隊列(最佳方案)rabbitmq情況下。
原理:在我們下單的時候,往mq投遞一個消息設置有效期為30分鐘,但該消息失效的時候(沒有被消費的情況下),執行我們客戶端一個方法告訴我們該消息已經失效,這時候查詢這筆訂單是否已經支付。
實現邏輯:
主要使用死信隊列來實現。
想要的代碼:就是正常的消費者不消費消息,或者沒有正常的消費者,在設置的時間后進入死信隊列中,然后死信消費者實現相應的業務邏輯。
當消費者業務邏輯代碼中,拋出異常自動實現重試 (默認是無數次重試)
應該對RabbitMQ重試次數實現限制,比如最多重試5次,每次間隔3s;重試多次還是失敗的情況下,存放到死信隊列或者存放到數據庫表中記錄后期人工補償。因為重試失敗次數之后,隊列會自動刪除這個消息。
消息重試原理: 在重試的過程中,使用aop攔截我們的消費監聽方法,也不會打印這個錯誤日志。如果重試多次還是失敗,達到最大失敗次數的時候才會打印錯誤日志。
如果消費多次還是失敗的情況下:
1、自動刪除該消息;(消息可能丟失)
解決辦法:
如果充實多次還是失敗的情況下,最終存放到死信隊列;
采用表日志記,消費失敗錯誤日志的日志記錄,后期人工自動對該消息實現補償。
消費者獲取消息后,調用第三方接口(HTTP請求),但是調用第三方接口失敗呢?是否需要重試 ?
答:有時是因為網絡異常調用失敗,應該需要重試幾次。
消費者獲取消息后,應該代碼問題拋出數據異常,是否需要重試?
答:不需要重試,代碼異常需要重新修改代碼發布項目。
第一步、springboot項目配置需要開啟ack模式
acknowledge-mode: manual
第二步、消費者Java代碼
int result = orderMapper.addOrder(orderEntity); if (result >= 0) { // 開啟消息確認機制 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
什么是消息冪等性?MQ消費者如何保證冪等性?
產生的原因:就是因為消費者可能會開啟自動重試,重試過程中可能會導致消費者業務邏輯代碼重復執行。此刻消息已經消費了,因為業務報錯導致消息重新消費,這時會出現
解決方案:采用消息全局id根據業務來定,根據業務id(全局唯一id)消費者可以判斷這條消息已經消費了。
消費者代碼邏輯:
分布式事務:在分布式系統中,因為跨服務調用接口,存在多個不同的事務,每個事務都互不影響。就存在分布式事務的問題。
解決分布式事務核心思想:數據最終一致性。
分布式領域中名詞:
強一致性 :要么同步速度非常快或者采用鎖的機制 不允許出現臟讀;
強一致性解決方案:要么數據庫A非常迅速的將數據同步給數據B,或者數據庫A沒有同步完成之前數據庫B不能夠讀取數據。
弱一致性: 允許讀取的數據為原來的臟數據,允許讀取的結果不一致性。
最終一致性: 在我們的分布式系統中,因為數據之間同步通過網絡實現通訊,短暫的數據延遲是允許的,但是最終數據必須要一致性。
基于RabbitMQ解決分布式事務的思路:(采用最終一致性的方案)
確認我們的生產者消息一定要投遞到MQ中(消息確認機制)投遞失敗 就繼續重試
消費者采用手動ack的形式確認消息實現消費 注意冪等性問題,消費失敗的情況下,mq自動幫消費者重試。
保證我們的生產者第一事務先執行,如果執行失敗采用補單隊列(給生產者自己事務補充,確保生產者第一事務執行完成【數據最終一致性】)。
解決思路圖:核心是利用mq發送消息給其他系統將數據修改回來。
到此,關于“Java RabbitMQ消息隊列常見問題實例分析”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。