中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RabbitMQ如何防止數據丟失

發布時間:2021-12-24 09:28:50 來源:億速云 閱讀:129 作者:小新 欄目:大數據

小編給大家分享一下RabbitMQ如何防止數據丟失,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!


 

一、分析數據丟失的原因

分析RabbitMQ消息丟失的情況,不妨先看看一條消息從生產者發送到消費者消費的過程:

RabbitMQ如何防止數據丟失  

可以看出,一條消息整個過程要經歷兩次的網絡傳輸:從生產者發送到RabbitMQ服務器,從RabbitMQ服務器發送到消費者

在消費者未消費前存儲在隊列(Queue)中

所以可以知道,有三個場景下是會發生消息丟失的:

  • 存儲在隊列中,如果隊列沒有對消息持久化,RabbitMQ服務器宕機重啟會丟失數據。
  • 生產者發送消息到RabbitMQ服務器過程中,RabbitMQ服務器如果宕機停止服務,消息會丟失。
  • 消費者從RabbitMQ服務器獲取隊列中存儲的數據消費,但是消費者程序出錯或者宕機而沒有正確消費,導致數據丟失。

針對以上三種場景,RabbitMQ提供了三種解決的方式,分別是消息持久化,confirm機制,ACK事務機制。

RabbitMQ如何防止數據丟失  
 

二、消息持久化

RabbitMQ是支持消息持久化的,消息持久化需要設置:Exchange為持久化和Queue持久化,這樣當消息發送到RabbitMQ服務器時,消息就會持久化。

首先看Exchange交換機的類圖:

RabbitMQ如何防止數據丟失  

看這個類圖其實是要說明上一篇文章介紹的四種交換機都是AbstractExchange抽象類的子類,所以根據java的特性,創建子類的實例會先調用父類的構造器,父類也就是AbstractExchange的構造器是怎么樣的呢?

RabbitMQ如何防止數據丟失  

從上面的注釋可以看到durable參數表示是否持久化。默認是持久化(true)。創建持久化的Exchange可以這樣寫:

 @Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        //Direct交換機
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }
 

接著是Queue隊列,我們先看看Queue的構造器是怎么樣的:

RabbitMQ如何防止數據丟失  

也是通過durable參數設置是否持久化,默認是true。所以創建時可以不指定:

 @Bean
    public Queue fanoutExchangeQueueA() {
     //只需要指定名稱,默認是持久化的
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
    }
 

這就完成了消息持久化的設置,接下來啟動項目,發送幾條消息,我們可以看到:

RabbitMQ如何防止數據丟失怎么證明是已經持久化了呢,實際上可以找到對應的文件:RabbitMQ如何防止數據丟失找到對應磁盤中的目錄:RabbitMQ如何防止數據丟失消息持久化可以防止消息在RabbitMQ Server中不會因為宕機重啟而丟失

 

三、消息確認機制

 

3.1 confirm機制

在生產者發送到RabbitMQ Server時有可能因為網絡問題導致投遞失敗,從而丟失數據。我們可以使用confirm模式防止數據丟失。工作流程是怎么樣的呢,看以下圖解:RabbitMQ如何防止數據丟失從上圖中可以看到是通過兩個回調函數**confirm()、returnedMessage()**進行通知。

一條消息從生產者發送到RabbitMQ,首先會發送到Exchange,對應回調函數confirm()。第二步從Exchange路由分配到Queue中,對應回調函數則是returnedMessage()

代碼怎么實現呢,請看演示:

首先在application.yml配置文件中加上如下配置:

spring:
  rabbitmq:
    publisher-confirms: true
#    publisher-returns: true
    template:
      mandatory: true
# publisher-confirms:設置為true時。當消息投遞到Exchange后,會回調confirm()方法進行通知生產者
# publisher-returns:設置為true時。當消息匹配到Queue并且失敗時,會通過回調returnedMessage()方法返回消息
# spring.rabbitmq.template.mandatory: 設置為true時。指定消息在沒有被隊列接收時會通過回調returnedMessage()方法退回。
 

有個小細節,publisher-returns和mandatory如果都設置的話,優先級是以mandatory優先。可以看源碼:RabbitMQ如何防止數據丟失接著我們需要定義回調方法:

