您好,登錄后才能下訂單哦!
這篇文章主要介紹了Redis中如何使用消息隊列,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
說到消息隊列中間件,我們都會想到RabbitMQ、RocketMQ和Kafka,來給應用實現異步消息傳遞的功能。這些都是專業的消息隊列中間件,其特性之多超出了我們的理解能力。
而這些消息中間件使用起來的是復雜的,例如RabbitMQ,發消息之前要創建Exchange,還要創建Queue,然后將Exchange和Queue通過某種規則綁定起來,發送消息的時候還要制定routing-key,還要 控制頭消息。這僅是生產者,消費者在消費消息之前也要將上面一系列的繁瑣步驟再操作一遍。
那么對于那些并不要求百分百可靠,并且希望實現簡單的消息隊列需求時,我們可以通過Redis將我們從消息隊列的中間件的繁瑣步驟中解脫出來。
Redis的消息隊列不是專業的消息隊列,他并沒有消息隊列中許多的高級特性,也沒有ack保證。如果對消息的可靠性有著極致的追求,請轉向專業的MQ中間件。
從最簡單的異步消息隊列開始,Redis的list數據結構常用來作為異步消息隊列,通過lrpush/lpush來操作入列,通過rpop/lpop來出列。
對于pop操作來說,當消息隊列空了的時候,客戶端會陷入pop的死循環,造成大量的浪費生命的空輪詢,導致客戶端CPU拉高,同時Redis的QPS也被拉高。
對于以上問題的解決辦法就是通過list結構的blpop/brpop來操作出列,其中b前綴代表的就是blocking,阻塞讀。對于阻塞讀在隊列沒有數據的時候就會進入休眠狀態,一旦數據到來就會立刻醒來。完美的解決了上面這個問題。
阻塞讀的方案看似完美,緊接著引出了另外一個問題:空閑連接。 如果線程一直阻塞在哪哪里,Redis的客戶端連接就變成了空閑連接。空閑時間過長,Redis服務器就會主動斷開連接,以減少閑置資源占用。這時候blpop/brpop就會拋出異常來。
所以,我們在編寫客戶端(應用程序)消費者的時候需要小心,注意捕獲異常,并進行重試。
在Redis的分布式鎖中一般有三種策略來處理加鎖失敗的情況:
直接拋出異常,前端提醒用戶是否要繼續操作;
sleep一會再重試;
將請求放到延時隊列中,一會再重試;
而Redis中延時隊列,我們可以通過zset(有序列表)數據結構來實現。我們將消息序列化作為一個字符串作為zse的value,而消息的到期處理時間(延時時間)作為score。然后通過輪詢zset獲取到期時間進行處理,通過zrem將key從zset移除代表成功消費,進而處理任務。
核心代碼如下:
// 生產\ public void delay(T msg) {\ TaskItem task = new TaskItem();\ task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid\ task.msg = msg;\ String s = JSON.toJSONString(task); // fastjson 序列化\ jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時隊列 ,5s 后再試\ }\ // 消費\ public void loop() {\ while (!Thread.interrupted()) {\ // zrangeByScore參數中0, System.currentTimeMills()代表從redis中去score范圍在0到系統當前時間的數據, 0,1表示從0開始取1個 拓展傳入的score為-inf, +inf 分別表示zset中的最大值和最小值,當你不知道zset中的score最值時就可以使用inf作為參數變量\ Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);\ if (values.isEmpty()) {\ try {\ Thread.sleep(500); // 歇會繼續\ }\ catch (InterruptedException e) {\ break;\ }\ continue;\ }\ String s = values.iterator().next(); //消費隊列\ if (jedis.zrem(queueKey, s) > 0) { // 搶到了,要考慮到多線程下鎖爭搶的情況,只有rem成功代表成功的消費了一條消息。\ TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化\ this.handleMsg(task.msg);\ }\ }\ }
以上的代碼在多線程中對于同一個任務被多個線程爭搶的情況,雖然能夠通過zrem后在處理任務來避免一個任務被多次消費的情況。但是對于那些獲取到了任務但是沒有成功消費的線程來說,都是白白浪費時間取了一次任務。所以可以考慮通過lua scripting來優化這個邏輯。將zrangeByScore和zrem一同挪到服務器進行原子操作,就能夠完美解決了。
感謝你能夠認真閱讀完這篇文章,希望小編分享的“Redis中如何使用消息隊列”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。