中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

JUC的SynchronousQueue如何使用

發布時間:2021-12-21 10:21:39 來源:億速云 閱讀:186 作者:iii 欄目:大數據

本篇內容主要講解“JUC的SynchronousQueue如何使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“JUC的SynchronousQueue如何使用”吧!

SynchronousQueue 從真正意義上來說并不能算是一個隊列,而將其理解為一個用于線程之間通信的組件更為恰當。SynchronousQueue 沒有容量的概念,一個線程在執行完入隊列操作之后,必須等待另外一個線程與之匹配完成出隊列后方可繼續再次入隊列,反之亦然。此外,有別于我們通常理解的隊列中的結點只承載元素,SynchronousQueue 中的結點還需要附著對應的操作線程,這些線程在對應的結點上等待被匹配(fulfill)。

SynchronousQueue 實現自 BlockingQueue 接口,底層基于 LockSupport 工具類 實現線程的阻塞和喚醒操作,并依賴 CAS 保證線程安全。在構造 SynchronousQueue 對象時,允許通過參數指定是否啟用公平模式。SynchronousQueue 基于 Dual Stack 數據結構實現非公平的線程通信,基于 Dual Queue 數據結構實現公平的線程通信。SynchronousQueue 的公平模式因為減少了線程之間的沖突,在競爭頻繁的場景下反而具備更高的性能,而非公平模式能夠更好的維持線程局部性(thread locality),減少線程上下文切換的開銷。

SynchronousQueue 示例

本小節我們以“生產者-消費者”示例演示 SynchronousQueue 的基本使用,在示例中我們設置了一個生產者和兩個消費者,以展示 SynchronousQueue 公平性特征。示例實現如下(省略了異常處理):

private static BlockingQueue<Integer> queue = new SynchronousQueue<>(true);

private static class Producer implements Runnable {

    @Override
    public void run() {
        int count = 0;
        while (count < 10) {
            int val = count++;
            System.out.println("Producer produce: " + val);
            queue.put(val);
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

private static class Consumer implements Runnable {

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("Consumer " + Thread.currentThread().getName() + " consume: " + queue.take());
        }
    }
}

public static void main(String[] args) {
    Thread producer = new Thread(new Producer());
    Thread consumer1 = new Thread(new Consumer());
    Thread consumer2 = new Thread(new Consumer());
    consumer1.setName("A");
    consumer2.setName("B");

    producer.start();
    consumer1.start();
    consumer2.start();
}

運行輸出如下:

Producer produce: 0
Consumer A consume: 0
Producer produce: 1
Consumer A consume: 1
Producer produce: 2
Consumer B consume: 2
Producer produce: 3
Consumer A consume: 3
Producer produce: 4
Consumer B consume: 4
Producer produce: 5
Consumer A consume: 5
Producer produce: 6
Consumer B consume: 6
Producer produce: 7
Consumer A consume: 7
Producer produce: 8
Consumer B consume: 8
Producer produce: 9
Consumer A consume: 9

可以看到,當生產者往 SynchronousQueue 中插入一個元素之后,生產者線程會等待消費者完成消費,而消費者線程在完成消費之后會等待生產者生產。SynchronousQueue 的公平性特性盡可能保證了消費者 A 和 B 能夠交替執行消費操作。

在上述示例中,如果我們將 Producer 入隊列的方法由 put 改為 offer,那么在 Consumer 入隊列成功之前,Producer 始終不能入隊列成功,這對于一般的隊列而言顯得有些奇怪。實際上,這里說的不能成功入隊列不夠準確,要知道 offer 是一類帶有超時機制的方法,也就是說當 Producer 在將某個元素執行入隊列之后,它希望有一個 Consumer 能夠在自己期望的時間內與該元素進行匹配,否則就只能返回 false,從表象上來看就是沒有入隊列成功。實際應用中我們需要考慮此類表象是否符合自己的業務場景,如果不滿足則可以考慮使用 put 方法執行入隊列操作。

核心方法實現

SynchronousQueue 實現自 BlockingQueue 接口,但并未對接口中聲明的方法全部支持,例如 SynchronousQueue 的 SynchronousQueue#peek 方法就始終返回 null,在使用時推薦先閱讀 API 文檔,避免影響程序的正確性。本文主要分析 SynchronousQueue 的實現機制,所以下面重點來看一下 SynchronousQueue 已實現的出隊列和入隊列操作。

前面我們提及到 SynchronousQueue 內部基于 Dual Stack 和 Dual Queue 數據結構實現,在 SynchronousQueue 中定義了一個 Transferer 抽象類,該類抽象了 Dual Stack 和 Dual Queue 數據結構的實現,定義如下:

abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}

