中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何設置RabbitMQ延遲隊列

發布時間:2021-06-15 14:52:50 來源:億速云 閱讀:461 作者:小新 欄目:大數據

小編給大家分享一下如何設置RabbitMQ延遲隊列,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

延遲消費。比如:用戶生成訂單之后,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單;用戶注冊成功之后,需要過一段時間比如一周后校驗用戶的使用情況,如果發現用戶活躍度較低,則發送郵件或者短信來提醒用戶使用。

如何設置RabbitMQ延遲隊列

rabbitmq的消息TTL和死信Exchange結合

1.消息的TTL(Time To Live)

消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現延遲任務的關鍵。可以通過設置消息的expiration字段或者x-message-ttl屬性來設置時間,兩者是一樣的效果。

2.Dead Letter Exchanges

Exchage的概念在這里就不在贅述。一個消息在滿足如下條件下,會進死信路由,記住這里是路由而不是隊列,一個路由可以對應很多隊列。

①.一個消息被Consumer拒收了,并且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。

②. 上面的消息的TTL到了,消息過期了。

③. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。

Dead Letter Exchange其實就是一種普通的exchange,和創建其他exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。

3.實現延遲隊列

我們先設置好各個配置的字符串

public interface TestMq {/**     * 隊列名     */    String TEST_QUEUE = "test";;    /**     * 服務添加routing key     */    String ROUTING_KEY_TEST = "post.test";    /**     * 死信隊列     */    String DEAD_QUEUE = "dead";    String ROURING_KEY_DEAD = "dead.routing.key";    String MQ_EXCHANGE_DEAD = "dead.exchange";}

配置信息

/** * rabbitmq配置 * */@Configurationpublic class RabbitmqConfig {   /**    * 死信隊列    * @return    */   @Bean   public Queue deadQueue() {
      Map<String,Object> arguments = new HashMap<>();      //此處填入死信交換機      arguments.put("x-dead-letter-exchange", TestMq.MQ_EXCHANGE_DEAD);      //此處填入消息隊列的路由,而非死信隊列自己的路由      arguments.put("x-dead-letter-routing-key", TestMq.ROUTING_KEY_TEST);      return new Queue(TestMq.DEAD_QUEUE,true,false,false,arguments);   }   /**    * 死信交換機    * @return    */   @Bean   public DirectExchange deadExchange() {      return new DirectExchange(TestMq.MQ_EXCHANGE_DEAD);   }   /**    * 綁定死信隊列到死信交換機    * @return    */   @Bean   public Binding bindingDeadExchange() {      return BindingBuilder.bind(deadQueue()).to(deadExchange())
            .with(TestMq.ROURING_KEY_DEAD);   }   /**    * 被消費者偵聽的獲取消息的隊列    * @return    */   @Bean   public Queue testQueue() {      return new Queue(TestMq.TEST_QUEUE,true,false,false);   }   /**    * 將消息隊列綁定到死信交換機,跟死信隊列的路由不同    * @return    */   @Bean   public Binding bindingTest() {      return BindingBuilder.bind(testQueue()).to(deadExchange())
            .with(TestMq.ROUTING_KEY_TEST);   }

}

消息生產者

@Slf4j@Componentpublic class TestSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired    private RabbitTemplate rabbitTemplate;    public void send(String exchange,String routingKey,Object content) {log.info("send content=" + content);        this.rabbitTemplate.setMandatory(true);        this.rabbitTemplate.setConfirmCallback(this);        this.rabbitTemplate.setReturnCallback(this);        MessagePostProcessor processor = message -> {//給消息設置的過期時間,我們這里為10秒            message.getMessageProperties().setExpiration(10000 + "");            return message;        };        this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content),processor);    }/**     * 確認后回調:     * @param correlationData     * @param ack     * @param cause     */    @Override    public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {if (!ack) {log.info("send ack fail, cause = " + cause);        } else {log.info("send ack success");        }
    }/**     * 失敗后return回調:     *     * @param message     * @param replyCode     * @param replyText     * @param exchange     * @param routingKey     */    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);    }/**     * 對消息對象進行二進制序列化     * @param o     * @return     */    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();        ByteArrayOutputStream stream = new ByteArrayOutputStream();        Output output = new Output(stream);        kryo.writeObject(output, o);        output.close();        return stream.toByteArray();    }
}

消費者

@Slf4j@Component@RabbitListener(queues = TestMq.TEST_QUEUE)public class TestConsumer {@RabbitHandler    public void receice(byte[] data, Channel channel, Message message) throws IOException {try {//告訴服務器收到這條消息 已經被我消費了 可以在隊列刪掉;否則消息服務器以為這條消息沒處理掉 后續還會在發            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);            Integer orderNo = unSerialize(data);            log.info(orderNo + "為收到的消息");        } catch (IOException e) {
            e.printStackTrace();            //丟棄這條消息            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);            log.info("receiver fail");        }
    }/**     * 反序列化     * @param data     * @return     */    private Integer unSerialize(byte[] data) {
        Input input = null;        try {
            Kryo kryo = new Kryo();            input = new Input(new ByteArrayInputStream(data));            return kryo.readObject(input,Integer.class);        }finally {
            input.close();        }
    }
}

我們隨便寫個測試

@Servicepublic class TestService {@Autowired    private TestSender sender;    @PostConstruct    public void test() {//此處順序為死信交換機,死信隊列路由,消息        sender.send(TestMq.MQ_EXCHANGE_DEAD,TestMq.ROURING_KEY_DEAD,1);    }
}

經測試

2019-10-11 17:26:18.079  INFO 879 --- [           main] c.g.rabbitdelay.config.TestSender        : send content=1
2019-10-11 17:26:18.098  INFO 879 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [xxx.xxx.xxx.xxx:5672]
2019-10-11 17:26:18.227  INFO 879 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2301b75:0/SimpleConnection@243f003c [delegate=amqp://admin@xxx.xxx.xxx.xxx:5672/, localPort= 52345]
2019-10-11 17:26:18.337  INFO 879 --- [39.9.225.2:5672] c.g.rabbitdelay.config.TestSender        : send ack success
2019-10-11 17:26:18.446  INFO 879 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2019-10-11 17:26:18.751  INFO 879 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2019-10-11 17:26:18.959  INFO 879 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2019-10-11 17:26:18.962  INFO 879 --- [           main] c.g.rabbitdelay.RabbitdelayApplication   : Started RabbitdelayApplication in 17.093 seconds (JVM running for 27.45)
2019-10-11 17:26:28.342  INFO 879 --- [ntContainer#0-1] c.g.rabbitdelay.consumer.TestConsumer    : 1為收到的消息

通過日志可以看到,發送消息是18秒,收到消息消費為28秒,中間隔了10秒鐘。

以上是“如何設置RabbitMQ延遲隊列”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

黄平县| 陆良县| 凉城县| 嘉义市| 宝清县| 孝昌县| 江门市| 高要市| 西充县| 蕲春县| 富锦市| 屏边| 利津县| 武清区| 新蔡县| 景泰县| 靖西县| 永登县| 罗江县| 芷江| 调兵山市| 剑阁县| 广南县| 西城区| 松原市| 南靖县| 伊春市| 福海县| 南漳县| 郴州市| 恩平市| 新巴尔虎左旗| 祁东县| 巴南区| 老河口市| 菏泽市| 抚宁县| 河北省| 桐柏县| 阿拉善左旗| 大港区|