中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

JUC的AQS隊列同步器怎么使用

發布時間:2021-12-21 10:20:37 來源:億速云 閱讀:93 作者:iii 欄目:大數據

本篇內容介紹了“JUC的AQS隊列同步器怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

AbstractQueuedSynchronizer 簡稱 AQS,可能我們幾乎不會直接去使用它,但它卻是 JUC 的核心基礎組件,支撐著 java 鎖和同步器的實現,例如 ReentrantLock、ReentrantReadWriteLock、CountDownLatch,以及 Semaphore 等。大神 Doug Lea 在設計 JUC 包時希望能夠抽象一個基礎且通用的組件以支撐上層模塊的實現,AQS 應運而生。

AQS 本質上是一個 FIFO 的雙向隊列,線程被包裝成結點的形式,基于自旋機制在隊列中等待獲取資源(這里的資源可以簡單理解為對象鎖)。AQS 在設計上實現了兩類隊列,即 同步隊列條件隊列 ,其中同步隊列服務于線程阻塞等待獲取資源,而條件隊列則服務于線程因某個條件不滿足而進入等待狀態。條件隊列中的線程實際上已經獲取到了資源,但是沒有能夠繼續執行下去的條件,所以被打入條件隊列并釋放持有的資源,以讓渡其它線程執行,如果未來某個時刻條件得以滿足,則該線程會被從條件隊列轉移到同步隊列,繼續參與競爭資源,以繼續向下執行。

本文我們主要分析 AQS 的設計與實現,包括 LockSupport 工具類、同步隊列、條件隊列,以及 AQS 資源獲取和釋放的通用過程。AQS 采用模板方法設計模式,具體獲取資源和釋放資源的過程都交由子類實現,對于這些方法的分析將留到后面分析具體子類的文章中再展開。

LockSupport 工具類

LockSupport 工具類是 JUC 的基礎組件,主要作用是用來阻塞和喚醒線程,底層依賴于 Unsafe 類實現。LockSupport 主要定義類 2 類方法:park 和 unpark,其中 park 方法用于阻塞當前線程,而 unpark 方法用于喚醒處于阻塞狀態的指定線程。

下面的示例演示了 park 和 unpark 方法的基本使用:

Thread thread = new Thread(() -> {
    System.out.println("Thread start: " + Thread.currentThread().getName());
    LockSupport.park(); // 阻塞自己
    System.out.println("Thread end: " + Thread.currentThread().getName());
});

thread.setName("A");
thread.start();

System.out.println("Main thread sleep 3 second: " + Thread.currentThread().getId());
TimeUnit.SECONDS.sleep(3);
LockSupport.unpark(thread); // 喚醒線程 A

線程 A 在啟動之后調用了 LockSupport#park 方法將自己阻塞,主線程在休息 3 秒之后調用 LockSupport#unpark 方法線程 A 喚醒。運行結果:

Thread start: A
Main thread sleep 3 second: 1
Thread end: A

LockSupport 針對 park 方法提供了多種實現,如下:

public static void park()
public static void park(Object blocker)
public static void parkNanos(long nanos)
public static void parkNanos(Object blocker, long nanos)
public static void parkUntil(long deadline)
public static void parkUntil(Object blocker, long deadline)

由方法命名不難看出,parkNanos 和 parkUntil 都屬于 park 方法的超時版本,區別在于 parkNanos 方法接收一個納秒單位的時間值,用于指定阻塞的時間長度,例如當設置 nanos=3000000000 時,線程將阻塞 3 秒后蘇醒,而 parkUntil 方法則接收一個時間戳,參數 deadline 用于指定阻塞的到期時間。

所有的 park 方法都提供了包含 Object blocker 參數的重載版本,參數 blocker 指代導致當前線程阻塞等待的鎖對象,方便問題排查和系統監控,而在 LockSupport 最開始被設計時卻忽視了這一點,導致在線程 dump 時無法提供阻塞對象的相關信息,這一點在 java 6 中得以改進。實際開發中如果使用到了 LockSupport 工具類,推薦使用帶 blocker 參數的版本。

下面以 LockSupport#park(java.lang.Object) 方法為例來看一下具體的實現,如下:

public static void park(Object blocker) {
    // 獲取當前線程對象
    Thread t = Thread.currentThread();
    // 記錄當前線程阻塞等待的鎖對象(設置線程對象的 parkBlocker 為參數指定的 blocker 對象)
    setBlocker(t, blocker);
    // 阻塞線程
    UNSAFE.park(false, 0L);
    // 線程恢復運行,清除 parkBlocker 參數記錄的鎖對象
    setBlocker(t, null);
}

具體實現比較簡單,阻塞線程的操作依賴于 Unsafe 類實現。上述方法會調用 LockSupport#setBlocker 方法基于 Unsafe 類將參數指定的 blocker 對象記錄到當前線程對象的 Thread#parkBlocker 字段中,然后進入阻塞狀態,并在被喚醒之后清空對應的 Thread#parkBlocker 字段。

當一個線程調用 park 方法進入阻塞狀態之后,會在滿足以下 3 個條件之一時從阻塞狀態中蘇醒:

  1. 其它線程調用 unpark 方法喚醒當前線程。

  2. 其它線程中斷了當前線程的阻塞狀態。

  3. 方法 park 因為一些不合邏輯的原因退出。

