中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java如何實現異步延遲隊列

發布時間:2023-03-22 10:49:50 來源:億速云 閱讀:134 作者:iii 欄目:開發技術

這篇文章主要介紹“Java如何實現異步延遲隊列”,在日常操作中,相信很多人在Java如何實現異步延遲隊列問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Java如何實現異步延遲隊列”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

1.應用場景

目前系統中有很多需要用到延時處理的功能:支付超時取消、排隊超時、短信、微信等提醒延遲發送、token刷新、會員卡過期等等。通過延時處理,極大的節省系統的資源,不必輪詢數據庫處理任務。

目前大部分功能通過定時任務完成,定時任務還分使用quartz及xxljob兩種類型輪詢時間短,每秒執行一次,對數據庫造成一定的壓力,并且會有1秒的誤差。輪詢時間久,如30分鐘一次,03:01插入一條數據,正常3:31執行過期,但是3:30執行輪詢時,掃描3:00-3:30的數據,是掃描不到3:31的數據的,需要4:00的時候才能掃描到,相當于多延遲了29分鐘!

2.延時處理方式調研

1.DelayQueue

1.實現方式:

jvm提供的延遲阻塞隊列,通過優先級隊列對不同延遲時間任務進行排序,通過condition進行阻塞、睡眠dealy時間 獲取延遲任務。

當有新任務加入時,會判斷新任務是否是第一個待執行的任務,若是,會解除隊列睡眠,防止新加入的元素時需要執行的元素而不能正常被執行線程獲取到。

2.存在的問題:

1.單機運行,系統宕機后,無法進行有效的重試

2.沒有執行記錄和備份

3.沒有重試機制

4.系統重啟時,會將任務清空!

5.不能分片消費

3.優勢:實現簡單,無任務時阻塞,節省資源,執行時間準確

2.延遲隊列mq

實現方式:依賴mq,通過設置延遲消費時間,達到延遲消費功能。像rabbitMq、jmq都可以設置延遲消費時間。RabbitMq通過將消息設置過期時間,放入死信隊列進行消費實現。

存在的問題:

1.時間設置不靈活,每個queue是固定的到期時間,每次新創建延時隊列,需要創建新的消息隊列

優點:依靠jmq,可以有效的監控、消費記錄、重試,具備多機同時消費能力,不懼怕宕機

3.定時任務

通過定時任務輪詢符合條件的數據

缺點:

1.必須要讀業務數據庫,對數據庫造成一定的壓力,

2.存在延時

3.一次掃描數據量過大時,占用過多的系統資源。

4. 無法分片消費

優點:

1.消費失敗后,下次還能繼續消費,具備重試能力,

2.消費能力穩定

4.redis

任務存儲在redis中,使用redis的 zset隊列根據score進行排序,程序通過線程不斷獲取隊列數據消費,實現延時隊列

優點:

1、查詢redis相比較數據庫快,set隊列長度過大,會根據跳表結構進行查詢,效率高

2、redis可根據時間戳進行排序,只需要查詢當前時間戳內的分數的任務即可

3、無懼機器重啟

4、分布式消費

缺點:

1.受限于redis性能,并發10W

2.多個命令無法保證原子性,使用lua腳本會要求所有數據都在一個redis分片上。

5. 時間輪

通過時間輪實現的延遲任務執行,也是基于jvm單機運行,如kafka、netty都有實現時間輪,redisson的看門狗也是通過netty的時間輪實現的。

缺點:不適合分布式服務的使用,宕機后,會丟失任務。

3.實現目標

兼容目前在使用的異步事件組件,并提供更可靠,可重試、有記錄、可監控報警、高性能的延遲組件。

•消息傳輸可靠性:消息進入到延遲隊列后,保證至少被消費一次。

•Client支持豐富:支持多重語言。

•高可用性:支持多實例部署。掛掉一個實例后,還有后備實例繼續提供服務。

•實時性:允許存在一定的時間誤差。

•支持消息刪除:業務使用方,可以隨時刪除指定消息。

•支持消費查詢

•支持手動重試

•對當前異步事件的執行增加監控

4.架構設計

Java如何實現異步延遲隊列

5.延遲組件實現方式

1.實現原理

目前選擇使用jimdb通過zset實現延時功能,將任務id和對應的執行時間作為score存在在zset隊列中,默認會按照score排序,每次取0-當前時間內的score的任務id,

發送延遲任務時,會根據時間戳+機器ip+queueName+sequence 生成唯一的id,構造消息體,加密后放入zset隊列中。

通過搬運線程,將達到執行時間的任務移動到發布隊列中,等待消費者獲取。

