中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

DelayQueue使用方式是什么

發布時間:2021-10-21 13:59:38 來源:億速云 閱讀:195 作者:iii 欄目:編程語言

這篇文章主要講解了“DelayQueue使用方式是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“DelayQueue使用方式是什么”吧!

DelayQueue 顧名思義,它是一個延時隊列

使用方式 :

假設我們生產者提交一個任務,消費者5秒鐘之后才可以執行,那么我們可以把任務定義為如下格式,并實現Delayed接口,其中data是任務存儲的信息。

/**
 * 具體的任務
 * @author wangshixiang
 */
public class Task implements Delayed {
    /**
     * 數據
     */
    private final String data;
    /**
     * 任務執行時間
     */
    private final long time;

    public Task(String data,TimeUnit timeUnit,long time){
        this.data=data;
        this.time=System.currentTimeMillis()+timeUnit.toMillis(time);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long res= time-System.currentTimeMillis();
        return unit.convert(res,TimeUnit.MILLISECONDS);
    }

    public String getData() {
        return data;
    }

    @Override
    public int compareTo(Delayed o) {
        if (o instanceof Task ){
            Task task= (Task) o;
            return (int) (this.time-task.time);
        }
        return 0;
    }
}

定義好任務后,我們需要定義一個任務隊列 QUEUE_TASK,來存儲消息,實現效果為程序運行后 五秒鐘后輸出Hello...

  private static final DelayQueue<Task> QUEUE_TASK =new DelayQueue<>();

    public static void main(String[] args) throws InterruptedException {
        QUEUE_TASK .add(new Task("Hello ... ", TimeUnit.SECONDS,5));
        System.out.println(QUEUE_TASK .take().getData());
    }

使用詳解

  1. Delayed 接口定義:

public interface DeDlayed extends Comparable<Delayed> {

 
    long getDelay(TimeUnit unit);
}

我們發現Delayed接口繼承了Comparable接口,并且有一個getDelay方法,在程序運行的過程中,會調用頭部任務的這個方法,來返回該任務具體還有多長時間可以執行。當我們任務實現這個接口時 可以存儲任務的執行時間,通過執行時間-當前時間 計算出距離執行時間的差值,因此我們Task定義了一個任務的變量,在創建對象時設置任務的執行時間。
2. DelayQueue 延時隊列
首先我們看一下DelayQueue類繼承實現結構圖
DelayQueue使用方式是什么

可以理解為 DelayQueue 是一個帶延遲執行功能的阻塞隊列

深入理解

  • 為什么Delayed接口繼承了Comparable接口 ?

  • DelayQueue是怎么實現只有到預定時間才能取出任務 ?

  • 向隊列里放入一個任務時 發生了什么事情 ?

帶著這幾個問題,我們來看一下DelayQueeu的源碼 首先看一下主要的參數:

    //鎖
    private final transient ReentrantLock lock = new ReentrantLock();
    //優先級隊列 執行時間最早的排在第一個
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    //是否有線程在等待任務到執行時間
    private Thread leader;
    //條件喚醒
    private final Condition available = lock.newCondition();

那么我們先看add(E e)方法 ,任務入隊列時做了哪些操作

    public boolean add(E e) {
        return offer(e);
    }
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

入隊列時做了一下步驟:

  1. 獲取鎖

  2. 放入元素 (放入優先級隊列)

  3. 如果自己排在第一個 則原來標記的leader線程已經失效 直接設置為null,并喚醒消費者

  4. 釋放鎖

接下來在看出隊列時take()方法做了哪些操作

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
            //我拿到元素了 喚醒其他的線程
                available.signal();
            lock.unlock();
        }
    }

出隊列做了如下步驟:

  1. 獲取鎖(可中斷的鎖 獲取這種鎖允許其他線程中斷此線程)

  2. 取出第一個元素 如果第一個元素為空 則直接 await(),等待被喚醒(如放隊列時的喚醒)

  3. 如果第一個元素不為空,查看是否到執行時間,如果沒有到執行時間 查看是否有leader已經注意到這個任務 如果他注意到這個任務 我直接await()。如果沒人注意,那么我就把自己設置為leader然后設置帶時間的await()。

  4. 睡眠到執行時間后 醒來后查看leader是否還是自己 如果是的話 取消自己的leader身份。然后在嘗試獲取任務。

  5. 如果我獲取到了符合要求的元素,那么我應該喚醒大家 來一塊競爭獲取下一個元素。

帶時間的出隊列方法 E poll(long timeout, TimeUnit unit) 的實現邏輯與take()方法的唯一區別就是。只有當自己剩余等待時間大于第一個元素剩余執行時間時 才允許把自己設置為leader

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0L)
                        return null;
                    else
                        //睡眠等待時間 有可能提前返回 那么返回的是剩余等待時間
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    if (nanos <= 0L)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                     //如果剩余等待時間比第一個元素剩余執行時間還短 那么應該睡剩余等待時間
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            //計算剩余等待時間 
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

應用場景:

在大多數業務場景中,我們會利用中間件提供的延時消息的功能。比如利用redis zset實現 ,kafka rabbit mq 的延時隊列。我們需要根據我們的業務場景,來選擇合適的中間件。

  1. 訂單超時未支付取消.

  2. 調用其他系統時失敗間隔重試.

  3. 調用第三方接口時,過段時間異步獲取結果。

感謝各位的閱讀,以上就是“DelayQueue使用方式是什么”的內容了,經過本文的學習后,相信大家對DelayQueue使用方式是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

黄浦区| 肥城市| 安宁市| 太和县| 惠州市| 额尔古纳市| 义马市| 黑河市| 鹿泉市| 秭归县| 涞水县| 云阳县| 武山县| 宁强县| 陵川县| 通渭县| 丹东市| 丰宁| 沁阳市| 城口县| 山东| 嵊泗县| 内乡县| 玛曲县| 安义县| 丽水市| 瓦房店市| 朝阳区| 宝清县| 赣榆县| 徐州市| 大兴区| 渝北区| 阜新市| 墨玉县| 慈溪市| 临泽县| 新巴尔虎左旗| 浙江省| 盐边县| 尼勒克县|