線程在從 park 方法中返回時并不會攜帶具體的返回原因,調用者需要自行檢測,例如再次檢查之前調用 park 方法的條件是否仍然滿足以予以推測。

方法 LockSupport#unpark 的實現同樣基于 Unsafe 類實現,不同于 park 的多版本實現,LockSupport 針對 unpark 方法僅提供了單一實現,如下:

public static void unpark(Thread thread) {
    if (thread != null) {
        UNSAFE.unpark(thread);
    }
}

需要注意的一點是,如果事先針對某個線程調用了 unpark 方法,則該線程繼續調用 park 方法并不會進入阻塞狀態,而是會立即返回,并且 park 方法是不可重入的。

同步隊列

同步隊列的作用在于管理競爭資源的線程,當一個線程競爭資源失敗會被記錄到同步隊列的末端,并以自旋的方式循環檢查能夠成功獲取到資源。AQS 的同步隊列基于 CLH(Craig, Landin, and Hagersten) 鎖思想進行設計和實現。CLH 鎖是一種基于鏈表的可擴展、高性能,且具備公平性的自旋鎖。線程以鏈表結點的形式進行組織,在等待期間相互獨立的執行自旋,并不斷輪詢前驅結點的狀態,如果發現前驅結點上的線程釋放了資源則嘗試獲取。

CLH 鎖是 AQS 隊列同步器實現的基礎,AQS 以內部類 Node 的形式定義了同步隊列結點,包括下一小節介紹的條件隊列,同樣以 Node 定義結點。Node 的字段定義如下:

static final class Node {

    /** 模式定義 */

    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    /** 線程狀態 */

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /** 線程等待狀態 */
    volatile int waitStatus;

    /** 前驅結點 */
    volatile Node prev;
    /** 后置結點 */
    volatile Node next;

    /** 持有的線程對象 */
    volatile Thread thread;

    /** 對于獨占模式而言,指向下一個處于 CONDITION 等待狀態的結點;對于共享模式而言,則為 SHARED 結點 */
    Node nextWaiter;

    // ... 省略方法定義
}

由上述字段定義可以看出,位于 CLH 鏈表中的線程以 2 種模式在等待資源,即 SHARED 和 EXCLUSIVE,其中 SHARED 表示共享模式,而 EXCLUSIVE 表示獨占模式。共享模式與獨占模式的主要區別在于,同一時刻獨占模式只能有一個線程獲取到資源,而共享模式在同一時刻可以有多個線程獲取到資源。典型的場景就是讀寫鎖,讀操作可以有多個線程同時獲取到讀鎖資源,而寫操作同一時刻只能有一個線程獲取到寫鎖資源,其它線程在嘗試獲取資源時都會被阻塞。

AQS 的 CLH 鎖為處于 CLH 鏈表中的線程定義了 4 種狀態,包括 CANCELLED、SIGNAL、CONDITION,以及 PROPAGATE,并以 Node#waitStatus 字段進行記錄。這 4 種狀態的含義分別為:

  • CANCELLED :表示當前線程處于取消狀態,一般是因為等待超時或者被中斷,處于取消狀態的線程不會再參與到競爭中,并一直保持該狀態。

  • SIGNAL:表示當前結點后繼結點上的線程正在等待被喚醒,如果當前線程釋放了持有的資源或者被取消,需要喚醒后繼結點上的線程。

  • CONDITION :表示當前線程正在等待某個條件,當某個線程在調用了 Condition#signal 方法后,當前結點將會被從條件隊列轉移到同步隊列中,參與競爭資源。

  • PROPAGATE :處于該狀態的線程在釋放共享資源,或接收到釋放共享資源的信號時需要通知后繼結點,以防止通知丟失。

一個結點在被創建時,字段 Node#waitStatus 的初始值為 0,表示結點上的線程不位于上述任何狀態。

Node 類在方法定義上除了基本的構造方法外,僅定義了 Node#isSharedNode#predecessor 兩個方法,其中前者用于返回當前結點是否以共享模式在等待,后者用于返回當前結點的前驅結點。

介紹完了隊列結點的定義,那么同步隊列具體如何實現呢?這還需要依賴于 AbstractQueuedSynchronizer 類中的兩個字段定義,即:

private transient volatile Node head;
private transient volatile Node tail;

其中 head 表示同步隊列的頭結點,而 tail 則表示同步隊列的尾結點,具體組織形式如下圖:

JUC的AQS隊列同步器怎么使用

當調用 AQS 的 acquire 方法獲取資源時,如果資源不足則當前線程會被封裝成 Node 結點添加到同步隊列的末端,頭結點 head 用于記錄當前正在持有資源的線程結點,而 head 的后繼結點就是下一個將要被調度的線程結點,當 release 方法被調用時,該結點上的線程將被喚醒,繼續獲取資源。

關于同步隊列結點入隊列、出隊列的實現先不展開,留到后面分析 AQS 資源獲取與釋放的過程時一并分析。

條件隊列

除了上面介紹的同步隊列,在 AQS 中還定義了一個條件隊列。內部類 ConditionObject 實現了條件隊列的組織形式,包含一個起始結點(firstWaiter)和一個末尾結點(lastWaiter),并同樣以上面介紹的 Node 類定義結點,如下:

public class ConditionObject implements Condition, Serializable {