SynchronousQueue 的出隊列和入隊列操作均委托給 Transferer#transfer 方法執行(如下),該方法接收 3 個參數,其中參數 e 表示待添加到隊列中的元素值,對于出隊列操作來說,e 始終等于 null;參數 timed 用于設置當前操作是否具備超時策略,如果是則需要使用參數 nanos 參數指定超時時間。

  • SynchronousQueue#put(E e) -> transferer.transfer(e, false, 0)

  • SynchronousQueue#offer(E) -> transferer.transfer(e, true, 0)

  • SynchronousQueue#offer(E, long, TimeUnit) -> transferer.transfer(e, true, unit.toNanos(timeout))

  • SynchronousQueue#take -> transferer.transfer(null, false, 0)

  • SynchronousQueue#poll() -> transferer.transfer(null, true, 0)

  • SynchronousQueue#poll(long, TimeUnit) -> transferer.transfer(null, true, unit.toNanos(timeout))

針對 Dual Stack 和 Dual Queue 數據結構,SynchronousQueue 分別定義了 TransferStack 和 TransferQueue 實現類,下面的小節將針對這兩個類的實現機制展開分析。

在開始之前,我們先對 匹配 一詞在 SynchronousQueue 中的含義進行解釋,在下面的章節中將多次提及匹配的概念。我們大致已經了解到 SynchronousQueue 在內部基于棧或隊列實現線程間的交互,以“生產者-消費者”為例,如果使用的是棧結構(隊列亦如此),當生產者往 SynchronousQueue 中插入一個元素時,該生產者線程在插入成功之后并不會立即返回,而是等待消費者前來消費。當消費者執行消費時發現棧上正好有生產者在等待,于是執行消費邏輯,也稱為開始執行匹配(fulfill)進程,將當前消費者與生產者匹配成一對兒紛紛出棧。

Dual Stack

針對 Dual Stack 數據結構,SynchronousQueue 實現了 TransferStack 類。TransferStack 繼承自 Transferer 抽象類,并定義了 SNode 類描述棧上的結點。針對結點的運行模式,TransferStack 定義了 3 個 int 類型的常量字段予以描述,如下:

  1. REQUEST:標識未匹配的消費者結點。

  2. DATA:標識未匹配的生產者結點。

  3. FULFILLING:標識結點正在執行匹配操作。

