中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RabbitMQ中怎么實現延遲隊列

發布時間:2021-08-07 13:38:36 來源:億速云 閱讀:386 作者:Leah 欄目:編程語言

這篇文章給大家介紹RabbitMQ中怎么實現延遲隊列,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

在 RabbitMQ 3.6.x 之前我們一般采用死信隊列+TTL過期時間來實現延遲隊列,我們這里不做過多介紹,可以參考之前文章來了解:TTL、死信隊列

在 RabbitMQ 3.6.x 開始,RabbitMQ 官方提供了延遲隊列的插件,可以下載放置到 RabbitMQ 根目錄下的 plugins 下。延遲隊列插件下載

首先我們創建交換機和消息隊列

import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MQConfig {  public static final String LAZY_EXCHANGE = "Ex.LazyExchange";  public static final String LAZY_QUEUE = "MQ.LazyQueue";  public static final String LAZY_KEY = "lazy.#";  @Bean  public TopicExchange lazyExchange(){    //Map<String, Object> pros = new HashMap<>();    //設置交換機支持延遲消息推送    //pros.put("x-delayed-message", "topic");    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);    exchange.setDelayed(true);    return exchange;  }  @Bean  public Queue lazyQueue(){    return new Queue(LAZY_QUEUE, true);  }  @Bean  public Binding lazyBinding(){    return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);  }}

我們在 Exchange 的聲明中可以設置exchange.setDelayed(true)來開啟延遲隊列,也可以設置為以下內容傳入交換機聲明的方法中,因為第一種方式的底層就是通過這種方式來實現的。

//Map<String, Object> pros = new HashMap<>();    //設置交換機支持延遲消息推送    //pros.put("x-delayed-message", "topic");    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

發送消息時我們需要指定延遲推送的時間,我們這里在發送消息的方法中傳入參數 new MessagePostProcessor() 是為了獲得 Message對象,因為需要借助 Message對象的api 來設置延遲時間。

import com.anqi.mq.config.MQConfig;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageDeliveryMode;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Date;@Componentpublic class MQSender {  @Autowired  private RabbitTemplate rabbitTemplate;  //confirmCallback returnCallback 代碼省略,請參照上一篇   public void sendLazy(Object message){    rabbitTemplate.setMandatory(true);    rabbitTemplate.setConfirmCallback(confirmCallback);    rabbitTemplate.setReturnCallback(returnCallback);    //id + 時間戳 全局唯一    CorrelationData correlationData = new CorrelationData("12345678909"+new Date());    //發送消息時指定 header 延遲時間    rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,        new MessagePostProcessor() {      @Override      public Message postProcessMessage(Message message) throws AmqpException {        //設置消息持久化        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);        //message.getMessageProperties().setHeader("x-delay", "6000");        message.getMessageProperties().setDelay(6000);        return message;      }    }, correlationData);  }}

我們可以觀察 setDelay(Integer i)底層代碼,也是在 header 中設置 x-delay。等同于我們手動設置 header

message.getMessageProperties().setHeader("x-delay", "6000");

/** * Set the x-delay header. * @param delay the delay. * @since 1.6 */public void setDelay(Integer delay) {  if (delay == null || delay < 0) {    this.headers.remove(X_DELAY);  }  else {    this.headers.put(X_DELAY, delay);  }}

消費端進行消費

import com.rabbitmq.client.Channel;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.Map;@Componentpublic class MQReceiver {  @RabbitListener(queues = "MQ.LazyQueue")  @RabbitHandler  public void onLazyMessage(Message msg, Channel channel) throws IOException{    long deliveryTag = msg.getMessageProperties().getDeliveryTag();    channel.basicAck(deliveryTag, true);    System.out.println("lazy receive " + new String(msg.getBody()));  }

測試結果

import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest@RunWith(SpringRunner.class)public class MQSenderTest {  @Autowired  private MQSender mqSender;  @Test  public void sendLazy() throws Exception {    String msg = "hello spring boot";    mqSender.sendLazy(msg + ":");  }}

果然在 6 秒后收到了消息 lazy receive hello spring boot:

關于RabbitMQ中怎么實現延遲隊列就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

米脂县| 广平县| 富宁县| 西贡区| 岫岩| 黄冈市| 阳春市| 海安县| 吐鲁番市| 册亨县| 盱眙县| 连州市| 噶尔县| 南宫市| 柳河县| 万载县| 宜良县| 富民县| 曲松县| 靖边县| 甘洛县| 岗巴县| 衡东县| 沈丘县| 宜春市| 连城县| 枞阳县| 铁力市| 博乐市| 罗甸县| 永城市| 涞源县| 水城县| 安徽省| 泽库县| 吉安市| 拉孜县| 卓资县| 北海市| 堆龙德庆县| 普陀区|