        /** 指向條件隊列中的起始結點 */
        private transient Node firstWaiter;
        /** 指向條件隊列的末尾結點 */
        private transient Node lastWaiter;

        // ... 省略方法定義

}

前面在分析 Node 內部類的時候,可以看到 Node 類還定義了一個 Node#nextWaiter 字段,用于指向條件隊列中的下一個等待結點。由此我們可以描繪出條件隊列的組織形式如下:

JUC的AQS隊列同步器怎么使用

ConditionObject 類實現了 Condition 接口,該接口定義了與 Lock 鎖相關的線程通信方法,主要分為 await 和 signal 兩大類。

當線程調用 await 方法時,該線程會被包裝成結點添加到條件隊列的末端,并釋放持有的資源。當條件得以滿足時,方法 signal 可以將條件隊列中的一個或全部的線程結點從條件隊列轉移到同步隊列以參與競爭資源。應用可以創建多個 ConditionObject 對象,每個對象都對應一個條件隊列,對于同一個條件隊列而言,其中的線程所等待的條件是相同的。

Condition 接口的定義如下:

public interface Condition {

    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();
    void signalAll();
}
等待:await

下面來分析一下 ConditionObject 類針對 Condition 接口方法的實現,首先來看一下 ConditionObject#await 方法,該方法用于將當前線程添加到條件隊列中進行等待,同時支持響應中斷。方法實現如下:

public final void await() throws InterruptedException {
    if (Thread.interrupted()) {
        // 立即響應中斷
        throw new InterruptedException();
    }
    // 將當前線程添加到等待隊列末尾,等待狀態為 CONDITION
    Node node = this.addConditionWaiter();
    // 釋放當前線程持有的資源
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // 如果當前結點位于條件隊列中,則循環
        // 阻塞當前線程
        LockSupport.park(this);
        // 如果線程在阻塞期間被中斷,則退出循環
        if ((interruptMode = this.checkInterruptWhileWaiting(node)) != 0) {
            break;
        }
    }
    // 如果在同步隊列中等待期間被中斷,且之前的中斷狀態不為 THROW_IE
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
        interruptMode = REINTERRUPT;
    }
    if (node.nextWaiter != null) {
        // 清除條件隊列中所有狀態不為 CONDITION 的結點
        this.unlinkCancelledWaiters();
    }
    // 如果等待期間被中斷,則響應中斷
    if (interruptMode != 0) {
        this.reportInterruptAfterWait(interruptMode);
    }
}

因為 ConditionObject#await 方法支持響應中斷,所以在方法一開始會先檢查一下當前線程是否被中斷,如果是則拋出 InterruptedException 異常,否則繼續將當前線程加入到條件隊列中進行等待。整體執行流程可以概括為:

  1. 將當前線程加入到條件隊列末端,并設置等待狀態為 CONDITION;

  2. 釋放當前線程所持有的資源,避免饑餓或死鎖;

  3. 基于自旋機制在條件隊列中等待,直到被其它線程轉移到同步隊列,或者等待期間被中斷;

  4. 如果等待期間被中斷,則響應中斷。

ConditionObject 定義了兩種中斷響應方式,即:REINTERRUPTTHROW_IE。如果是 REINTERRUPT,則線程會調用 Thread#interrupt 方法中斷自己;如果是 THROW_IE,則線程會直接拋出 InterruptedException 異常。

下面繼續分析一下支撐 ConditionObject#await 運行的其它幾個方法,包括 addConditionWaiter、fullyRelease、isOnSyncQueue,以及 unlinkCancelledWaiters。

方法 ConditionObject#addConditionWaiter 用于將當前線程包裝成 Node 結點對象添加到條件隊列的末端,期間會執行清除條件隊列中處于取消狀態(等待狀態不為 CONDITION)的線程結點。方法實現如下:

private Node addConditionWaiter() {
    // 獲取條件隊列的末尾結點
    Node t = lastWaiter;
    // 如果末尾結點狀態不為 CONDITION,表示對應的線程已經取消了等待,需要執行清理操作
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 清除條件隊列中所有狀態不為 CONDITION 的結點
        this.unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 構建當前線程對應的 Node 結點,等待狀態為 CONDITION,并添加到條件隊列末尾
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null) {
        firstWaiter = node;
    } else {
        t.nextWaiter = node;
    }
    lastWaiter = node;
    return node;
}

將當前線程對象添加到條件隊列中的過程本質上是一個簡單的鏈表插入操作,在執行插入操作之前,上述方法會先對條件隊列執行一遍清理操作,清除那些狀態不為 CONDITION 的結點。具體實現位于 ConditionObject#unlinkCancelledWaiters 方法中:

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null; // 記錄上一個不被刪除的結點
    while (t != null) {
        Node next = t.nextWaiter;
        // 如果結點上的線程等待狀態不為 CONDITION,則刪除對應結點
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null) {
                firstWaiter = next;
            } else {
                trail.nextWaiter = next;
            }
            if (next == null) {
                lastWaiter = trail;
            }
        } else {
            trail = t;
        }
        t = next;
    }
}

方法 AbstractQueuedSynchronizer#fullyRelease 用于釋放當前線程持有的資源,這也是非常容易理解的,畢竟當前線程即將進入等待狀態,如果持有的資源不被釋放,將可能導致程序最終被餓死,或者死鎖。方法的實現如下:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 獲取當前線程的同步狀態,可以理解為持有的資源數量
        int savedState = this.getState();
        // 嘗試釋放當前線程持有的資源
        if (this.release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // 釋放資源失敗
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed) {
            // 如果釋放資源失敗,則取消當前線程
            node.waitStatus = Node.CANCELLED;
        }
    }
}