棧在運行期間要么為空,要么存放著一個或多個未匹配的消費者結點或生產者結點,對應的消費者或生產者線程依附在具體的結點上等待。一個棧上不可能同時共存未匹配的消費者結點和未匹配的生產者結點,也就是說同一時間棧上所有結點的運行模式(即 SNode#mode 字段值)都應該是一致的,除了棧頂結點可能會因為正在執行匹配進程而附加 FULFILLING 狀態。

SNode 類的字段定義如下:

static final class SNode {
    /** 后繼指針 */
    volatile SNode next;        // next node in stack
    /** 記錄匹配的結點,如果當前結點被取消,則指向自己 */
    volatile SNode match;       // the node matched to this
    /** 在當前結點上等待的線程對象 */
    volatile Thread waiter;     // to control park/unpark
    /** 結點元素值,如果是消費者結點則為 null */
    Object item;                // data; or null for REQUESTs
    /**
     * 結點運行模式:
     * - 0:代表消費者結點
     * - 1:代表生產者結點
     * - (2 | 0) or (2 | 1):代表結點正在或已被匹配
     */
    int mode;

    // ... 省略方法實現

}

各字段的含義如代碼注釋,我們將在下面分析 TransferStack#transfer 方法實現時一并分析 SNode 中定義的方法,并對各個字段的含義結合具體場景做進一步介紹。

前面在介紹 Transferer 抽象類時,我們知道該抽象類僅聲明了一個方法,即 Transferer#transfer 方法,該方法也是整個 SynchronousQueue 中最核心的實現。在開始分析 TransferStack 之于該方法的實現之前,我們先從整體出發,感知一下 TransferStack 的運行流程。

以“生產者-消費者”為例,假設當前有 3 個生產者依次執行往 SynchronousQueue 中插入元素,執行的順序為 1 -> 2 -> 3,則入棧之后得到的棧結構如下:

 3 -> 2 -> 1 -> null
 ↓
head

入棧后的 3 個生產者線程將在棧對應結點上等待。如果來了一個消費者執行出隊列操作,此時消費者將與 head 結點上的生產者進行匹配,匹配成功之后得到的棧結構如下:

 2 -> 1 -> null
 ↓
head

此時剩下的生產者線程將繼續等待,期間可以允許新的消費者出隊列,也可以允許新的生產者入隊列。

上述過程就是 TransferStack#transfer 方法的核心執行邏輯,對此有了一個大概的感知之后,下面來深入分析 TransferStack#transfer 方法的具體實現。實際上在 TransferStack#transfer 方法的開頭,作者已經對整個方法的運行流程給出了直觀的概括,摘錄如下:

  1. If apparently empty or already containing nodes of same mode, try to push node on stack and wait for a match, returning it, or null if cancelled.

  2. If apparently containing node of complementary mode, try to push a fulfilling node on to stack, match with corresponding waiting node, pop both from stack, and return matched item. The matching or unlinking might not actually be necessary because of other threads performing action 3:

  3. If top of stack already holds another fulfilling node, help it out by doing its match and/or pop operations, and then continue. The code for helping is essentially the same as for fulfilling, except that it doesn't return the item.

方法 TransferStack#transfer 實現如下:

E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed

    // 操作模式判定,如果為 null 則說明當前是出隊列操作,否則說明是入隊列操作
    int mode = (e == null) ? REQUEST : DATA;

    for (; ; ) {
        SNode h = head;
        // 1. 如果棧為空,或者包含相同模式的結點,將結點入棧等待匹配
        if (h == null || h.mode == mode) {   // empty or same-mode
            // 如果設置超時且到期
            if (timed && nanos <= 0) {       // can't wait
                // 如果 head 結點被取消,則后移 head 指針
                if (h != null && h.isCancelled()) {
                    this.casHead(h, h.next); // pop cancelled node
                } else {
                    // 否則返回 null
                    return null;
                }
            }
            // 否則,說明當前線程需要在棧上等待,先創建一個結點入棧,之后對應的線程會在該結點上等待
            else if (this.casHead(h, s = snode(s, e, h, mode))) {
                // 等待結點被匹配或取消,返回的是與當前結點匹配的結點,或者結點自己(即結點被取消)
                SNode m = this.awaitFulfill(s, timed, nanos);
                // 如果返回的是結點自己,則說明是被取消了
                if (m == s) {               // wait was cancelled
                    // 清理無效結點
                    this.clean(s);
                    return null;
                }

                /* 當前結點被匹配了 */

                // 與 s 匹配的結點就是 head 結點,將 s 和 m 出棧,這里只是一個優化,不影響程序執行的正確性
                if ((h = head) != null && h.next == s) {
                    this.casHead(h, s.next); // help s's fulfiller
                }

                // 如果是出隊列則返回匹配結點的元素值,如果是入隊列則返回新添加的結點元素值
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        }
        // 2. 棧中包含互補模式的結點,且 head 結點不處于 FULFILLING 狀態,執行匹配操作
        else if (!isFulfilling(h.mode)) {  // try to fulfill
            // 頭結點已經被取消,則后移 head 指針后重試
            if (h.isCancelled()) {         // already cancelled
                this.casHead(h, h.next);   // pop and retry
            }
            // 入隊一個帶有 FULFILLING 標志的新結點 s,同一時間棧中最多只有一個帶有 FULFILLING 標志的結點,且該結點一定是 head 結點
            else if (this.casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
                for (; ; ) {               // loop until matched or waiters disappear
                    // 獲取本次與 s 結點執行匹配的結點,也就是 s 的 next 結點
                    SNode m = s.next;      // m is s's match
                    // 如果待匹配的結點為 null,說明已經被其它線程取消
                    if (m == null) {       // all waiters are gone
                        // 將結點 s 出隊列,并退出循環
                        this.casHead(s, null);   // pop fulfill node
                        s = null;                // use new node next time
                        break;                   // restart main loop
                    }
                    // 如果待匹配的結點不為 null,則嘗試執行匹配
                    SNode mn = m.next;
                    if (m.tryMatch(s)) { // 嘗試將結點 m 的 match 指針指向結點 s
                        // 匹配成功,修改頭結點為已匹配結點 m 的 next 結點
                        this.casHead(s, mn);     // pop both s and m
                        // 如果是出隊列則返回已匹配結點的元素值,如果是入隊列則返回新添加的結點元素值
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else {                // lost match
                        // 匹配失敗,說明結點 m 被取消,繼續嘗試匹配 m 的 next 結點
                        s.casNext(m, mn);   // help unlink
                    }
                }
            }
        }
        // 3. 棧中包含互補模式的結點,且 head 結點處于 FULFILLING 狀態
        else {                            // help a fulfiller
            SNode m = h.next;             // m is h's match
            if (m == null) {              // waiter is gone
                this.casHead(h, null);    // pop fulfilling node
            } else {
                SNode mn = m.next;
                if (m.tryMatch(h)) {      // help match
                    this.casHead(h, mn);  // pop both h and m
                } else {                  // lost match
                    h.casNext(m, mn);     // help unlink
                }
            }
        }
    }
}

上述實現中 for 循環內部的 if ... else if ... else 控制結構分別對應作者給出的 3 段注釋(已在代碼中標出),其中場景 3 主要是對場景 2 的輔助,下面重點分析場景 1 和場景 2 的實現和執行流程。

首先來看一下 場景 1 ,此時棧為空,或者棧中等待的線程運行模式與當前線程的運行模式相同,此時需要將結點入棧,并讓當前線程在結點上等待。執行流程可以概括為:

  1. 如果設置了超時且已經到期,則順帶判斷 head 結點是否被取消,如果是則后移 head 指針并進入下一輪循環,否則返回 null;

  2. 否則新建一個包含待添加元素 e 的結點入棧,并執行 TransferStack#awaitFulfill 方法讓當前線程在該結點上等待匹配(或被取消);

  3. 如果在等待期間被取消,則清理棧上的無效結點,并返回 null;

  4. 否則說明結點被成功匹配,如果當前線程是消費者線程則返回匹配結點的元素值,如果當前線程是生產者線程則返回剛剛添加的元素值。

下面利用圖示演示上述執行流程。假設當前操作線程是一個生產者,期望將元素 3 插入到 SynchronousQueue 中,并且當前棧中已經包含兩個處于等待狀態的生產者(如下圖 1 所示)。因為當前線程與棧中等待的線程模式相同(均為 DATA),所以新建一個元素值為 3 的結點入棧(如下圖 2 所示),并讓當前線程在結點上等待。

JUC的SynchronousQueue如何使用

繼續分析讓線程等待的 TransferStack#awaitFulfill 方法,線程會阻塞(或自旋)在該方法上等待被匹配,實現如下:

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // 如果設置了超時,則計算到期時間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 計算自旋的次數
    int spins = (this.shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (; ; ) {
        // 從阻塞中醒來,先檢查期間是否被中斷
        if (w.isInterrupted()) {
            // 如果被中斷,則將結點的 match 指針指向自己,表示結點被取消
            s.tryCancel();
        }

        // 獲取與當前結點匹配的結點,要么是結點自己,要么就是某個與之匹配的結點,只要不為 null 就返回
        SNode m = s.match;
        if (m != null) {
            return m;
        }

        /* 結點未被匹配或取消 */

        // 如果設置了超時且已經到期,則取消結點
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }

        // 在阻塞之前先自旋幾次,如果 Producer 和 Consumer 之間交互頻繁,自旋相對于阻塞性能更高
        if (spins > 0) {
            spins = this.shouldSpin(s) ? (spins - 1) : 0;
        }
        // 如果結點的 waiter 為空,則設置為當前線程對象
        else if (s.waiter == null) {
            s.waiter = w; // establish waiter so can park next iter
        }
        // 未設置超時,則無限期等待
        else if (!timed) {
            LockSupport.park(this);
        }
        // 設置了超時,則超時等待
        else if (nanos > spinForTimeoutThreshold) {
            LockSupport.parkNanos(this, nanos);
        }
    }
}

上述方法首先會依據是否設置超時來計算剩余的到期時間和自旋次數,然后執行:

  1. 判斷等待期間是否被中斷,如果是則取消當前結點,即將結點的 match 指針指向自己;

  2. 判斷結點的 match 指針是否指向 null,只要不為 null 就說明當前結點被成功匹配或取消(此時 match 指針指向結點自己),返回 match 指針指向的結點;

  3. 否則,說明結點未被匹配或取消,如果設置了超時且已經到期,則取消當前結點,并在下一輪循環中返回;

  4. 在進入阻塞之前,先嘗試自旋幾次;

  5. 如果自旋幾次之后仍然未完成匹配則阻塞等待,依據是否設置超時來決定是無限期等待還是超時等待,并在等待之前判斷當前結點上是否有綁定線程,如果未綁定則將當前線程綁定到該結點上。

由上述實現可以看到,等待的線程并沒有立即阻塞,而是先嘗試自旋了幾次,這主要是考慮生產者和消費者頻繁交互的情況。這類場景下當生產者執行入隊列操作之后馬上會有消費者前來執行出隊列,此時生產者線程無需被阻塞,只需要自旋幾次即被匹配成功,從而避免線程阻塞和喚醒所帶來的性能開銷。如果生產者和消費者交互并不頻繁,因為自旋的次數并不多,所以不會造成太多的 CPU 開銷,幾乎可以忽略。

如果結點在等待期間被取消,則上述方法會將結點的 match 指針指向自己,后續流程會基于該特征識別被取消的結點,并調用 TransferStack#clean 方法執行清理工作,該方法實現如下:

void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread

    // 尋找 s 的最近一個后繼有效(未被取消)結點,作為本次遍歷的哨兵(sentinel)結點
    SNode past = s.next;
    if (past != null && past.isCancelled()) {
        past = past.next;
    }

    // 從頭開始遍歷,將 head 指針指向第一個有效(未被取消)結點
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled()) {
        this.casHead(p, p.next);
    }

    // 從當前有效的頭結點開始遍歷,直到遇到哨兵結點,移除期間遇到的無效結點
    while (p != null && p != past) {
        SNode n = p.next;
        if (n != null && n.isCancelled()) {
            p.casNext(n, n.next);
        } else {
            p = n;
        }
    }
}

