中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RabbitMQ如何保證隊列里的消息99.99%被消費?

發布時間:2020-05-25 19:16:26 來源:網絡 閱讀:1906 作者:Java_老男孩 欄目:編程語言

1. 本篇概要

其實,還有1種場景需要考慮:當消費者接收到消息后,還沒處理完業務邏輯,消費者掛掉了,那消息也算丟失了?,比如用戶下單,訂單中心發送了1個消息到RabbitMQ里的隊列,積分中心收到這個消息,準備給這個下單的用戶增加20積分,但積分還沒增加成功呢,積分中心自己掛掉了,導致數據出現問題。

那么如何解決這種問題呢?

為了保證消息被消費者成功的消費,RabbitMQ提供了消息確認機制(message acknowledgement),本文主要講解RabbitMQ中,如何使用消息確認機制來保證消息被消費者成功的消費,避免因為消費者突然宕機而引起的消息丟失。

2. 開啟顯式Ack模式

我們開啟一個消費者的代碼是這樣的:

// 創建隊列消費者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received Message '" + message + "'");
    }
};
channel.basicConsume(QUEUE_NAME, true, consumer);

這里的重點是channel.basicConsume(QUEUE_NAME, true, consumer);方法的第2個參數,讓我們先看下basicConsume()的源碼:

public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
    return this.basicConsume(queue, autoAck, "", callback);
}

這里的autoAck參數指的是是否自動確認,如果設置為ture,RabbitMQ會自動把發送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者接收到消息是否處理成功;如果設置為false,RabbitMQ會等待消費者顯式的回復確認信號后才會從內存(或者磁盤)中刪除。

建議將autoAck設置為false,這樣消費者就有足夠的時間處理消息,不用擔心處理消息過程中消費者宕機造成消息丟失。

此時,隊列里的消息就分成了2個部分:

  1. 等待投遞給消費者的消息(下圖中的Ready部分)
  2. 已經投遞給消費者,但是還沒有收到消費者確認信號的消息(下圖中的Unacked部分)

RabbitMQ如何保證隊列里的消息99.99%被消費?

如果RabbitMQ一直沒有收到消費者的確認信號,并且消費此消息的消費者已經斷開連接,則RabbitMQ會安排該消息重新進入隊列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費者。

RabbitMQ不會為未確認的消息設置過期時間,它判斷此消息是否需要重新投遞給消費者的唯一依據是消費該消息的消費者連接是否已經斷開,這么設計的原因是RabbitMQ允許消費者消費一條消息的時間可以很久很久。

為了便于理解,我們舉個具體的例子,生產者的話的我們延用上文中的DurableProducer:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創建連接
        ConnectionFactory factory = new ConnectionFactory();
        // 設置 RabbitMQ 的主機名
        factory.setHost("localhost");
        // 創建一個連接
        Connection connection = factory.newConnection();
        // 創建一個通道
        Channel channel = connection.createChannel();
        // 創建一個Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 發送消息
        String message = "durable exchange test";
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

        // 關閉頻道和連接
        channel.close();
        connection.close();
    }
}

然后新建一個消費者AckConsumer類:

package com.zwwhnly.springbootaction.rabbitmq.ack;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AckConsumer {
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創建連接
        ConnectionFactory factory = new ConnectionFactory();
        // 設置 RabbitMQ 的主機名
        factory.setHost("localhost");
        // 創建一個連接
        Connection connection = factory.newConnection();
        // 創建一個通道
        Channel channel = connection.createChannel();
        // 創建隊列消費者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                int result = 1 / 0;
                System.out.println("Received Message '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

我們先將autoAck參數設置為ture,即自動確認,并在消費消息時故意寫個異常,然后先運行生產者客戶端將消息寫入隊列中,然后運行消費者客戶端,發現消息未消費成功但是卻消失了:

RabbitMQ如何保證隊列里的消息99.99%被消費?

RabbitMQ如何保證隊列里的消息99.99%被消費?

然后我們將autoAck設置為false:

channel.basicConsume(QUEUE_NAME, false, consumer);

再次運行生產者客戶端將消息寫入隊列中,然后運行消費者客戶端,此時雖然消費者客戶端仍然代碼異常,但是消息仍然在隊列中:

RabbitMQ如何保證隊列里的消息99.99%被消費?

然后我們刪除掉消費者客戶端中的異常代碼,重新啟動消費者客戶端,發現消息消費成功了,但是消息一直未Ack:

RabbitMQ如何保證隊列里的消息99.99%被消費?

RabbitMQ如何保證隊列里的消息99.99%被消費?

手動停掉消費者客戶端,發現消息又到了Ready狀態,準備重新投遞:

RabbitMQ如何保證隊列里的消息99.99%被消費?

之所以消費掉消息,卻一直還是Unacked狀態,是因為我們沒在代碼中添加顯式的Ack代碼:

String message = new String(body, "UTF-8");
//int result = 1 / 0;
System.out.println("Received Message '" + message + "'");

long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);

deliveryTag可以看做消息的編號,它是一個64位的長×××值。

此時運行消費者客戶端,發現消息消費成功,并且在隊列中被移除:

RabbitMQ如何保證隊列里的消息99.99%被消費?

RabbitMQ如何保證隊列里的消息99.99%被消費?

文末彩蛋

Java學習、面試;文檔、視頻資源免費獲取

RabbitMQ如何保證隊列里的消息99.99%被消費?

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

南宁市| 长海县| 昌邑市| 固阳县| 库尔勒市| 织金县| 博罗县| 孝感市| 如皋市| 丰台区| 菏泽市| 循化| 淅川县| 长阳| 桓台县| 栾城县| 商河县| 周口市| 泸西县| 无极县| 台东市| 拜城县| 施甸县| 通海县| 彭州市| 舞钢市| 来安县| 深泽县| 青河县| 西华县| 舟山市| 台湾省| 云阳县| 庄河市| 陆良县| 建瓯市| 观塘区| 长丰县| 罗江县| 廊坊市| 赤壁市|