如果資源釋放失敗,則上述方法會將當前線程的狀態設置為 CANCELLED,以退出等待狀態。

方法 AbstractQueuedSynchronizer#isOnSyncQueue 用于檢測當前結點是否位于同步隊列中,方法實現如下:

final boolean isOnSyncQueue(Node node) {
    // 如果結點位于等待隊列,或是頭結點則返回 false
    if (node.waitStatus == Node.CONDITION || node.prev == null) {
        return false;
    }
    // If has successor, it must be on queue
    if (node.next != null) {
        return true;
    }

    /*
     * node.prev can be non-null, but not yet on queue because the CAS to place it on queue can fail.
     * So we have to traverse from tail to make sure it actually made it. It will always be near the tail in calls to this method,
     * and unless the CAS failed (which is unlikely), it will be there, so we hardly ever traverse much.
     */

    // 從后往前檢測目標結點是否位于同步隊列中
    return this.findNodeFromTail(node);
}

如果一個線程所等待的條件被滿足,則觸發條件滿足的線程會將等待該條件的一個或全部線程結點從條件隊列轉移到同步隊列,此時,這些線程將從 ConditionObject#await 方法中退出,以參與競爭資源。

方法 ConditionObject#awaitNanosConditionObject#awaitUntilConditionObject#await(long, TimeUnit) 在上面介紹的 ConditionObject#await 方法的基礎上引入了超時機制,當一個線程在條件隊列中等待的時間超過設定值時,線程結點將被從條件隊列轉移到同步隊列,參與競爭資源。其它執行過程與 ConditionObject#await 方法相同,故不再展開。

下面來分析一下 ConditionObject#awaitUninterruptibly 方法,由方法命名可以看出該方法相對于 ConditionObject#await 方法的區別在于在等待期間不響應中斷。方法實現如下:

public final void awaitUninterruptibly() {
    // 將當前線程添加到等待隊列末尾,等待狀態為 CONDITION
    Node node = this.addConditionWaiter();
    // 釋放當前線程持有的資源
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    // 如果當前結點位于條件隊列中,則循環
    while (!isOnSyncQueue(node)) {
        // 阻塞當前線程
        LockSupport.park(this);
        if (Thread.interrupted()) {
            // 標識線程等待期間被中斷,但不立即響應
            interrupted = true;
        }
    }
    // 自旋獲取資源,返回 true 則說明等待期間被中斷過
    if (acquireQueued(node, savedState) || interrupted) {
        // 響應中斷
        selfInterrupt();
    }
}

如果線程在等待期間被中斷,則上述方法會用一個字段進行記錄,并在最后集中處理,而不會因為中斷而退出等待狀態。

通知:signal

調用 await 方法會將線程對象自身加入到條件隊列中進行等待,而 signal 通知方法則用于將一個或全部的等待線程從條件隊列轉移到同步隊列,以參與競爭資源。ConditionObject 定義了兩個通知方法:signal 和 signalAll,前者用于將條件隊列的頭結點(也就是等待時間最長的結點)從條件隊列轉移到同步隊列,后者用于將條件隊列中所有處于等待狀態的結點從條件隊列轉移到同步隊列。下面分別來分析一下這兩個方法的實現。

方法 ConditionObject#signal 的實現如下:

public final void signal() {
    // 先檢測當前線程是否獲取到了鎖,否則不允許繼續執行
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    // 獲取條件隊列頭結點,即等待時間最長的結點
    Node first = firstWaiter;
    if (first != null) {
        // 將頭結點從條件隊列轉移到同步隊列,參與競爭資源
        this.doSignal(first);
    }
}

調用 ConditionObject#signal 方法的線程必須位于臨界區,也就是必須先持有獨占鎖,所以上述方法一開始會對這一條件進行校驗,方法 AbstractQueuedSynchronizer#isHeldExclusively 是一個模板方法,交由子類來實現。如果滿足執行條件,則上述方法會調用 ConditionObject#doSignal 方法將條件隊列的頭結點從條件隊列轉移到同步隊列。

private void doSignal(Node first) {
    // 從前往后遍歷,直到遇到第一個不為 null 的結點,并將其從條件隊列轉移到同步隊列
    do {
        if ((firstWaiter = first.nextWaiter) == null) {
            lastWaiter = null;
        }
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

// AbstractQueuedSynchronizer#transferForSignal
final boolean transferForSignal(Node node) {
    // 更新當前結點的等待狀態:CONDITION -> 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 更新失敗,說明對應的結點上的線程已經被取消
        return false;
    }

    /*
     * Splice onto queue and try to set waitStatus of predecessor to indicate that thread is (probably) waiting.
     * If cancelled or attempt to set waitStatus fails, wake up to resync (in which case the waitStatus can be transiently and harmlessly wrong).
     */

    // 將結點添加到同步隊列末端,并返回該結點的前驅結點
    Node p = this.enq(node);
    int ws = p.waitStatus;
    // 如果前驅結點被取消,或者設置前驅結點的狀態為 SIGNAL 失敗,則喚醒當前結點上的線程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
        LockSupport.unpark(node.thread);
    }
    return true;
}