清理的過程首先會確立一個哨兵(sentinel)結點,該結點是位于結點 s 之后最近一個有效(未被取消)的結點,然后從棧頂開始遍歷清除那些已經被取消的結點。至于為什么需要設置一個哨兵結點,考慮在并發場景下結點 s 可能已經被其它線程移除,設置哨兵結點能夠避免對整個棧進行遍歷。

接著來看一下 場景 2 ,此時棧中正在等待的線程運行模式與當前線程互補(可以簡單理解為棧中等待的線程都是生產者,而當前線程是消費者),并且此時沒有線程正在執行匹配操作,所以進入匹配進程。本次與當前線程匹配的是 head 結點上的線程,所以首先需要從上至下在棧上找到第一個有效(未被取消)的 head 結點,然后執行:

  1. 創建一個結點元素為 e,附加 FULFILLING 標志的結點 s,并將結點入棧;

  2. 獲取本次待與 s 匹配的結點 m,如果 m 為 null 則說明棧上已經沒有處于等待的結點,需要退出匹配進程并繼續判定接下去進入哪個場景;

  3. 否則,調用 SNode#tryMatch 方法執行匹配操作;

  4. 如果匹配成功則后移 head 指針,并返回(如果當前線程是消費者線程則返回匹配結點的元素值,如果當前線程是生產者線程則返回剛剛添加的元素值);

  5. 如果匹配失敗,說明結點 m 已經被取消,嘗試繼續匹配 m 的后繼結點。

