中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

springboot中rabbitmq如何實現消息可靠性機制

發布時間:2021-09-26 10:05:29 來源:億速云 閱讀:424 作者:小新 欄目:開發技術

這篇文章主要介紹springboot中rabbitmq如何實現消息可靠性機制,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

1. 生產者模塊通過publisher confirm機制實現消息可靠性

 1.1 生產者模塊導入rabbitmq相關依賴

<!--AMQP依賴,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--用于mq消息的序列化與反序列化-->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

1.2 配置文件中進行mq的相關配置

spring.rabbitmq.host=10.128.240.183
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
        
spring.rabbitmq.publisher-confirm-type=correlated
        
spring.rabbitmq.publisher-returns=true
        
spring.rabbitmq.template.mandatory=true
  • publish-confirm-type:開啟publisher-confirm,有以下可選值

simple:同步等待confirm結果,直到超時
correlated:異步回調,定義ConfirmCallback。mq返回結果時會回調這個ConfirmCallback

  • publish-returns:開啟publish-return功能。可以定義ReturnCallback

  • template.mandatory: 定義消息路由失敗的策略

true:調用ReturnCallback
false:直接丟棄消息

1.3 定義ReturnCallback(消息投遞到隊列失敗觸發此回調)

  • 每個RabbitTemplate只能配置一個ReturnCallback。

  • 當消息投遞失敗,就會調用生產者的returnCallback中定義的處理邏輯

  • 可以在容器啟動時就配置這個回調

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate對象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判斷是否是延遲消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一個延遲消息,忽略這個錯誤提示
                return;
            }
            // 記錄日志
            log.error("消息發送到隊列失敗,響應碼:{}, 失敗原因:{}, 交換機: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的話,重發消息
        });
    }
}

1.4 定義ConfirmCallback(消息到達交換機觸發此回調)

可以為redisTemplate指定一個統一的確認回調

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate對象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判斷是否是延遲消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一個延遲消息,忽略這個錯誤提示
                return;
            }
            // 記錄日志
            log.error("消息發送到隊列失敗,響應碼:{}, 失敗原因:{}, 交換機: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的話,重發消息
        });
 
        
        // 設置統一的confirm回調。只要消息到達broker就ack=true
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("這是統一的回調");
                System.out.println("correlationData:" + correlationData);
                System.out.println("ack:" + b);
                System.out.println("cause:" + s);
            }
        });
    }
}

也可以為特定的消息定制回調

 @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    public void testmq() throws InterruptedException {
 
 
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 
        correlationData.getFuture().addCallback(result->{
            if (result.isAck()) {
                // ACK
                log.debug("消息成功投遞到交換機!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("消息投遞到交換機失敗!消息ID:{}", correlationData.getId());
                // 重發消息
            }
        },ex->{
            // 記錄日志
            log.error("消息發送失敗!", ex);
            // 重發消息
        });
        rabbitTemplate.convertAndSend("example.direct","blue","hello,world",correlationData);
    }

2. 消費者模塊開啟消息確認

2.1 添加配置

# 手動ack消息,不使用默認的消費端確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • none:關閉ack,消息投遞時不可靠的,可能丟失

  • auto:類似事務機制,出現異常時返回nack,消息回滾到mq,沒有異常,返回

  • ackmanual:我們自己指定什么時候返回ack

2.2 manual模式在監聽器中自定義返回ack

@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {
 
    @Autowired
    private OrderService orderService;
 
 
    @RabbitHandler
    private void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
        System.out.println("收到過期的訂單信息,準備關閉訂單" + orderEntity.getOrderSn());
 
        try {
            orderService.closeOrder(orderEntity);
            // 第二個參數為false則表示僅確認此條消息。如果為true則表示對收到的多條消息同時確認
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 第二個參數為ture表示將這個消息重新加入隊列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
}

3. 消費者模塊開啟消息失敗重試機制

3.1 配置文件添加配置,開啟本地重試

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 開啟消費者失敗重試
          initial-interval: 1000 # 初識的失敗等待時長為1秒
          multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-interval
          max-attempts: 3 # 最大重試次數
          stateless: true # true無狀態;false有狀態。如果業務中包含事務,這里改為false
  • 開啟本地重試,如果消息處理過程總拋出異常,不會requeue到隊列,而是在消費者本地重試

  • 重試達到最大次數后,spring會返回ack,消息會被丟棄

4.  消費者模塊添加失敗策略(用于開啟失敗本地重試功能后)

  • 當開啟本地重試后,重試最大次數后消息直接丟棄。

  • 三種策略,都繼承于MessageRecovery接口

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認就是這種方式

  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊

  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機

4.2 定義處理失敗消息的交換機和隊列 沒有會自動創建相應的隊列、交換機與綁定關系,有了就啥也不做

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
 
// 路由鍵為key
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

4.3 向容器中添加一個失敗策略組件

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    // error為路由鍵
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

以上是“springboot中rabbitmq如何實現消息可靠性機制”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

静海县| 仁化县| 西吉县| 札达县| 三原县| 济源市| 淮北市| 中卫市| 芒康县| 清流县| 喜德县| 庆安县| 宜昌市| 哈巴河县| 明光市| 新宾| 定襄县| 闸北区| 毕节市| 岗巴县| 金塔县| 孟连| 依兰县| 旌德县| 安远县| 富阳市| 南昌县| 巨野县| 常宁市| 广丰县| 乌兰察布市| 峨山| 阳江市| 安溪县| 杭锦后旗| 塘沽区| 元谋县| 九江市| 南投县| 曲阜市| 新建县|