方法 ConditionObject#doSignal 會從前往后遍歷條件隊列,尋找第一個不為 null 的結點,并應用 AbstractQueuedSynchronizer#transferForSignal 方法嘗試將其從條件隊列轉移到同步隊列。

在入同步隊列之前,方法 AbstractQueuedSynchronizer#transferForSignal 會基于 CAS 機制清除結點的 CONDITION 狀態,如果清除失敗則說明該結點上的線程已被取消,此時 ConditionObject#doSignal 方法會繼續尋找下一個可以被喚醒的結點。如果清除結點狀態成功,則接下來會將該結點添加到同步隊列的末端,同時依據前驅結點的狀態決定是否喚醒當前結點上的線程。

繼續來看 ConditionObject#signalAll 方法的實現,相對于上面介紹的 ConditionObject#signal 方法,該方法的特點在于它會喚醒條件隊列中所有不為 null 的等待結點。方法實現如下:

public final void signalAll() {
    if (!isHeldExclusively()) {
        // 先檢測當前線程是否獲取到了鎖,否則不允許繼續執行
        throw new IllegalMonitorStateException();
    }
    // 獲取條件隊列頭結點
    Node first = firstWaiter;
    if (first != null) {
        // 將所有結點從條件隊列轉移到同步隊列,參與競爭資源
        this.doSignalAll(first);
    }
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

實際上理解了 ConditionObject#doSignal 的運行機制,再理解 ConditionObject#signalAll 的運行機制也是水到渠成的事情。

資源的獲取與釋放

前面的小節我們分析了 LockSupport 工具類,以及 AQS 同步隊列和條件隊列的設計與實現,這些都是支撐 AQS 運行的基礎組件,本小節我們將正式開始分析 AQS 的實現機制。

AQS 對應的 AbstractQueuedSynchronizer 實現類,在屬性定義上主要包含 4 個字段(如下),其中 exclusiveOwnerThread 由父類 AbstractOwnableSynchronizer 繼承而來,用于記錄當前持有獨占鎖的線程對象,而 head 和 tail 字段分別指向同步隊列的頭結點和尾結點:

private transient Thread exclusiveOwnerThread;

private transient volatile Node head;
private transient volatile Node tail;

private volatile int state;

字段 state 用于描述同步狀態,對于不同的實現類來說具備不同的用途:

  • 對于 ReentrantLock 而言,表示當前線程獲取鎖的重入次數。

  • 對于 ReentrantReadWriteLock 而言,高 16 位表示獲取讀鎖的重入次數,低 16 位表示獲取寫鎖的重入次數。

  • 對于 Semaphore 而言,表示當前可用的信號個數。

  • 對于 CountDownLatch 而言,表示計數器當前的值。

具體細節我們將在后面分析相應組件實現機制的文章中再展開說明。

AbstractQueuedSynchronizer 是一個抽象類,在方法設計上引入了模板方法設計模式,下面的代碼塊中列出了所有需要子類依據自身運行機制針對性實現的模板方法:

protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
protected boolean isHeldExclusively()

這里先簡單說明一下各個方法的作用,具體實現留到后面分析各個基于 AQS 實現組件的文章中再進一步分析:

  • tryAcquire :嘗試以獨占模式獲取資源,如果獲取成功則返回 true,否則返回 false。

  • tryRelease :嘗試以獨占模式釋放資源,如果釋放成功則返回 true,否則返回 false。

  • tryAcquireShared :嘗試以共享模式獲取資源,如果返回正數則說明獲取成功,且還有可用的剩余資源;如果返回 0 則說明獲取成功,但是沒有可用的剩余資源;如果返回負數則說明獲取資源失敗。

  • tryReleaseShared :嘗試以共享模式釋放資源,如果釋放成功則返回 true,否則返回 false。

  • isHeldExclusively :判斷當前線程是否正在獨占資源,如果是則返回 true,否則返回 false。

AbstractQueuedSynchronizer 中的方法實現按照功能劃分可以分為兩大類,即獲取資源(acquire)和釋放資源(release),同時區分獨占模式和共享模式。下面的小節中主要對獲取和釋放資源的方法區分獨占模式和共享模式進行分析。

獨占獲取資源

針對獨占模式獲取資源,AbstractQueuedSynchronizer 定義了多個版本的 acquire 方法實現,包括:acquire、acquireInterruptibly,以及 tryAcquireNanos,其中 acquireInterruptibly 是 acquire 的中斷版本,在等待獲取資源期間支持響應中斷請求,tryAcquireNanos 除了支持響應中斷以外,還引入了超時等待機制。

下面主要分析一下 AbstractQueuedSynchronizer#acquire 的實現,理解了該方法的實現機制,也就自然而然理解了另外兩個版本的實現機制。方法 AbstractQueuedSynchronizer#acquire 的實現如下:

public final void acquire(int arg) {
    if (!this.tryAcquire(arg) // 嘗試獲取資源
            // 如果獲取資源失敗,則將當前線程加入到同步隊列的末端(獨占模式),并基于自旋機制等待獲取資源
            && this.acquireQueued(this.addWaiter(Node.EXCLUSIVE), arg)) {
        // 等待獲取資源期間曾被中斷過,在獲取資源成功之后再響應中斷
        selfInterrupt();
    }
}

方法 AbstractQueuedSynchronizer#tryAcquire 的功能在前面已經簡單介紹過了,用于嘗試獲取資源,如果獲取資源失敗則會將當前線程添加到同步隊列中,基于自旋機制等待獲取資源。

方法 AbstractQueuedSynchronizer#addWaiter 用于將當前線程對象封裝成結點添加到同步隊列末端,并最終返回線程結點對象:

private Node addWaiter(Node mode) {
    // 為當前線程創建結點對象
    Node node = new Node(Thread.currentThread(), mode);
    // 基于 CAS 機制嘗試快速添加結點到同步隊列末端
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (this.compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 快速添加失敗,繼續嘗試將該結點添加到同步隊列末端,如果同步隊列未被初始化則執行初始化
    this.enq(node);
    // 返回當前線程對應的結點對象
    return node;
}

上述方法在添加結點的時候,如果同步隊列已經存在,則嘗試基于 CAS 操作快速將當前結點添加到同步隊列末端。如果添加失敗,或者隊列不存在,則需要再次調用 AbstractQueuedSynchronizer#enq 方法執行添加操作,該方法在判斷隊列不存在時會初始化同步隊列,然后基于 CAS 機制嘗試往同步隊列末端插入線程結點。方法實現如下:

private Node enq(final Node node) {
    for (; ; ) {
        // 獲取同步隊列末尾結點
        Node t = tail;
        // 如果結點不存在,則初始化
        if (t == null) { // Must initialize
            if (this.compareAndSetHead(new Node())) {
                tail = head;
            }
        } else {
            // 往末尾追加
            node.prev = t;
            if (this.compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

完成了結點的入同步隊列操作,接下來會調用 AbstractQueuedSynchronizer#acquireQueued 方法基于自旋機制等待獲取資源,在等待期間并不會響應中斷,而是記錄中斷標志,等待獲取資源成功后延遲響應。方法實現如下:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false; // 標記自旋過程中是否被中斷
        // 基于自旋機制等待獲取資源
        for (; ; ) {
            // 獲取前驅結點
            final Node p = node.predecessor();
            // 如果前驅結點為頭結點,說明當前結點是排在同步隊列最前面,可以嘗試獲取資源
            if (p == head && this.tryAcquire(arg)) {
                // 獲取資源成功,更新頭結點
                this.setHead(node); // 頭結點一般記錄持有資源的線程結點
                p.next = null; // help GC
                failed = false;
                return interrupted; // 自旋過程中是否被中斷
            }
            // 如果還未輪到當前結點,或者獲取資源失敗
            if (shouldParkAfterFailedAcquire(p, node) // 判斷是否需要阻塞當前線程
                    && this.parkAndCheckInterrupt()) { // 如果需要,則進入阻塞狀態,并在蘇醒時檢查中斷狀態
                // 標識等待期間被中斷
                interrupted = true;
            }
        }
    } finally {
        // 嘗試獲取資源失敗,說明執行異常,取消當前結點獲取資源的進程
        if (failed) {
            this.cancelAcquire(node);
        }
    }
}

上述方法會循環檢測當前結點是否已經排在同步隊列的最前端,如果是則調用 AbstractQueuedSynchronizer#tryAcquire 方法嘗試獲取資源,具體獲取資源的過程由子類實現。自旋期間如果還未輪到調度當前線程結點,或者嘗試獲取資源失敗,則會調用 AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire 方法檢測是否需要阻塞當前線程,具體判定的過程依賴于前驅結點的等待狀態,實現如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取前驅結點狀態
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) {
        // 前驅結點狀態為 SIGNAL,說明當前結點需要被阻塞
        return true;
    }
    if (ws > 0) {
        // 前驅結點處于取消狀態,則一直往前尋找處于等待狀態的結點,并排在其后面
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 前驅結點的狀態為 0 或 PROPAGATE,但是當前結點需要一個被喚醒的信號,
         * 所以基于 CAS 將前驅結點等待狀態設置為 SIGNAL,在阻塞之前,調用者需要重試以再次確認不能獲取到資源。
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

上述方法首先會獲取前驅結點的等待狀態,并依據具體的狀態值進行決策:

  1. 如果前驅結點等待狀態為 SIGNAL,則說明當前結點需要被阻塞,所以直接返回 true;

  2. 否則,如果前驅結點的等待狀態大于 0(即處于取消狀態),則一直往前尋找未被取消的結點,并將當前結點排在其后,這種情況下直接返回 false,再次嘗試獲取一次資源;

  3. 否則,前驅結點的狀態為 0 或 PROPAGATE(不可能為 CONDITION 狀態,因為當前處于同步隊列),因為當前結點需要一個喚醒信號,所以修改前驅結點的狀態為 SIGNAL,這種情況下同樣返回 false,以再次確認不能獲取到資源。

如果上述檢查返回 true,則接下來會調用 AbstractQueuedSynchronizer#parkAndCheckInterrupt 方法,基于 LockSupport 工具阻塞當前線程,并在線程蘇醒時檢查中斷狀態。如果期間被中斷過則記錄中斷標記,而不立即響應,直到成功獲取到資源,或者期間發生異常退出自旋。方法 AbstractQueuedSynchronizer#acquireQueued 最終會返回這一中斷標記,并在外圍進行響應。

如果在自旋期間發生異常,則上述方法會執行 AbstractQueuedSynchronizer#cancelAcquire 以取消當前結點等待獲取資源的進程,包括設置結點的等待狀態為 CANCELLED,喚醒后繼結點等。

獨占釋放資源

針對獨占模式釋放資源,AbstractQueuedSynchronizer 定義了單一實現,即 AbstractQueuedSynchronizer#release 方法,該方法本質上是一個調度的過程,具體釋放資源的操作交由 tryRelease 方法完成,由子類實現。方法 AbstractQueuedSynchronizer#release 實現如下:

public final boolean release(int arg) {
    // 嘗試釋放資源
    if (this.tryRelease(arg)) {
        Node h = head;
        // 如果釋放資源成功,則嘗試喚醒后繼結點
        if (h != null && h.waitStatus != 0) {
            this.unparkSuccessor(h);
        }
        return true;
    }
    return false;
}

如果 tryRelease 釋放資源成功,則上述方法會嘗試喚醒同步隊列中由后往前距離頭結點最近的一個結點上的線程。方法 AbstractQueuedSynchronizer#unparkSuccessor 的實現如下:

private void unparkSuccessor(Node node) {
    // 獲取當前結點狀態
    int ws = node.waitStatus;
    if (ws < 0) {
        // 如果當前結點未被取消,則基于 CAS 更新結點等待狀態為 0
        compareAndSetWaitStatus(node, ws, 0);
    }

    /*
     * Thread to unpark is held in successor, which is normally just the next node.
     * But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor.
     */
    Node s = node.next; // 獲取后繼結點
    // 如果后繼結點為 null,或者被取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從后往前尋找距離當前結點最近的一個未被取消的線程結點
        for (Node t = tail; t != null && t != node; t = t.prev) {
            if (t.waitStatus <= 0) {
                s = t;
            }
        }
    }
    // 喚醒結點上的線程
    if (s != null) {
        LockSupport.unpark(s.thread);
    }
}

選舉待喚醒線程結點的過程被設計成從后往前遍歷,尋找距離當前結點最近的未被取消的結點,并調用 LockSupport 工具類喚醒結點上的線程。

為什么要設計成從后往前遍歷同步隊列呢 ?在 Doug Lea 大神的論文 The java.util.concurrent Synchronizer Framework 中給出了答案,摘錄如下:

An AbstractQueuedSynchronizer queue node contains a next link to its successor. But because there are no applicable techniques for lock-free atomic insertion of double-linked listnodes using compareAndSet, this link is not atomically set as part of insertion; it is simply assigned: pred.next = node; after the insertion. This is reflected in all usages. The next link is treated only as an optimized path. If a node's successor does not appear to exist (or appears to be cancelled) via its next field, it is always possible to start at the tail of the list and traverse backwards using the pred field to accurately check if therereally is one.

也就說對于雙向鏈表而言,沒有不加鎖的原子手段可以保證構造雙向指針的線程安全性。回到代碼中,我們回顧一下往同步隊列中添加結點的執行過程,如下(其中 pred 是末尾結點,而 node 是待插入的結點):

node.prev = pred;
if (this.compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
}

上述方法會將 node 結點的 prev 指針指向 pred 結點,而將 pred 的 next 指針指向 node 結點的過程需要建立在基于 CAS 成功將 node 設置為末端結點的基礎之上,如果這一過程失敗則 next 指針將會斷掉,而選擇從后往前遍歷則始終能夠保證遍歷到頭結點。

共享獲取資源

針對共享模式獲取資源,AbstractQueuedSynchronizer 同樣定義了多個版本的 acquire 方法實現,包括:acquireShared、acquireSharedInterruptibly,以及 tryAcquireSharedNanos,其中 acquireSharedInterruptibly 是 acquireShared 的中斷版本,在等待獲取資源期間支持響應中斷請求,tryAcquireSharedNanos 除了支持響應中斷以外,還引入了超時等待機制。下面同樣主要分析一下 AbstractQueuedSynchronizer#acquireShared 的實現,理解了該方法的實現機制,也就自然而然理解了另外兩個版本的實現機制。

方法 AbstractQueuedSynchronizer#acquireShared 的實現如下:

public final void acquireShared(int arg) {
    // 返回負數表示獲取資源失敗
    if (this.tryAcquireShared(arg) < 0) {
        // 將當前線程添加到條件隊列,基于自旋等待獲取資源
        this.doAcquireShared(arg);
    }
}

private void doAcquireShared(int arg) {
    // 將當前線程加入條件隊列末端,并標記為共享模式
    final Node node = this.addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false; // 標記自旋過程中是否被中斷
        for (; ; ) {
            // 獲取前驅結點
            final Node p = node.predecessor();
            // 如果前驅結點為頭結點,說明當前結點是排在同步隊列最前面,可以嘗試獲取資源
            if (p == head) {
                // 嘗試獲取資源
                int r = this.tryAcquireShared(arg);
                if (r >= 0) {
                    // 獲取資源成功,設置自己為頭結點,并嘗試喚醒后繼結點
                    this.setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted) {
                        selfInterrupt();
                    }
                    failed = false;
                    return;
                }
            }
            // 如果還未輪到當前結點,或者獲取資源失敗
            if (shouldParkAfterFailedAcquire(p, node) // 判斷是否需要阻塞當前線程
                    && this.parkAndCheckInterrupt()) { // 如果需要,則進入阻塞狀態,并在蘇醒時檢查中斷狀態
                // 標識等待期間被中斷
                interrupted = true;
            }
        }
    } finally {
        // 嘗試獲取資源失敗,說明執行異常,取消當前結點獲取資源的進程
        if (failed) {
            this.cancelAcquire(node);
        }
    }
}

上述方法與 AbstractQueuedSynchronizer#acquire 的實現邏輯大同小異,區別在于線程在被封裝成結點之后,是以共享(SHARED)模式在同步隊列中進行等待。這里我們重點關注一下 AbstractQueuedSynchronizer#setHeadAndPropagate 方法的實現,當結點上的線程成功獲取到資源會觸發執行該方法,以嘗試喚醒后繼結點。實現如下:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 記錄之前的頭結點
    this.setHead(node); // 頭結點一般記錄持有資源的線程結點
    /*
     * 如果滿足以下條件,嘗試喚醒后繼結點:
     *
     * 1. 存在剩余可用的資源;
     * 2. 后繼結點處于等待狀態,或后繼結點為空
     *
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *   or was recorded (as h.waitStatus either before or after setHead) by a previous operation
     *   (note: this uses sign-check of waitStatus because PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *   or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause unnecessary wake-ups,
     * but only when there are multiple racing acquires/releases, so most need signals now or soon anyway.
     */
    if (propagate > 0 // 存在剩余可用的資源
            || h == null || h.waitStatus < 0 // 此時 h 是之前的頭結點
            || (h = head) == null || h.waitStatus < 0) { // 此時 h 已經更新為當前頭結點
        Node s = node.next;
        // 如果后繼結點以共享模式在等待,或者后繼結點未知,則嘗試喚醒后繼結點
        if (s == null || s.isShared()) {
            this.doReleaseShared();
        }
    }
}

