中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

SpringBoot怎么整合RabbitMQ實現延遲隊列

發布時間:2023-05-08 14:18:51 來源:億速云 閱讀:93 作者:zzz 欄目:開發技術

這篇文章主要介紹“SpringBoot怎么整合RabbitMQ實現延遲隊列”,在日常操作中,相信很多人在SpringBoot怎么整合RabbitMQ實現延遲隊列問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”SpringBoot怎么整合RabbitMQ實現延遲隊列”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

    如何保證消息不丟失

    rabbitmq消息投遞路徑

    生產者->交換機->隊列->消費者

    總的來說分為三個階段。

    • 1.生產者保證消息投遞可靠性。

    • 2.mq內部消息不丟失。

    • 3.消費者消費成功。

    什么是消息投遞可靠性

    簡單點說就是消息百分百發送到消息隊列中。

    我們可以開啟confirmCallback

    生產者投遞消息后,mq會給生產者一個ack.根據ack,生產者就可以確認這條消息是否發送到mq.

    開啟confirmCallback

    修改配置文件

    #NONE:禁用發布確認模式,是默認值,CORRELATED:發布消息成功到交換器后會觸發回調方法
    spring:
      rabbitmq:
        publisher-confirm-type: correlated

    測試代碼

    @Test  
    public void testConfirmCallback() throws InterruptedException {  
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {  
        /**  
        *  
        * @param correlationData 配置  
        * @param ack 交換機是否收到消息,true是成功,false是失敗  
        * @param cause 失敗的原因  
        */  
        @Override  
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
            System.out.println("confirm=====>");  
            System.out.println("confirm==== ack="+ack);  
            System.out.println("confirm==== cause="+cause);  
            //根據ACK狀態做對應的消息更新操作 TODO  
        }  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "雞你太美");  
        Thread.sleep(10000);  
    }

    通過returnCallback保證消息從交換器發送到隊列成功。 修改配置文件

    spring:
      rabbitmq:
        #開啟returnCallback
        publisher-returns: true
        #交換機處理消息到路由失敗,則會返回給生產者
        template:
          mandatory: true

    測試代碼

    @Test  
    void testReturnCallback() {  
        //為true,則交換機處理消息到路由失敗,則會返回給生產者 配置文件指定,則這里不需指定  
        rabbitTemplate.setMandatory(true);  
        //開啟強制消息投遞(mandatory為設置為true),但消息未被路由至任何一個queue,則回退一條消息  
        rabbitTemplate.setReturnsCallback(returned -> {  
            int code = returned.getReplyCode();  
            System.out.println("code="+code);  
            System.out.println("returned="+ returned);  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","測試returnCallback");  
    }

    消費者消費消息時需要通過ack手動確認消息已消費。

    修改配置文件

    spring:
      rabbitmq:
        listener:  
          simple:  
            acknowledge-mode: manual

    編寫測試代碼

    @RabbitHandler  
    public void consumer(String body, Message message, Channel channel) throws IOException {  
        long msgTag = message.getMessageProperties().getDeliveryTag();  
        System.out.println("msgTag="+msgTag);  
        System.out.println("message="+ message);  
        System.out.println("body="+body);  
    
        //成功確認,使用此回執方法后,消息會被 rabbitmq broker 刪除  
        channel.basicAck(msgTag,false);  
        // channel.basicNack(msgTag,false,true);  
      
    }

    deliveryTags是消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag都會增加

    ttl死信隊列

    什么是死信隊列

    沒有被及時消費的消息存放的隊列

    消息有哪幾種情況成為死信

    • 消費者拒收消息 (basic.reject/ basic.nack) ,并且沒有重新入隊 requeue=false

    • 消息在隊列中未被消費,且超過隊列或者消息本身的過期時間TTL(time-to-live)

    • 隊列的消息長度達到極限

    • 結果:消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列

    死信隊列經常用來做延遲隊列消費。

    延遲隊列

    生產者投遞到mq中并不希望這條消息立馬被消費,而是等待一段時間后再去消費。

    springboot整合rabbitmq實現訂單超時自動關閉

    package com.fandf.test.rabbit;  
      
    import org.springframework.amqp.core.*;  
    import org.springframework.beans.factory.annotation.Qualifier;  
    import org.springframework.context.annotation.Bean;  
    import org.springframework.context.annotation.Configuration;  
      
    import java.util.HashMap;  
    import java.util.Map;  
      
    /**  
    * @author fandongfeng  
    * @date 2023/4/15 15:38  
    */  
    @Configuration  
    public class RabbitMQConfig {  
      
        /**  
        * 訂單交換機  
        */  
        public static final String ORDER_EXCHANGE = "order_exchange";  
        /**  
        * 訂單隊列  
        */  
        public static final String ORDER_QUEUE = "order_queue";  
        /**  
        * 訂單路由key  
        */  
        public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";  
    
        /**  
        * 死信交換機  
        */  
        public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";  
        /**  
        * 死信隊列 routingKey  
        */  
        public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";  
    
        /**  
        * 死信隊列  
        */  
        public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";  
    
    
        /**  
        * 創建死信交換機  
        */  
        @Bean("orderDeadLetterExchange")  
        public Exchange orderDeadLetterExchange() {  
            return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);  
        }  
    
        /**  
        * 創建死信隊列  
        */  
        @Bean("orderDeadLetterQueue")  
        public Queue orderDeadLetterQueue() {  
            return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();  
        }  
    
        /**  
        * 綁定死信交換機和死信隊列  
        */  
        @Bean("orderDeadLetterBinding")  
        public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {  
            return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();  
        }  
    
    
        /**  
        * 創建訂單交換機  
        */  
        @Bean("orderExchange")  
        public Exchange orderExchange() {  
            return new TopicExchange(ORDER_EXCHANGE, true, false);  
        }  
    
        /**  
        * 創建訂單隊列  
        */  
        @Bean("orderQueue")  
        public Queue orderQueue() {  
            Map<String, Object> args = new HashMap<>(3);  
            //消息過期后,進入到死信交換機  
            args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);  
    
            //消息過期后,進入到死信交換機的路由key  
            args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);  
    
            //過期時間,單位毫秒  
            args.put("x-message-ttl", 10000);  
    
            return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();  
        }  
    
        /**  
        * 綁定訂單交換機和隊列  
        */  
        @Bean("orderBinding")  
        public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {  
            return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();  
        }  
      
      
    }

    消費者

    package com.fandf.test.rabbit;  
      
    import cn.hutool.core.date.DateUtil;  
    import com.rabbitmq.client.Channel;  
    import org.springframework.amqp.core.Message;  
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
    import org.springframework.amqp.rabbit.annotation.RabbitListener;  
    import org.springframework.stereotype.Component;  
      
    import java.io.IOException;  
      
    /**  
    * @author fandongfeng  
    * @date 2023/4/15 15:42  
    */  
    @Component  
    @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)  
    public class OrderMQListener {  
      
      
      
        @RabbitHandler  
        public void consumer(String body, Message message, Channel channel) throws IOException {  
            System.out.println("收到消息:" + DateUtil.now());  
            long msgTag = message.getMessageProperties().getDeliveryTag();  
            System.out.println("msgTag=" + msgTag);  
            System.out.println("message=" + message);  
            System.out.println("body=" + body);  
            channel.basicAck(msgTag, false);  
        }  
      
    }

    測試類

    @Test  
    void testOrder() throws InterruptedException {  
    //為true,則交換機處理消息到路由失敗,則會返回給生產者 配置文件指定,則這里不需指定  
        rabbitTemplate.setMandatory(true);  
        //開啟強制消息投遞(mandatory為設置為true),但消息未被路由至任何一個queue,則回退一條消息  
        rabbitTemplate.setReturnsCallback(returned -> {  
        int code = returned.getReplyCode();  
        System.out.println("code=" + code);  
        System.out.println("returned=" + returned);  
        });  
        rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "測試訂單延遲");  
        System.out.println("發送消息:" + DateUtil.now());  
        Thread.sleep(20000);  
    }

    程序輸出

    發送消息:2023-04-16 15:14:34
    收到消息:2023-04-16 15:14:44
    msgTag=1
    message=(Body:'測試訂單延遲' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1, exchange=order_exchange, time=Mon Apr 16 15:14:44 CST 2023, routing-keys=[order], queue=order_queue}], x-first-death-reason=expired, x-first-death-queue=order_queue}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order_dead_letter_exchange, receivedRoutingKey=order_dead_letter_queue_routing_key, deliveryTag=1, consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ, consumerQueue=order_dead_letter_queue])
    body=測試訂單延遲

    到此,關于“SpringBoot怎么整合RabbitMQ實現延遲隊列”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    尼木县| 石渠县| 达尔| 闸北区| 涞水县| 托克逊县| 广河县| 凤冈县| 沧州市| 峨边| 织金县| 海宁市| 长治市| 东丰县| 邯郸县| 磐石市| 黎川县| 六枝特区| 清涧县| 武强县| 昆山市| 天镇县| 淳化县| 武鸣县| 桐乡市| 东乡| 临澧县| 信丰县| 方正县| 峨边| 连城县| 库伦旗| 孟州市| 金平| 霍山县| 玛纳斯县| 武穴市| 玉田县| 浮梁县| 丰都县| 新和县|