@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);

    /**
     * 監聽消息是否到達Exchange
     *
     * @param correlationData 包含消息的唯一標識的對象
     * @param ack             true 標識 ack,false 標識 nack
     * @param cause           nack 投遞失敗的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("消息投遞成功~消息Id:{}", correlationData.getId());
        } else {
            logger.error("消息投遞失敗,Id:{},錯誤提示:{}", correlationData.getId(), cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.info("消息沒有路由到隊列,獲得返回的消息");
        Map map = byteToObject(message.getBody(), Map.class);
        logger.info("message body: {}", map == null ? "" : map.toString());
        logger.info("replyCode: {}", replyCode);
        logger.info("replyText: {}", replyText);
        logger.info("exchange: {}", exchange);
        logger.info("routingKey: {}", exchange);
        logger.info("------------> end <------------");
    }

    @SuppressWarnings("unchecked")
    private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
        T t;
        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
             ObjectInputStream ois = new ObjectInputStream(bis)) {
            t = (T) ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        return t;
    }
}
 

我這里就簡單地打印回調方法返回的消息,在實際項目中,可以把返回的消息存儲到日志表中,使用定時任務進行進一步的處理。

我這里是使用RabbitTemplate進行發送,所以在Service層的RabbitTemplate需要設置一下:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
 @Resource
    private RabbitmqConfirmCallback rabbitmqConfirmCallback;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        //指定 ConfirmCallback
        rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
        //指定 ReturnCallback
        rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
    }
    
    @Override
    public String sendMsg(String msg) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
    
 private Map<String, Object> getMessage(String msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        CorrelationData correlationData = new CorrelationData(msgId);
        String sendTime = sdf.format(new Date());
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("sendTime", sendTime);
        map.put("msg", msg);
        map.put("correlationData", correlationData);
        return map;
    }
}
 

大功告成!接下來我們進行測試,發送一條消息,我們可以控制臺:RabbitMQ如何防止數據丟失假設發送一條信息沒有路由匹配到隊列,可以看到如下信息:RabbitMQ如何防止數據丟失這就是confirm模式。它的作用是為了保障生產者投遞消息到RabbitMQ不會出現消息丟失

 

3.2 事務機制(ACK)

最開始的那張圖已經講過,消費者從隊列中獲取到消息后,會直接確認簽收,假設消費者宕機或者程序出現異常,數據沒有正常消費,這種情況就會出現數據丟失

所以關鍵在于把自動簽收改成手動簽收,正常消費則返回確認簽收,如果出現異常,則返回拒絕簽收重回隊列。RabbitMQ如何防止數據丟失代碼怎么實現呢,請看演示:

首先在消費者的application.yml文件中設置事務提交為manual手動模式:

spring:
  rabbitmq:
    listener:
      simple:
  acknowledge-mode: manual # 手動ack模式
        concurrency: 1 # 最少消費者數量
        max-concurrency: 10 # 最大消費者數量
 

然后編寫消費者的監聽器:

@Component
public class RabbitDemoConsumer {

    enum Action {
        //處理成功
        SUCCESS,
        //可以重試的錯誤,消息重回隊列
        RETRY,
        //無需重試的錯誤,拒絕消息,并從隊列中刪除
        REJECT
    }

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
    public void process(String msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;
        try {
            System.out.println("消費者RabbitDemoConsumer從RabbitMQ服務端消費消息:" + msg);
            if ("bad".equals(msg)) {
                throw new IllegalArgumentException("測試:拋出可重回隊列的異常");
            }
            if ("error".equals(msg)) {
                throw new Exception("測試:拋出無需重回隊列的異常");
            }
        } catch (IllegalArgumentException e1) {
            e1.printStackTrace();
            //根據異常的類型判斷,設置action是可重試的,還是無需重試的
            action = Action.RETRY;
        } catch (Exception e2) {
            //打印異常
            e2.printStackTrace();
            //根據異常的類型判斷,設置action是可重試的,還是無需重試的
            action = Action.REJECT;
        } finally {
            try {
                if (action == Action.SUCCESS) {
                    //multiple 表示是否批量處理。true表示批量ack處理小于tag的所有消息。false則處理當前消息
                    channel.basicAck(tag, false);
                } else if (action == Action.RETRY) {
                    //Nack,拒絕策略,消息重回隊列
                    channel.basicNack(tag, false, true);
                } else {
                    //Nack,拒絕策略,并且從隊列中刪除
                    channel.basicNack(tag, false, false);
                }
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
 

解釋一下上面的代碼,如果沒有異常,則手動確認回復RabbitMQ服務端basicAck(消費成功)。

如果拋出某些可以重回隊列的異常,我們就回復basicNack并且設置重回隊列。

如果是拋出不可重回隊列的異常,就回復basicNack并且設置從RabbitMQ的隊列中刪除。

接下來進行測試,發送一條普通的消息"hello":RabbitMQ如何防止數據丟失解釋一下ack返回的三個方法的意思。

①成功確認

void basicAck(long deliveryTag, boolean multiple) throws IOException;
 

消費者成功處理后調用此方法對消息進行確認。

  • deliveryTag:該消息的index
  • multiple:是否批量.。true:將一次性ack所有小于deliveryTag的消息。

②失敗確認

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
 
  • deliveryTag:該消息的index。
  • multiple:是否批量。true:將一次性拒絕所有小于deliveryTag的消息。
  • requeue:被拒絕的是否重新入隊列。

③失敗確認

void basicReject(long deliveryTag, boolean requeue) throws IOException;
 
  • deliveryTag:該消息的index。
  • requeue:被拒絕的是否重新入隊列。

basicNack()和basicReject()的區別在于:basicNack()可以批量拒絕,basicReject()一次只能拒接一條消息

 

四、遇到的坑

 

4.1 啟用nack機制后,導致的死循環

上面的代碼我故意寫了一個bug。測試發送一條"bad",然后會拋出重回隊列的異常。這就有個問題:重回隊列后消費者又消費,消費拋出異常又重回隊列,就造成了死循環。RabbitMQ如何防止數據丟失那怎么避免這種情況呢?

既然nack會造成死循環的話,我提供的一個思路是不使用basicNack(),把拋出異常的消息落庫到一張表中,記錄拋出的異常,消息體,消息Id。通過定時任務去處理

如果你有什么好的解決方案,也可以留言討論~

 

4.2 double ack

有的時候比較粗心,不小心開啟了自動Ack模式,又手動回復了Ack。那就會報這個錯誤:

消費者RabbitDemoConsumer從RabbitMQ服務端消費消息:java技術愛好者
2020-08-02 22:52:42.148 ERROR 4880 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-08-02 22:52:43.102  INFO 4880 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@f4a3a8d: tags=[{amq.ctag-8MJeQ7el_PNbVJxGOOw7Rw=rabbitmq.demo.topic}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,5), conn: Proxy@782a1679 Shared Rabbit Connection: SimpleConnection@67c5b175 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56938], acknowledgeMode=AUTO local queue size=0
 

出現這個錯誤,可以檢查一下yml文件是否添加了以下配置:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 10
 

如果上面這個配置已經添加了,還是報錯,有可能你使用@Configuration配置了SimpleRabbitListenerContainerFactory,根據SpringBoot的特性,代碼優于配置,代碼的配置覆蓋了yml的配置,并且忘記設置手動manual模式

@Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //設置手動ack模式
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
 

如果你還是有報錯,那可能是寫錯地方了,寫在生產者的項目了。以上的配置應該配置在消費者的項目。因為ack模式是針對消費者而言的。我就是寫錯了,寫在生產者,折騰了幾個小時,淚目~

 

4.3 性能問題

其實手動ACK相對于自動ACK肯定是會慢很多,我在網上查了一些資料,性能相差大概有10倍。所以一般在實際應用中不太建議開手動ACK模式。不過也不是絕對不可以開,具體情況具體分析,看并發量,還有數據的重要性等等。

所以在實際項目中還需要權衡一下并發量和數據的重要性,再決定具體的方案

 

4.4 啟用手動ack模式,如果沒有及時回復,會造成隊列異常

如果開啟了手動ACK模式,但是由于代碼有bug的原因,沒有回復RabbitMQ服務端,那么這條消息就會放到Unacked狀態的消息堆里,只有等到消費者的連接斷開才會轉到Ready消息。如果消費者一直沒有斷開連接,那Unacked的消息就會越來越多,占用內存就越來越大,最后就會出現異常。

這個問題,我沒法用我的電腦演示,我的電腦太卡了。

 

看完了這篇文章,相信你對“RabbitMQ如何防止數據丟失”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

于田县| 资中县| 蓬溪县| 通化市| 潮州市| 兴山县| 从化市| 清镇市| 扶风县| 神农架林区| 镇宁| 华阴市| 肥东县| 泰和县| 临清市| 翁源县| 民权县| 融水| 梁山县| 巴林右旗| 天柱县| 陆川县| 赤城县| 嵊州市| 社旗县| 田阳县| 当阳市| 名山县| 阿勒泰市| 阳泉市| 青浦区| 临湘市| 恩施市| 枝江市| 界首市| 翁源县| 义马市| 怀远县| 临城县| 尼玛县| 黄平县|