中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java?RabbitMQ高級特性實例分析

發布時間:2022-08-09 16:34:42 來源:億速云 閱讀:172 作者:iii 欄目:開發技術

這篇文章主要介紹“Java RabbitMQ高級特性實例分析”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“Java RabbitMQ高級特性實例分析”文章能幫助大家解決問題。

    消息的可靠投遞

    在使用 RabbitMQ 的時候,作為消息發送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩種方式用來控制消息的投遞可靠性模式。

    • confirm 確認模式

    • return 退回模式

    rabbitmq整個消息投遞的路徑為:

    producer—>rabbitmq broker—>exchange—>queue—>consumer

    • 消息從producer到exchange則會返回一個confirmCallback

    • 消息從exchange—>queue投遞失敗則會返回一個returnCallback

    我們可以利用這兩個callback控制消息的可靠性投遞

    確認模式

    消息從 producer 到 exchange 則會返回一個 confirmCallback

    以spring整合rabbitmq為例,修改rabbitmq配置文件,在connectionFactory中添加publisher-confirms屬性并設置值為true

    <!--
    * 確認模式:
    * 步驟:
    * 1. 確認模式開啟:ConnectionFactory中開啟publisher-confirms="true"
    -->
    <!-- 定義rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   publisher-confirms="true"/>
    /*
     * 確認模式:
     * 步驟:
     * 2. 在rabbitTemplate定義ConfirmCallBack回調函數
     */
    @Test
        public void queueTest(){
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                /**
                 *
                 * @param correlationData 相關配置信息
                 * @param ack exchange交換機 是否成功收到了消息。true 成功,false代表失敗
                 * @param cause 失敗原因
                 */
                    System.out.println("confirm方法被執行了....");
                    if (ack) {
                        //接收成功
                        System.out.println("接收成功消息" + cause);
                    } else {
                        //接收失敗
                        System.out.println("接收失敗消息" + cause);
                        //做一些處理,讓消息再次發送。
                    }
                }
            });
            //路由鍵與隊列同名
            rabbitTemplate.convertAndSend("spring_queue", "message confirm....");
        }

    Java?RabbitMQ高級特性實例分析

    因為正常向隊列中發送了消息,所以返回的cause值為空,如果出現異常,cause為異常原因

    退回模式

    消息從 exchange&ndash;>queue 投遞失敗則會返回一個 returnCallback

    1.開啟回退模式:publisher-returns=“true”

        <!-- 定義rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   publisher-returns="true"/>

    2.設置Exchange處理消息失敗的模式:setMandatory,然后設置ReturnCallBack

        @Test
        public void queueTest(){
            //1.設置交換機處理失敗消息的模式
            rabbitTemplate.setMandatory(true);
            //2.設置ReturnCallBack
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                /**
                 * @param message    消息對象
                 * @param replyCode  錯誤碼
                 * @param replyText  錯誤信息
                 * @param exchange   交換機
                 * @param routingKey 路由鍵
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String
                        replyText, String exchange, String routingKey) {
                    System.out.println("return 執行了....");
                    System.out.println(message);
                    System.out.println(replyCode);
                    System.out.println(replyText);
                    System.out.println(exchange);
                    System.out.println(routingKey);
                    //處理
                }
            });
            //手動添加錯誤路由模擬錯誤發生
            rabbitTemplate.convertAndSend("spring_topic_exchange", "return123", "return message...");
        }

    此處只有發生錯誤才會返回消息,因此手動加上一個錯誤,給發送消息添加路由值return123,實際上并沒有這個路由,運行返回消息如下。

    Java?RabbitMQ高級特性實例分析

    Consumer Ack

    ack指Acknowledge,確認。 表示消費端收到消息后的確認方式。

    有三種確認方式:

    • 自動確認:acknowledge=“none”

    • 手動確認:acknowledge=“manual”

    • 根據異常情況確認:acknowledge=“auto”,(這種方式使用麻煩,沒有進行學習)

    其中自動確認是指,當消息一旦被Consumer接收到,則自動確認收到,并將相應 message 從RabbitMQ 的消息緩存中移除。但是在實際業務處理中,很可能消息接收到,業務處理出現異常,那么該消息就會丟失。如果設置了手動確認方式,則需要在業務處理成功后,調用channel.basicAck(),手動簽收,如果出現異常,則調用channel.basicNack()方法,讓其自動重新發送消息。

    還是以spring整合rabbitmq為例,rabbitmq配置文件中設置確認方式

    <rabbit:listener-container connection-factory="connectionFactory"
    acknowledge="manual">
    .....

    監聽類代碼如下:

    public class AckListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收轉換消息
                System.out.println(new String(message.getBody()));
                //2. 處理業務邏輯
                System.out.println("處理業務邏輯...");
                int i = 3/0;//出現錯誤
                // 3. 手動簽收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                //4.拒絕簽收
                /*
                 *第三個參數:requeue:重回隊列。如果設置為true,則消息重新回到queue,broker會
                 *重新發送該消息給消費端
                 */
                channel.basicNack(deliveryTag,true,true);
                //channel.basicReject(deliveryTag,true);
            }
        }
    }

    因為出現異常調用channel.basicNack()方法,讓其自動重新發送消息,所以無限循環輸出內容

    Java?RabbitMQ高級特性實例分析

    消費端限流

    Java?RabbitMQ高級特性實例分析

    當我們的 Rabbitmq 服務器積壓了有上萬條未處理的消息時,我們隨便打開一個消費者客戶端,會出現這樣情況: 巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這么多數據!當數據量特別大的時候,我們對生產端限流肯定是不科學的,因為有時候并發量就是特別大,有時候并發量又特別少,我們無法約束生產端,這是用戶的行為。所以我們應該對消費端限流,rabbitmq提供了一種qos(服務質量保證)功能,即在非自動確認消息的前提下,如果一定數目的消息(給channel或者consume設置Qos值)未被確認前,不進行消費新消息。

    1.確保ack機制為手動確認

    2.listener-container配置屬性perfetch = 1,表示消費端每次從mq拉去一條消息來消費,直到手動確認消費完畢后,才會繼續拉去下一條消息。

    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual" prefetch="1">
            <rabbit:listener ref="topicListenerACK" queue-names="spring_topic_queue_well2"/>
    </rabbit:listener-container>

    生產者,發送五條消息

        @Test
        public void topicTest(){
    /**
     * 參數1:交換機名稱
     * 參數2:路由鍵名
     * 參數3:發送的消息內容
     */
            for (int i=0;i<5;i++){
                rabbitTemplate.convertAndSend("spring_topic_exchange", "xzk.a", "發送到spring_topic_exchange交換機xzk.cn的消息"+i);
            }
        }
    }

    生產者注釋掉channel.basicAck(deliveryTag,true)即不確認收到消息

    public class AckListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收轉換消息
                System.out.println(new String(message.getBody()));
                //2. 處理業務邏輯
                System.out.println("處理業務邏輯...");
                // 3. 手動簽收
                //channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                //4.拒絕簽收
                /*
                 *第三個參數:requeue:重回隊列。如果設置為true,則消息重新回到queue,broker會
                 *重新發送該消息給消費端
                 */
                channel.basicNack(deliveryTag,true,true);
            }
        }
    }

    此時啟動消費者再運行生產者之后,發現消費者發送了五條消息,實際上生產者只接受到了一條消息,達到限流作用

    Java?RabbitMQ高級特性實例分析

    觀察rabbitmq控制臺,發現有1條unack消息。4條ready消息,還沒到達consumer。和我們設置的prefetchCount=1限流情況相符。

    Java?RabbitMQ高級特性實例分析

    把channel.basicAck(deliveryTag,true)的注釋取消掉,即可以自動確認收到消息,重新運行消費者,接收到了另外的四條消息

    Java?RabbitMQ高級特性實例分析

    Java?RabbitMQ高級特性實例分析

    TTL(Time To Live)

    Time To Live,消息過期時間設置

    設置某個隊列為過期隊列

    設置交換機,隊列以及隊列過期時間為10000ms

     <!--ttl-->
        <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
            <rabbit:queue-arguments>
                <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
            </rabbit:queue-arguments>
        </rabbit:queue>
        <rabbit:topic-exchange name="test_exchange_ttl">
            <rabbit:bindings>
                <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"/>
            </rabbit:bindings>
        </rabbit:topic-exchange>

    生產者發送10條消息

        @Test
        public void testTtl() {
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...");
            }

    Java?RabbitMQ高級特性實例分析

    十秒鐘后,過期消息消失

    Java?RabbitMQ高級特性實例分析

    設置單獨某個消息過期

    設置交換機和隊列

    <rabbit:queue name="test_queue_ttl" id="test_queue_ttl"/>
    <rabbit:topic-exchange name="test_exchange_ttl">
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"/>     
        </rabbit:bindings>
    </rabbit:topic-exchange>

    生產者發送特定過期消息,用到了MessagePostProcessor這個api

     @Test
        public void testTtl() {
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //1.設置message信息
                    message.getMessageProperties().setExpiration("5000");//消息的過期時間
                    //2.返回該消息
                    return message;
                }
            };
            //消息單獨過期
            rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...",messagePostProcessor);
        }

    Java?RabbitMQ高級特性實例分析

    5s之后

    Java?RabbitMQ高級特性實例分析

    注:

    1.如果同時設置隊列過期和消息過期,系統會根據哪個過期的時間短而選用哪兒個。

    2.設置單獨消息過期時,如果該消息不為第一個接受的消息,則不過期。

    死信隊列

    死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當消息成為Deadmessage后,可以被重新發送到另一個交換機,這個交換機就是DLX。

    Java?RabbitMQ高級特性實例分析

    消息成為死信的三種情況:

    • 隊列消息長度到達限制;

    • 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false;

    • 原隊列存在消息過期設置,消息到達超時時間未被消費;

    隊列綁定死信交換機:

    給隊列設置參數: x-dead-letter-exchange 和 x-dead-letter-routing-key

    Java?RabbitMQ高級特性實例分析

    實現

    1.聲明正常的隊列(test_queue_dlx)和交換機(test_exchange_dlx)

    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
        <!--正常隊列綁定死信交換機-->
        <rabbit:queue-arguments>
            <!--x-dead-letter-exchange:死信交換機名稱-->
            <entry key="x-dead-letter-exchange" value="exchange_dlx" />
            <!--3.2 x-dead-letter-routing-key:發送給死信交換機的routingkey-->
            <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
            <!--4.1 設置隊列的過期時間 ttl-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
            <!--4.2 設置隊列的長度限制 max-length -->
            <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx">
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    2.聲明死信隊列(queue_dlx)和死信交換機(exchange_dlx)

    <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    3.生產端測試

    /**
    * 發送測試死信消息:
    * 1. 過期時間
    * 2. 長度限制
    * 3. 消息拒收
    */
    @Test
    public void testDlx(){
        //1. 測試過期時間,死信消息
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一條消息,我會死嗎?");
        //2. 測試長度限制后,消息死信
        /* for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一條消息,我會死嗎?");
        }*/
        //3. 測試消息拒收
        //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一條消息,我會死嗎?");
    }

    4.消費端監聽

    public class DlxListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收轉換消息
                System.out.println(new String(message.getBody()));
                //2. 處理業務邏輯
                System.out.println("處理業務邏輯...");
                int i = 3/0;//出現錯誤
                //3. 手動簽收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                System.out.println("出現異常,拒絕接受");
                //4.拒絕簽收,不重回隊列 requeue=false
                channel.basicNack(deliveryTag,true,false);
            }
        }
    }
    <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx">
    </rabbit:listener>

    延遲隊列

    延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。c

    需求:

    1.下單后,30分鐘未支付,取消訂單,回滾庫存。

    2.新用戶注冊成功7天后,發送短信問候。

    實現方式:

    • 定時器

    • 延遲隊列

    定時器的實現方式不夠優雅,我們采取延遲隊列的方式

    Java?RabbitMQ高級特性實例分析

    不過很可惜,在RabbitMQ中并未提供延遲隊列功能。

    但是可以使用:TTL+死信隊列 組合實現延遲隊列的效果。

    Java?RabbitMQ高級特性實例分析

    配置

    <!--
    延遲隊列:
            1. 定義正常交換機(order_exchange)和隊列(order_queue)
            2. 定義死信交換機(order_exchange_dlx)和隊列(order_queue_dlx)
            3. 綁定,設置正常隊列過期時間為30分鐘
    -->
    <!-- 定義正常交換機(order_exchange)和隊列(order_queue)-->
    <rabbit:queue id="order_queue" name="order_queue">
    <!-- 綁定,設置正常隊列過期時間為30分鐘-->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- 定義死信交換機(order_exchange_dlx)和隊列(order_queue_dlx)-->
    <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    生產端測試

    @Test
    public void testDelay() throws InterruptedException {
        //1.發送訂單消息。 將來是在訂單系統中,下單成功后,發送消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","訂單信息:id=1,time=2019年8月17日16:41:47");
        /*//2.打印倒計時10秒
        for (int i = 10; i > 0 ; i--) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }*/
    }

    消費端監聽

    public class OrderListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
    		long deliveryTag = message.getMessageProperties().getDeliveryTag();
    		try {
    			//1.接收轉換消息
    			System.out.println(new String(message.getBody()));
    			//2. 處理業務邏輯
    			System.out.println("處理業務邏輯...");
    			System.out.println("根據訂單id查詢其狀態...");
    			System.out.println("判斷狀態是否為支付成功");
    			System.out.println("取消訂單,回滾庫存....");
    			//3. 手動簽收
    			channel.basicAck(deliveryTag,true);
    		} catch (Exception e) {
    			//e.printStackTrace();
    			System.out.println("出現異常,拒絕接受");
    			//4.拒絕簽收,不重回隊列 requeue=false
    			channel.basicNack(deliveryTag,true,false);
    		}
    	}
    }
    <rabbit:listener ref="orderListener" queue-names="order_queue_dlx">
    </rabbit:listener>

    關于“Java RabbitMQ高級特性實例分析”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    察隅县| 德清县| 平顶山市| 琼中| 正镶白旗| 都安| 郸城县| 大同县| 宁德市| 新源县| 乐东| 石家庄市| 怀远县| 荣昌县| 连州市| 明溪县| 漳州市| 通化县| 康平县| 绍兴县| 卫辉市| 九江市| 沙坪坝区| 巫山县| 揭西县| 铅山县| 任丘市| 滦平县| 江津市| 岳普湖县| 镇远县| 会昌县| 祁连县| 岳阳县| 临洮县| 乌拉特后旗| 博爱县| 彝良县| SHOW| 法库县| 庐江县|