中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spring Cloud Stream異常處理的示例分析

發布時間:2021-08-19 14:13:47 來源:億速云 閱讀:279 作者:小新 欄目:編程語言

這篇文章給大家分享的是有關Spring Cloud Stream異常處理的示例分析的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

應用處理

當消費者在處理接收到的消息時,有可能會由于某些原因而拋出異常。若希望對拋出來的異常進行處理的話,就需要采取一些異常處理手段,異常處理的方式可分為三種:應用層面的處理、系統層面的處理以及通過RetryTemplate進行處理。

局部處理

Stream相關的配置內容如下:

spring:
 cloud:
  stream:
   rocketmq:
    binder:
     name-server: 192.168.190.129:9876
   bindings:
    input:
     destination: stream-test-topic
     group: binder-group

所謂局部處理就是針對指定的channel進行處理,需要定義一個處理異常的方法,并在該方法上添加@ServiceActivator注解,該注解有一個inputChannel屬性,用于指定對哪個channel進行處理,格式為{destination}.{group}.errors。具體代碼如下:

package com.zj.node.usercenter.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;
/**
 * 消費者
 *
 * @author 01
 * @date 2019-08-10
 **/
@Slf4j
@Service
public class TestStreamConsumer {
  @StreamListener(Sink.INPUT)
  public void receive1(String messageBody) {
    log.info("消費消息,messageBody = {}", messageBody);
    throw new IllegalArgumentException("參數錯誤");
  }
  /**
   * 處理局部異常的方法
   *
   * @param errorMessage 異常消息對象
   */
  @ServiceActivator(
    // 通過特定的格式指定處理哪個channel的異常
    inputChannel = "stream-test-topic.binder-group.errors"
  )
  public void handleError(ErrorMessage errorMessage) {
    // 獲取異常對象
    Throwable errorMessagePayload = errorMessage.getPayload();
    log.error("發生異常", errorMessagePayload);
    // 獲取消息體
    Message<?> originalMessage = errorMessage.getOriginalMessage();
    if (originalMessage != null) {
      log.error("消息體: {}", originalMessage.getPayload());
    } else {
      log.error("消息體為空");
    }
  }
}

全局處理

全局處理則是可以處理所有channel拋出來的異常,所有的channel拋出異常后會生成一個ErrorMessage對象,即錯誤消息。錯誤消息會被放到一個專門的channel里,這個channel就是errorChannel。所以通過監聽errorChannel就可以實現全局異常的處理。具體代碼如下:

@StreamListener(Sink.INPUT)
public void receive1(String messageBody) {
  log.info("消費消息,messageBody = {}", messageBody);
  throw new IllegalArgumentException("參數錯誤");
}
/**
 * 處理全局異常的方法
 *
 * @param errorMessage 異常消息對象
 */
@StreamListener("errorChannel")
public void handleError(ErrorMessage errorMessage) {
  log.error("發生異常. errorMessage = {}", errorMessage);
}

系統處理

系統處理方式,因消息中間件的不同而異。如果應用層面沒有配置錯誤處理,那么error將會被傳播給binder,而binder則會將error回傳給消息中間件。消息中間件可以選擇:

  • 丟棄消息:錯誤消息將被丟棄。雖然在某些情況下可以接受,但這種方式一般不適用于生產

  • requeue(重新排隊,從而重新處理)

  • 將失敗的消息發送給DLQ(死信隊列)

DLQ

目前RabbitMQ對DLQ的支持比較好,這里以RabbitMQ為例,只需要添加DLQ相關的配置:

spring:
 cloud:
  stream:
   bindings:
    input:
     destination: stream-test-topic
     group: binder-group
   rabbit:
    bindings:
     input:
      consumer:
       # 自動將失敗的消息發送給DLQ
       auto-bind-dlq: true

消息消費失敗后,就會放入死信隊列。在控制臺操作一下,即可將死信放回消息隊列,這樣,客戶端就可以重新處理。

如果想獲取原始錯誤的異常堆棧,可添加如下配置:

spring:
 cloud:
  stream:
   rabbit:
    bindings:
     input:
      consumer:
       republish-to-dlq: true

requeue

Rabbit及Kafka的binder依賴RetryTemplate實現消息重試,從而提升消息處理的成功率。然而,如果設置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那么RetryTemplate則不會再重試。此時可以通過requeue方式來處理異常。

需要添加如下配置:

# 默認是3,設為1則禁用重試
spring.cloud.stream.bindings.<input channel名稱>.consumer.max-attempts=1
# 表示是否要requeue被拒絕的消息(即:requeue處理失敗的消息)
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

這樣,失敗的消息將會被重新提交到同一個handler進行處理,直到handler拋出 AmqpRejectAndDontRequeueException 異常為止。

RetryTemplate

RetryTemplate主要用于實現消息重試,也是錯誤處理的一種手段。有兩種配置方式,一種是通過配置文件進行配置,如下示例:

spring:
 cloud:
  stream:
   bindings:
    <input channel名稱>:
     consumer:
      # 最多嘗試處理幾次,默認3
      maxAttempts: 3
      # 重試時初始避退間隔,單位毫秒,默認1000
      backOffInitialInterval: 1000
      # 重試時最大避退間隔,單位毫秒,默認10000
      backOffMaxInterval: 10000
      # 避退乘數,默認2.0
      backOffMultiplier: 2.0
      # 當listen拋出retryableExceptions未列出的異常時,是否要重試
      defaultRetryable: true
      # 異常是否允許重試的map映射
      retryableExceptions:
       java.lang.RuntimeException: true
       java.lang.IllegalStateException: false

另一種則是通過代碼配置,在多數場景下,使用配置文件定制重試行為都是可以滿足需求的,但配置文件里支持的配置項可能無法滿足一些復雜需求。此時可使用代碼方式配置RetryTemplate,如下示例:

@Configuration
class RetryConfiguration {
  @StreamRetryTemplate
  public RetryTemplate sinkConsumerRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(retryPolicy());
    retryTemplate.setBackOffPolicy(backOffPolicy());
    return retryTemplate;
  }
  private ExceptionClassifierRetryPolicy retryPolicy() {
    BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(
        Collections.singletonList(IllegalAccessException.class
        ));
    keepRetryingClassifier.setTraverseCauses(true);
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
    AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();

    ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
    retryPolicy.setExceptionClassifier(
        classifiable -> keepRetryingClassifier.classify(classifiable) ?
            alwaysRetryPolicy : simpleRetryPolicy);
    return retryPolicy;
  }
  private FixedBackOffPolicy backOffPolicy() {
    final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(2);
    return backOffPolicy;
  }
}

最后還需要添加一段配置:

spring.cloud.stream.bindings.<input channel名稱>.consumer.retry-template-name=myRetryTemplate

注:Spring Cloud Stream 2.2才支持設置retry-template-name

感謝各位的閱讀!關于“Spring Cloud Stream異常處理的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

武宣县| 巨野县| 永善县| 泗水县| 基隆市| 兴海县| 丁青县| 荣成市| 沁水县| 盐池县| 望江县| 阿荣旗| 绥德县| 永康市| 灯塔市| 信宜市| 吴旗县| 常山县| 沅江市| 张家口市| 临邑县| 电白县| 湘阴县| 循化| 内黄县| 平谷区| 苗栗县| 布尔津县| 西贡区| 安康市| 茶陵县| 大悟县| 浮梁县| 张家港市| 杭锦后旗| 垣曲县| 错那县| 建始县| 彩票| 鹰潭市| 布尔津县|