下面利用圖示演示上述執行流程。如下圖 1 所示,假設當前操作線程是一個消費者(圖中黃色結點),期望對 SynchronousQueue 執行出隊列操作,并且當前棧中已經包含兩個處于等待狀態的生產者(圖中青色結點)。因為當前線程與棧中等待的線程模式互補,所以新建一個元素值為 null 的結點入棧(如下圖 2 所示),并附加 FULFILLING 標志(圖中紅色結點)。

JUC的SynchronousQueue如何使用

然后開始執行匹配進程,設置 m 和 mn 指針,如上圖 3 所示。在成功執行完 SNode#tryMatch 方法之后會將結點 m 的 match 指針指向結點 s,表示結點 m 和 s 匹配成功,如上圖 4 所示。

繼續來分析一下執行匹配進程的 SNode#tryMatch 方法,實現如下:

boolean tryMatch(SNode s) {
    // 基于 CAS 將當前結點的 match 字段設置為 s 結點
    if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
        Thread w = waiter;
        if (w != null) {    // waiters need at most one unpark
            waiter = null;
            // 喚醒阻塞在當前結點上的線程
            LockSupport.unpark(w);
        }
        return true;
    }
    return match == s;
}

匹配的過程核心在于將待匹配結點的 match 指針指向當前操作線程對應的結點。

關于 Dual Stack 的運行機制就介紹這么多,受棧 FILO 特性的約束,基于 Dual Stack 的 SynchronousQueue 始終在棧頂執行入隊列和出隊列操作,后入隊的線程會先被匹配,這也解釋了為什么基于 Dual Stack 的 SynchronousQueue 是非公平的。基于 Dual Stack 的 SynchronousQueue 潛在的一個問題是可能導致先入隊的線程長期得不到匹配而饑餓,而優點在于能夠更好的維持線程局部性(thread locality),減少線程上下文切換的開銷。

Dual Queue

針對 Dual Queue 數據結構,SynchronousQueue 實現了 TransferQueue 類,TransferQueue 同樣繼承自 Transferer 抽象類,并定義了 QNode 類描述隊列上的結點。TransferQueue 定義了 TransferQueue#headTransferQueue#tail 指針字段,分別指向隊列的頭結點和尾結點。

QNode 類的字段定義如下:

