中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何在SpringBoot中使用RabbitMQ

發布時間:2021-05-19 16:00:19 來源:億速云 閱讀:225 作者:Leah 欄目:編程語言

這篇文章將為大家詳細講解有關如何在SpringBoot中使用RabbitMQ,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

死信隊列:沒有被及時消費的消息存放的隊列,消息沒有被及時消費有以下幾點原因:
1.有消息被拒絕(basic.reject/ basic.nack)并且requeue=false
2.隊列達到最大長度
3.消息TTL過期

場景

如何在SpringBoot中使用RabbitMQ

1.小時進入初始隊列,等待30分鐘后進入5分鐘隊列
2.消息等待5分鐘后進入執行隊列
3.執行失敗后重新回到5分鐘隊列
4.失敗5次后,消息進入2小時隊列
5.消息等待2小時進入執行隊列
6.失敗5次后,將消息丟棄或做其他處理

使用

安裝MQ

使用docker方式安裝,選擇帶mangement的版本

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

訪問 localhost: 15672,默認賬號密碼guest/guest

項目配置

(1)創建springboot項目
(2)在application.properties配置文件中配置mq連接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3)隊列配置

package com.df.ps.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqConfig {

  //time
  @Value("${spring.df.buffered.min:120}")
  private int springdfBufferedTime;

  @Value("${spring.df.high-buffered.min:5}")
  private int springdfHighBufferedTime;

  @Value("${spring.df.low-buffered.min:120}")
  private int springdfLowBufferedTime;

  // 30min Buffered Queue
  @Value("${spring.df.queue:spring-df-buffered-queue}")
  private String springdfBufferedQueue;

  @Value("${spring.df.topic:spring-df-buffered-topic}")
  private String springdfBufferedTopic;

  @Value("${spring.df.route:spring-df-buffered-route}")
  private String springdfBufferedRouteKey;

  // 5M Buffered Queue
  @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
  private String springdfHighBufferedQueue;

  @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private String springdfHighBufferedTopic;

  @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private String springdfHighBufferedRouteKey;

  // High Queue
  @Value("${spring.df.high.queue:spring-df-high-queue}")
  private String springdfHighQueue;

  @Value("${spring.df.high.topic:spring-df-high-topic}")
  private String springdfHighTopic;

  @Value("${spring.df.high.route:spring-df-high-route}")
  private String springdfHighRouteKey;

  // 2H Low Buffered Queue
  @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
  private String springdfLowBufferedQueue;

  @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private String springdfLowBufferedTopic;

  @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private String springdfLowBufferedRouteKey;

  // Low Queue
  @Value("${spring.df.low.queue:spring-df-low-queue}")
  private String springdfLowQueue;

  @Value("${spring.df.low.topic:spring-df-low-topic}")
  private String springdfLowTopic;

  @Value("${spring.df.low.route:spring-df-low-route}")
  private String springdfLowRouteKey;


  @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue")
  Queue springdfBufferedQueue() {
    int bufferedTime = 1000 * 60 * springdfBufferedTime;
    return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic, springdfHighBufferedRouteKey, bufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue")
  Queue springdfHighBufferedQueue() {
    int highBufferedTime = 1000 * 60 * springdfHighBufferedTime;
    return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic, springdfHighRouteKey, highBufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue")
  Queue springdfHighQueue() {
    return new Queue(springdfHighQueue, true);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue")
  Queue springdfLowBufferedQueue() {
    int lowBufferedTime = 1000 * 60 * springdfLowBufferedTime;
    return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic, springdfLowRouteKey, lowBufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue")
  Queue springdfLowQueue() {
    return new Queue(springdfLowQueue, true);
  }


  @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic")
  TopicExchange springdfBufferedTopic() {
    return new TopicExchange(springdfBufferedTopic);
  }

  @Bean
  Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) {
    return BindingBuilder.bind(springdfBufferedQueue).to(springdfBufferedTopic).with(springdfBufferedRouteKey);
  }


  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic")
  TopicExchange springdfHighBufferedTopic() {
    return new TopicExchange(springdfHighBufferedTopic);
  }

  @Bean
  Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) {
    return BindingBuilder.bind(springdfHighBufferedQueue).to(springdfHighBufferedTopic).with(springdfHighBufferedRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic")
  TopicExchange springdfHighTopic() {
    return new TopicExchange(springdfHighTopic);
  }

  @Bean
  Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) {
    return BindingBuilder.bind(springdfHighQueue).to(springdfHighTopic).with(springdfHighRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic")
  TopicExchange springdfLowBufferedTopic() {
    return new TopicExchange(springdfLowBufferedTopic);
  }

  @Bean
  Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) {
    return BindingBuilder.bind(springdfLowBufferedQueue).to(springdfLowBufferedTopic).with(springdfLowBufferedRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic")
  TopicExchange springdfLowTopic() {
    return new TopicExchange(springdfLowTopic);
  }

  @Bean
  Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) {
    return BindingBuilder.bind(springdfLowQueue).to(springdfLowTopic).with(springdfLowRouteKey);
  }


  @Bean
  SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                       MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(springdfHighQueue, springdfLowQueue);
    container.setMessageListener(listenerAdapter);
    return container;
  }

  @Bean
  MessageListenerAdapter listenerAdapter(IntegrationReceiver receiver) {


    MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);
    adapter.setDefaultListenerMethod("receive");
    Map<String, String> queueOrTagToMethodName = new HashMap<>();
    queueOrTagToMethodName.put(springdfHighQueue, "springdfHighReceive");
    queueOrTagToMethodName.put(springdfLowQueue, "springdfLowReceive");
    adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
    return adapter;

  }


  private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", topic);
    args.put("x-dead-letter-routing-key", routeKey);
    args.put("x-message-ttl", bufferedTime);
    // 是否持久化
    boolean durable = true;
    // 僅創建者可以使用的私有隊列,斷開后自動刪除
    boolean exclusive = false;
    // 當所有消費客戶端連接斷開后,是否自動刪除隊列
    boolean autoDelete = false;

    return new Queue(queueName, durable, exclusive, autoDelete, args);
  }
}

消費者配置

package com.df.ps.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.util.Map;

public class MqReceiver {

  private static Logger logger = LoggerFactory.getLogger(MqReceiver.class);

  @Value("${high-retry:5}")
  private int highRetry;

  @Value("${low-retry:5}")
  private int lowRetry;

  @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private String springdfHighBufferedTopic;

  @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private String springdfHighBufferedRouteKey;

  @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private String springdfLowBufferedTopic;

  @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private String springdfLowBufferedRouteKey;

  private final RabbitTemplate rabbitTemplate;
  @Autowired
  public MqReceiver(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
  }

  public void receive(Object message) {
    if (logger.isInfoEnabled()) {
      logger.info("default receiver: " + message);
    }
  }

  /**
   * 消息從初始隊列進入5分鐘的高速緩沖隊列
   * @param message
   */
  public void highReceiver(Object message){
    ObjectMapper mapper = new ObjectMapper();
    Map msg = mapper.convertValue(message, Map.class);

    try{
      logger.info("這里做消息處理...");
    }catch (Exception e){
      int times = msg.get("times") == null ? 0 : (int) msg.get("times");
      if (times < highRetry) {
        msg.put("times", times + 1);
        rabbitTemplate.convertAndSend(springdfHighBufferedTopic,springdfHighBufferedRouteKey,message);
      } else {
        msg.put("times", 0);
        rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
      }
    }
  }

  /**
   * 消息從5分鐘緩沖隊列進入2小時緩沖隊列
   * @param message
   */
  public void lowReceiver(Object message){
    ObjectMapper mapper = new ObjectMapper();
    Map msg = mapper.convertValue(message, Map.class);
    
    try {
      logger.info("這里做消息處理...");
    }catch (Exception e){
      int times = msg.get("times") == null ? 0 : (int) msg.get("times");
      if (times < lowRetry) {
        rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
      }else{
        logger.info("消息無法被消費...");
      }
    } 
  }
}

springboot是什么

springboot一種全新的編程規范,其設計目的是用來簡化新Spring應用的初始搭建以及開發過程,SpringBoot也是一個服務于框架的框架,服務范圍是簡化配置文件。

關于如何在SpringBoot中使用RabbitMQ就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

龙胜| 濮阳县| 分宜县| 凯里市| 建阳市| 崇文区| 崇州市| 玛曲县| 赣州市| 孝感市| 岐山县| 博野县| 奉化市| 呼和浩特市| 墨江| 阿鲁科尔沁旗| 若尔盖县| 称多县| 玉屏| 裕民县| 洛浦县| 丹巴县| 德庆县| 文昌市| 丹江口市| 洪泽县| 和硕县| 宝兴县| 厦门市| 武清区| 镇雄县| 左云县| 营山县| 乐亭县| 新兴县| 叶城县| 集安市| 团风县| 万源市| 墨玉县| 临泽县|