因為當前結點已經獲取到資源,所以需要將當前結點記錄到頭結點中。此外,如果滿足以下 2 種情況之一,還需要喚醒后繼結點:

  1. 參數 propagate > 0,即存在可用的剩余資源;

  2. 前任頭結點或當前頭結點不存在,或指明后繼結點需要被喚醒。

如果滿足上述條件之一,且后繼結點狀態未知或以共享模式在等待,則調用 AbstractQueuedSynchronizer#doReleaseShared 方法喚醒后繼結點,關于該方法的實現留到下一小節進行分析。

共享釋放資源

針對共享模式釋放資源,AbstractQueuedSynchronizer 同樣定義了單一實現,即 AbstractQueuedSynchronizer#releaseShared 方法,該方法本質上也是一個調度的過程,具體釋放資源的操作交由 tryReleaseShared 方法完成,由子類實現。方法 AbstractQueuedSynchronizer#releaseShared 實現如下:

public final boolean releaseShared(int arg) {
    // 嘗試釋放資源
    if (this.tryReleaseShared(arg)) {
        // 釋放資源成功,喚醒后繼結點
        this.doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other in-progress acquires/releases.
     * This proceeds in the usual way of trying to unparkSuccessor of head if it needs signal.
     * But if it does not, status is set to PROPAGATE to ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added while we are doing this.
     * Also, unlike other uses of unparkSuccessor, we need to know if CAS to reset status fails, if so rechecking.
     */
    for (; ; ) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果頭結點狀態為 SIGNAL,則在喚醒后繼結點之前嘗試清除當前結點的狀態
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                    // loop to recheck cases
                    continue;
                }
                // 喚醒后繼結點
                this.unparkSuccessor(h);
            }
            /*
             * 如果后繼結點暫時不需要被喚醒,則基于 CAS 嘗試將目標結點的 waitStatus 由 0 修改為 PROPAGATE,
             * 以保證后續由喚醒通知到來時,能夠將通知傳遞下去
             */
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                // loop on failed CAS
                continue;
            }
        }
        // 如果頭結點未變更,則說明期間持有鎖的線程未發生變化,能夠走到這一步說明前面的操作已經成功完成
        if (h == head) {
            break;
        }
        // 如果頭結點發生變更,則說明期間持有鎖的線程發生了變化,需要重試以保證喚醒動作的成功執行
    }
}

如果釋放資源成功,需要依據頭結點當下等待狀態分別處理:

  1. 如果頭結點的等待狀態為 SIGNAL,則表明后繼結點需要被喚醒,在執行喚醒操作之前需要清除等待狀態。

  2. 如果頭結點狀態為 0,則表示后繼結點不需要被喚醒,此時需要將結點狀態修改為 PROPAGATE,以保證后續接收到喚醒通知時能夠將通知傳遞下去。

“JUC的AQS隊列同步器怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

台北县| 临安市| 昌江| 博乐市| 桓仁| 上林县| 醴陵市| 农安县| 浦江县| 定州市| 霍邱县| 泸水县| 雷州市| 山丹县| 建德市| 大英县| 台东市| 牙克石市| 无棣县| 镇康县| 和顺县| 邵阳市| 富源县| 会宁县| 宁津县| 无为县| 嘉祥县| 桐城市| 大兴区| 海兴县| 马关县| 建昌县| 台南县| 年辖:市辖区| 玉溪市| 新丰县| 齐齐哈尔市| 山阴县| 昌宁县| 桦甸市| 邹平县|