static final class QNode {
    /** 后繼指針 */
    volatile QNode next;          // next node in queue
    /** 結點元素值,如果等于結點自己則說明被取消 */
    volatile Object item;         // CAS'ed to or from null
    /** 在當前結點上等待的線程對象 */
    volatile Thread waiter;       // to control park/unpark
    /** 標識當前是消費者結點,還是生產者結點 */
    final boolean isData;

    // ... 省略方法定義

}

各字段的含義如代碼注釋,其中 QNode#isData 字段用于標識對應結點是生產者結點還是消費者結點。不同于 TransferStack 的 SNode 需要使用 SNode#mode 字段描述結點是未匹配的生產者、未匹配的消費者,或者是正在匹配中等狀態,TransferQueue 因為出隊列和入隊列分別在 head 和 tail 結點上執行,所以無需定義專門的字段描述結點的運行模式。我們將在下面分析 TransferQueue#transfer 方法實現時一并分析 QNode 中定義的方法,并對各個字段的含義結合具體場景做進一步介紹。

在開始分析 TransferQueue 之于 Transferer#transfer 方法的實現之前,我們還是先從整體出發,感知一下 TransferQueue 的運行流程。同樣以“生產者-消費者”為例,假設當前有 3 個生產者依次執行往 SynchronousQueue 中插入元素,執行的順序為 1 -> 2 -> 3,則入隊列之后得到的隊列結構如下:

 1 -> 2 -> 3 -> null
 ↓         ↓
head      tail

入隊列后的 3 個生產者線程將在隊列對應結點上等待。如果來了一個消費者執行出隊列操作,此時消費者將與 head 結點上的生產者進行匹配,匹配成功之后得到的隊列結構如下:

 2 -> 3 -> null
 ↓    ↓
head tail

此時剩下的生產者線程將繼續等待,期間可以允許新的消費者出隊列,也可以允許新的生產者入隊列。

上述過程就是 TransferQueue#transfer 方法的核心執行邏輯,對此有了一個大概的感知之后,下面來深入分析 TransferQueue#transfer 方法的具體實現。實際上在 TransferQueue#transfer 方法的開頭,作者同樣已經對整個方法的運行流程給出了直觀的概括,摘錄如下:

  1. If queue apparently empty or holding same-mode nodes, try to add node to queue of waiters, wait to be fulfilled (or cancelled) and return matching item.

  2. If queue apparently contains waiting items, and this call is of complementary mode, try to fulfill by CAS'ing item field of waiting node and dequeuing it, and then returning matching item.

方法 TransferQueue#transfer 實現如下:

E transfer(E e, boolean timed, long nanos) {
    QNode s = null;                         // constructed/reused as needed
    // 標識當前是生產模式還是消費模式
    boolean isData = (e != null);

    for (; ; ) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null) {       // saw uninitialized value
            continue;                       // spin
        }

        // 1. 隊列為空,或者包含相同模式的結點,將結點入隊列等待匹配
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            // 期間有其它線程入隊列,進入下一輪循環重新獲取 tail
            if (t != tail) {                // inconsistent read
                continue;
            }
            // t 不是隊列尾結點,嘗試后移 tail 指針
            if (tn != null) {               // lagging tail
                this.advanceTail(t, tn);
                continue;
            }
            // 設置超時且已到期,則返回 null
            if (timed && nanos <= 0) {      // can't wait
                return null;
            }
            if (s == null) {
                s = new QNode(e, isData);
            }
            // 基于 CAS 將結點 s 添加到隊列末端
            if (!t.casNext(null, s)) {      // failed to link in
                // 添加失敗則重試
                continue;
            }

            /* 將 s 結點入隊列成功 */

            // 后移 tail 指針
            this.advanceTail(t, s);         // swing tail and wait
            // 等待結點被匹配或取消,返回已匹配的結點元素值,或者結點自己(表示已取消)
            Object x = this.awaitFulfill(s, e, timed, nanos);
            // 結點已經被取消
            if (x == s) {                   // wait was cancelled
                // 執行清理工作
                this.clean(t, s);
                return null;
            }

            /* 結點被匹配 */

            // 結點 s 的 next 指針未指向自己,表示結點 s 未出隊列
            if (!s.isOffList()) {           // not already unlinked
                // t 當前是 s 的前驅結點,也是當前的 head 結點,
                // 因為 s 已經匹配,說明 s 前面的結點都已經被匹配
                this.advanceHead(t, s);     // unlink if head
                // 將 s 的 item 指向自己,說明結點被取消
                if (x != null) {            // and forget fields
                    s.item = s;
                }
                s.waiter = null;
            }
            // 如果是出隊列則返回匹配結點的元素值,如果是入隊列則返回新添加的結點元素值
            return (x != null) ? (E) x : e;
        }
        // 2. 隊列中存在互補模式的結點
        else {                             // complementary-mode
            QNode m = h.next;              // node to fulfill
            // 期間有入隊或出隊操作,或者待匹配的結點 m 為 null,則進入下一輪循環
            if (t != tail || m == null || h != head) {
                continue;                  // inconsistent read
            }

            // 獲取結點 m 的元素值
            Object x = m.item;
            // 如果 isData = true,則說明當前操作線程為生產者,期望 m 為消費者,即 x == null
            // 如果 isData = false,則說明當前操作線程為消費者,期望 m 為生產者,即 x != null
            if (isData == (x != null)      // m already fulfilled
                    // 結點 m 已經被取消
                    || x == m              // m cancelled
                    // 嘗試修改結點 m 的元素值為 e 失敗
                    || !m.casItem(x, e)) { // lost CAS
                // 結點 m 已經被匹配,或被取消,或已經被其它線程匹配,則后移 head 指針繼續
                this.advanceHead(h, m);    // dequeue and retry
                continue;
            }

            // 匹配成功,后移 head 指針
            this.advanceHead(h, m);        // successfully fulfilled
            // 喚醒阻塞在匹配結點 m 上的線程
            LockSupport.unpark(m.waiter);
            // 如果是出隊列則返回匹配結點的元素值,如果是入隊列則返回新添加的結點元素值
            return (x != null) ? (E) x : e;
        }
    }
}

