中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么使用Spring Cloud Stream玩轉RabbitMQ,RocketMQ和Kafka

發布時間:2021-12-15 11:17:07 來源:億速云 閱讀:332 作者:柒染 欄目:大數據

這期內容當中小編將會給大家帶來有關怎么使用Spring Cloud Stream玩轉RabbitMQ,RocketMQ和Kafka,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

業務的發展對MQ的依賴越來越重,地位也越來越高,對它的需求也越來越多。比如順序消費,事務消息,回溯消費等,性能方面也有更高要求。越來越多的趨勢提醒我們有更好MQ方案。

假如我們將“MQ從Rabbit替換成Rocket”的方案提上議程,就會發放這是一個非常浩大的工程。以前好多服務都是用的有RabbitMQ的特征代碼,如果要替換相當于所有服務的代碼都要較大的更新,這帶來的運營風險是巨大的,需要非常多的開發測試資源的投入。

那回頭來講,我們最開始使用rabbitmq的時候能不能盡量隱藏特征代碼嗎,為以后的升級替換保留可能性。

這個時候就需要使用Spring Cloud的子組件Spring Cloud Stream。它是一個構建消息驅動微服務的框架,提供一套消息訂閱消費的標準為不同供應商的消息中間件進行集成。目前官方提供KafkaRabbitMQ的集成實現,而阿里也實現對RocketMQ的集成。

一、 Spring Cloud Stream簡介

怎么使用Spring Cloud Stream玩轉RabbitMQ,RocketMQ和Kafka

Spring Cloud Stream應用由第三方的中間件組成。應用間的通信通過輸入通道(input channel)和輸出通道(output channel)完成。這些通道是由Spring Cloud Stream 注入的。而通道與外部的代理的連接又是通過Binder實現的。

二、 RabbitMQ集成

1. 引入包

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 設置消息輸入輸出通道

public interface Source {
    String OUTPUT = "myOutput";

    @Output(OUTPUT)
    MessageChannel message();
}

public interface Sink {
    String INPUT = "myInput";

    @Input(INPUT)
    SubscribableChannel sub1();
}

輸出通道為消息的發送方,輸入通道為消息的接收方

myOutputmyInput為通道名,后續通過配置文件進行特性配置,切記兩個通道的綁定最好是分開定義,不然有可能產生莫名錯誤

3. 消息特性配置

spring
  cloud:
    stream:
      bindings:
        myOutput:
          destination: login-user
        myInput: # 通道名,對應代碼中的消費監聽組
          destination: login-user # exchange
          group: logined-member   # 消費組

      rabbit:
        bindings:
          myOutput:
            producer:
              routing-key-expression: headers.routingKey   # 發送端路由key
              delayed-exchange: true    # 開啟延時隊列

          myInput:
            consumer:
              binding-routing-key: login.user.succeed   # 消費監聽路由表達式
              delayed-exchange: true    # 開啟延時隊列
              auto-bind-dlq: true   # 綁定死信隊列
              republish-to-dlq: true  # 重投到死信隊列并帶有報錯信息
1) destination消息的主題名

在Rabbit中用來定義exchange以及成為queue的一部分

2) group消費組
  • 沒有定義消費組時,如果啟動多實例則一個消息同時都消費 怎么使用Spring Cloud Stream玩轉RabbitMQ,RocketMQ和Kafka

  • 定義了消費組后,多實例共用一個queue,負載消費。從圖可以看出queue名為destination.group組成 怎么使用Spring Cloud Stream玩轉RabbitMQ,RocketMQ和Kafka

  • binding-routing-key:消費路由監聽表達式

  • delayed-exchange: 開啟延時隊列

  • auto-bind-dlq:開啟死信隊列

  • republish-to-dlq:此設置可以讓死信消息帶報錯信息 怎么使用Spring Cloud Stream玩轉RabbitMQ,RocketMQ和Kafka

4. 消息的發送接收實現

發送消息

@Autowired
private Source source;

@GetMapping("/")
public void sendSucceed() {
    source.message().send(MessageBuilder.withPayload("Hello World...")
            .setHeader("routingKey", "login.user.succeed")
            .setHeader("version", "1.0")
            .setHeader("x-delay", 5000)
            .build());
}

這里可以為消息設置不同header,以現實不同的功能,這部分每種MQ有不同的特性,需要視情況而定

接收消息

@StreamListener(value = Sink.MY_INPUT_1, condition = "headers['version']=='1.0'")
public void receiveSucceed_v1(@Payload String message) {
    String msg = "StreamReceiver v1: " + message;
    log.error(msg);
}

5. 綁定消息通道

@EnableBinding(value = {Source.class, Sink.class})
@SpringBootApplication
public class RabbitApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }
}

實現這5步就可以正常發送接收消息了,你會發現除了引入不同的包和消息特性配置外,其它的代碼都是抽象代碼,沒有任何rabbitmq的特征代碼

三、 RocketMQ集成

根據RabbitMQ的相關代碼,只需要修改引入包和特片配置就可以替換成RocketMQ了(一些特性功能除外

1. 引入包

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

2. 消息特征配置

spring
  cloud:
    stream:
      bindings:
        myOutput:
          destination: login-user
          content-type: application/json

        myInput: # 通道名,對應代碼中的消費監聽組
          destination: login-user # exchange
          group: logined-member   # 消費者組, 同組負載消費

      rocketmq:
        binder:
          name-server: 127.0.0.1:9876

四、 Kafka集成

1. 引入包

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2. 消息特征配置

spring
    cloud:
    stream:
      bindings:
        myOutput:
          destination: login-user
          content-type: application/json

        myInput: # 通道名,對應代碼中的消費監聽組
          destination: login-user # exchange
          group: logined-member   # 消費者組, 同組負載消費


      kafka:
        binder:
          brokers: localhost:9092         #Kafka的消息中間件服務器
          auto-create-topics: true

由上面三個簡單的例子可以看出,Spring Cloud Stream對消息訂閱和消費做了高度抽象,用一套代碼實現多種消息中間件的支持。同時它也可以非常簡單的實現多種消息中間件的混用,大大擴展了消息中間件的玩法。

上述就是小編為大家分享的怎么使用Spring Cloud Stream玩轉RabbitMQ,RocketMQ和Kafka了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

珠海市| 麻江县| 凤阳县| 朔州市| 青川县| 丰顺县| 内黄县| 仪征市| 长海县| 晋中市| 白水县| 鄢陵县| 大田县| 延川县| 海宁市| 高淳县| 镇雄县| 霍林郭勒市| 班戈县| 砀山县| 什邡市| 高淳县| 神农架林区| 任丘市| 丰城市| 日土县| 疏附县| 仙游县| 开鲁县| 南通市| 资讯| 绍兴市| 凤城市| 铁岭县| 阿克苏市| 罗城| 仁化县| 双鸭山市| 桑日县| 开化县| 华池县|