監控方通過集成ump

消費記錄通過redis備份+數據庫持久化完成。

通過緩存實現的方式,只是實現的一種,可以通過參數控制使用哪一種實現方式,并可通過spi自由擴展。

2.消息結構

每個Job必須包含一下幾個屬性:

•Topic:Job類型,即QueueName

•Id:Job的唯一標識。用來檢索和刪除指定的Job信息。

•Delay:Job需要延遲的時間。單位:秒。(服務端會將其轉換為絕對時間)

•Body:Job的內容,供消費者做具體的業務處理,以json格式存儲。

•traceId:發送線程的traceId,待后續pfinder支持設置traceId后,可與發送線程公用同一個traceiD,便于日志追蹤

具體結構如下圖表示:

Java如何實現異步延遲隊列

TTR的設計目的是為了保證消息傳輸的可靠性。

3.數據流轉及流程圖

Java如何實現異步延遲隊列

基于redis-disruptor方式進行發布、消費,可以作為消息來進行使用,消費者采用原有異步事件的disruptor無鎖隊列消費,不同應用、不同queue之間無鎖

1.支持應用只發布,不消費,達到消息隊列的功能。

2:支持分桶,針對大key問題,若事件多,可以設置延遲隊列和任務隊列桶的數量,減小因大key造成的redis阻塞問題。

3: 通過ducc配置,進行性能的擴展,目前只支持開啟消費和關閉消費。

4: 支持設置超時時間配置,防止消費線程執行過久

瓶頸: 消費速度慢,生產速度過快,會導致ringbuffer隊列占滿,當前應用既是生產者也是消費者時,生產者會休眠,性能取決于消費速度,可通過水平擴展機器,直接提升性能。監控redis隊列的長度,若不斷增長,可考慮增加消費者,直接提高性能。

可能出現的情況: 因一個應用公用一個disruptor,擁有64個消費者線程,如果某一個事件消費過慢,導致64個線程都在消費這個事件,會導致其他事件無消費線程消費,生產者線程也被阻塞,導致所有事件的消費都被阻塞。

后期觀察是否有這個性能瓶頸,可給每一個queue一個消費者線程池。

6.demo示例

增加配置文件

判斷是否開啟jd.event.enable:true

<dependency> <groupId>com.jd.car</groupId>
 <artifactId>senna-event</artifactId>
 <version>1.0-SNAPSHOT</version> </dependency>?

配置

jd:
senna:
event:
enable: true
queue:
retryEventQueue:
bucketNum: 1
handleBean: retryHandle

消費代碼

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@Component("retryHandle")
public class RetryQueueEvent extends EventHandler {

@Override
protected void onHandle(String key, String eventType) {
log.info("Handler開始消費:{}", key);
}

@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler開始消費:{}", key);
}
}

注解形式

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;

/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)
public class TestQueueEvent extends EventHandler {

@Override
protected void onHandle(String key, String eventType) {
log.info("Handler開始消費:{}", key);
}

@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler開始消費:{}", key);
}
}?

發送代碼

package com.jd.car.senna.admin.controller;

import com.jd.car.senna.event.queue.IEventQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;


/**
* @author zly
*/
@RestController
@Slf4j
public class DemoController {

@Lazy
@Resource(name = "testQueue")
private IEventQueue eventQueue;

@ResponseBody
@GetMapping("/api/v1/demo")
public String demo() {
log.info("發送無延遲消息");
eventQueue.push("no delay 5000 millseconds message 3");
return "ok";
}

@ResponseBody
@GetMapping("/api/v1/demo1")
public String demo1() {
log.info("發送延遲5秒消息");
eventQueue.push(" delay 5000 millseconds message,name",1000*5L);
return "ok";
}

@ResponseBody
@GetMapping("/api/v1/demo2")
public String demo2() {
log.info("發送延遲到2022-04-02 00:00:00執行的消息");
eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));
return "ok";
} 

}?

到此,關于“Java如何實現異步延遲隊列”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

镇平县| 泽库县| 财经| 禹城市| 新乡市| 运城市| 迁西县| 通山县| 乌什县| 大丰市| 宣武区| 沅陵县| 延长县| 宝坻区| 云和县| 荆州市| 湟源县| 外汇| 中西区| 抚松县| 池州市| 蓬溪县| 丹江口市| 潼关县| 平武县| 微山县| 明水县| 宝坻区| 江达县| 婺源县| 阳城县| 上林县| 镇江市| 墨竹工卡县| 安溪县| 张家港市| 福海县| 尤溪县| 达日县| 鄂伦春自治旗| 抚顺市|