上述實現中 for 循環內部的 if ... else 控制結構分別對應作者給出的 2 段注釋(已在代碼中標出),在 for 循環的一開始會判斷 head 或 tail 指針是否為 null,但是在 SynchronousQueue 運行期間正常是不會出現 head 或 tail 指針為 null 的情況,作者在注釋中給出的解釋如下:

The loop starts off with a null check guarding against seeing uninitialized head or tail values. This never happens in current SynchronousQueue, but could if callers held non-volatile/final ref to the transferer. The check is here anyway because it places null checks at top of loop, which is usually faster than having them implicitly interspersed.

下面展開分析場景 1 和場景 2 的實現和執行流程。首先來看一下 場景 1 ,此時隊列為空,或者隊列中等待的線程運行模式與當前線程的運行模式相同,此時需要將結點入隊列,并讓當前線程在結點上等待。執行流程可以概括為:

  1. 因為要入隊列操作,所以要保證 tail 指向隊列真正的尾結點;

  2. 如果設置了超時且已到期,則返回 null;

  3. 否則新建一個包含待添加元素 e 的結點入隊列,如果失敗進入下一輪循環重試,否則后移 tail 指針并調用 TransferQueue#awaitFulfill 方法讓當前線程在該結點上等待匹配(或被取消);

  4. 如果在等待期間被取消,則清理隊列上的無效結點,并返回 null;

  5. 否則說明結點被成功匹配,更新 head 指針,如果當前線程是消費者線程則返回匹配結點的元素值,如果當前線程是生產者線程則返回剛剛添加的元素值。

下面利用圖示演示上述執行流程。假設當前操作線程是一個生產者,期望將元素 3 插入到 SynchronousQueue 中,并且當前棧中已經包含了兩個處于等待狀態的生產者(如下圖 1 所示)。因為當前線程與隊列中等待的線程模式相同(即 isData=true),所以新建一個元素值為 3 的結點入隊列(如下圖 2 所示),并讓當前線程在結點上等待。

JUC的SynchronousQueue如何使用

TransferQueue 實現的讓線程等待的方法 TransferQueue#awaitFulfill 與 TransferStack 中實現的 TransferStack#awaitFulfill 方法在設計和實現思路上相同,這里不再重復介紹。下面來分析一下執行清理工作的 TransferQueue#clean 方法,實現如下(其中 s 是待清理的結點,pred 是 s 的前驅結點):

void clean(QNode pred, QNode s) {
    s.waiter = null; // forget thread
    while (pred.next == s) { // Return early if already unlinked
        QNode h = head;
        QNode hn = h.next;   // Absorb cancelled first node as head
        // 從頭開始,將 head 指針指向有效(未被取消)的頭結點
        if (hn != null && hn.isCancelled()) {
            this.advanceHead(h, hn);
            continue;
        }
        QNode t = tail;      // Ensure consistent read for tail
        // 隊列已經為空,返回
        if (t == h) {
            return;
        }
        QNode tn = t.next;
        // t 不是最新的 tail 結點
        if (t != tail) {
            continue;
        }
        // tail 指針未指向最新的尾結點
        if (tn != null) {
            this.advanceTail(t, tn);
            continue;
        }
        // 如果待刪除的 s 結點不是 tail 結點,直接清理
        if (s != t) {        // If not tail, try to unsplice
            QNode sn = s.next;
            if (sn == s || pred.casNext(s, sn)) {
                return;
            }
        }

        /* 當前待刪除的 s 結點是 tail 結點 */

        QNode dp = cleanMe;
        // cleanMe 非空,此時該 cleanMe 的 next 指針一定不是 tail 結點,清理 cleanMe.next
        if (dp != null) {      // Try unlinking previous cancelled node
            QNode d = dp.next; // d 是需要清理的結點
            QNode dn;          // d 的 next 結點
            if (d == null      // d is gone or
                    || d == dp // d is off list or
                    || !d.isCancelled() // d not cancelled or
                    || (d != t // d not tail and
                    && (dn = d.next) != null  //   has successor
                    && dn != d // that is on list
                    && dp.casNext(d, dn))) { // d unspliced
                // 將 cleanMe 設置為 null
                this.casCleanMe(dp, null);
            }
            if (dp == pred) {
                return;      // s is already saved node
            }
        }
        // cleanMe 為空,需要將 s 結點的 pred 標記為 cleanMe,以后再清理 s 結點
        else if (this.casCleanMe(null, pred)) {
            return;          // Postpone cleaning s
        }
    }
}

如果待刪除結點 s 不是 tail 結點,則只需要簡單移除 s 即可,否則暫時不能移除 s 結點,會導致 tail 為 null,影響后續入隊列操作。針對這種場景,作者設計了一個 cleanMe 結點,該結點的 next 指針指向需要被移除的 s 結點(此時 s 為 tail 結點),當結點 s 后續不再是 tail 結點時,延后刪除。

接著來看一下 場景 2 ,此時隊列中正在等待的線程運行模式與當前線程互補,所以進入匹配進程。本次與當前線程匹配的是 head 結點的后繼結點上的線程,所以首先需要從前往后在隊列上找到第一個有效(未被取消)的 head 后繼結點,然后執行:

  1. 獲取 head 結點的后繼結點 m;

  2. 如果結點 m 已經被匹配,或被取消,則后移 head 指針后進入下一輪循環重試;

  3. 否則,基于 CAS 嘗試將結點 m 的元素值替換為 e,如果失敗則說明結點 m 已經被其它線程匹配,繼續后移 head 指針后進入下一輪循環重試;

  4. 否則,說明匹配成功,后移 head 指針,并喚醒在匹配結點 m 上等待的線程,如果當前線程是消費者線程則返回匹配結點的元素值,如果當前線程是生產者線程則返回剛剛添加的元素值。

下面利用圖示演示上述執行流程。如下圖 1 所示,假設當前操作線程是一個消費者(圖中黃色結點),期望對 SynchronousQueue 執行出隊列操作,并且當前隊列中已經包含兩個處于等待狀態的生產者(圖中青色結點)。因為當前線程與隊列中等待的線程模式互補,所以獲取 head 結點的 next 結點 m 作為待匹配結點(如下圖 2 所示)。基于 CAS 嘗試將結點 m 的元素值修改為 null,如下圖 3 所示,然后后移 head 指針指向 m 結點,并喚醒在結點 m 上等待的線程,如下圖 4 所示。

JUC的SynchronousQueue如何使用

關于 Dual Queue 的運行機制就介紹這么多,受隊列 FIFO 特性的約束,基于 Dual Queue 的 SynchronousQueue 在隊頭執行出隊列操作,并在隊尾執行入隊列操作,先入隊的線程通常會先被匹配,這也解釋了為什么基于 Dual Queue 的 SynchronousQueue 是公平的。基于 Dual Queue 的 SynchronousQueue 因為入隊和出隊的沖突相對較小,所以在競爭頻繁的場景下相對于非公平模式反而具有更好的性能。

到此,相信大家對“JUC的SynchronousQueue如何使用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

昌宁县| 沂南县| 翼城县| 普兰店市| 文山县| 肥西县| 绵阳市| 和田市| 滨海县| 兰溪市| 玉环县| 石楼县| 鄂温| 永寿县| 舞钢市| 都昌县| 汉沽区| 昭通市| 芜湖市| 班玛县| 威远县| 淮北市| 大连市| 南涧| 黄石市| 革吉县| 临安市| 保靖县| 苗栗市| 吴江市| 南岸区| 遵义县| 宾川县| 寻甸| 澎湖县| 保定市| 乌海市| 武川县| 女性| 湘西| 鲁甸县|