中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

AbstractQueuedSynchroizer(AQS) 同步器詳解詳解

發布時間:2020-10-19 15:18:10 來源:網絡 閱讀:1225 作者:wx5c78c8b1dbb1b 欄目:編程語言

一、什么是同步器
同步器是用來構建鎖或者其他同步組件的基礎框架,它使用一個int成員變量表示同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作,它能實現大部分的同步需求。
同步器是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。可以這樣理解二者的關系:鎖是面向使用者的,它定義了使用者與鎖交互的接口(比如可以允許兩個線程的并行訪問),隱藏了實現細節;同步器面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了狀態管理、線程的排隊、等待與喚醒等底層操作。鎖和同步容器很好地隔離了使用者和實現者所需要關注的領域。
二、同步器的基本成員(介紹常用的類好方法)
Node 是AQS的內部類構成AQS隊列的一種數據結構。

成員變量 作用
waitStatus 記錄節點的等待狀態。包括如下狀態:① CANCELLED,值為1,由于同步隊列中等待線程超時或者被中斷,需要從同步隊列中取消等待,節點進入該狀態將不會變化。② SIGNAL值為-1,后繼節點的線程處于等待狀態,而當前線程如果釋放了同步狀態或者取消,將會通知后繼節點,使得后繼節點得以運行。③ CONDITION值為-2,節點在等待隊列中,節點等待在Condtion上,當其他線程對Condtion調用了signal方法后,該節點將會從等待隊列中轉移到同步隊列中,加入到對同步狀態的獲取中。④ PROPAGATE值為-3,表示下一次共享式同步狀態將會無條件地被傳播下去。⑤ INITAL,值為0,初始狀態
SHARED = new Node() 表示共享式的node
EXCLUSIVE = null 獨占式的node
Node prev Node的前節點
Node next Node的后節點
nextWaitert 等待隊列的中node的下一個節點

ConditionObject是AQS的內部類構成類似Object的等待/通知機制。

成員/方法 作用
Node firstWaiter 等待隊列的頭節點
Node lastWaiter 等待隊列的尾節點
await() 當前線程進入等待狀態知道被通知或中斷,當前線程進入運行狀態且從await()返回的情況如下,包括:① 其它線程調用Interrupt()方法中斷當前線程。② 如果當前線程從await()方法返回,那么表明該線程已經獲取了Condtion對象鎖對應的鎖
awaitUninterruptibly() 當前線程進入等待直到被通知,該方法對中斷不敏感
awaitNanos(long nanosTimeout)) 當前線程進入等待狀態直到被通知、中斷或者超時。返回值表示剩余的時間,如果在nanosTimeout納秒之前被喚醒,那么返回就是(nanosTimeout-實際耗時);如果返回是0或者負數,那么可以認定已經超時了
awaitUntil(Date deadline) 當前線程進入等待狀態直到被通知、中斷或者某個時間。如果沒有到指定時間就被通知,方法返回true,否則,表示超時,方法返回false
signal() 喚醒一個等待在Condition上的線程,該線程從等待方法返回前必須獲得與Condition相關的鎖
signalAll 喚醒所有等待在Condition上的線程,能夠從等待方法返回的線程必須獲得與Condition相關聯的鎖

AQS主要成員

成員變量 作用
state 維護鎖的一個變量(同步狀態,很重要)① setState 。② getState。 ③ compareAndSetState。
Node head FIFO同步隊列的頭結點 。
Node tail FIFO同步隊列的尾結點 。

AQS主要方法

方法名 作用
acquire() 獨占式獲取同步狀態,如果當前線程獲取同步狀態成功,則由該方法返回,否則,將會進入同步隊列等待,該方法會調用重寫的tryAcquire(arg)方法(需要鎖自己實現)
release(int arg) 獨占式釋放狀態,如果釋放狀態成功,則會去喚醒頭結點;釋放狀態調用tryRelease(arg)方法(需要自己實現)
acquireShared(int arg) 共享式獲取同步狀態,也就是說可以幾個線程同時獲取同步狀態,如果當前線程未獲取同步狀態,將會進入同步隊列。
releaseShared() 共享式釋放狀態,釋放之后會喚醒頭結點
acquireInterruptibly() 響應中斷的獨占式獲取同步狀態,當前線程未獲取同步狀態而進入同步隊列中,如果當前線程被中斷,則該方法會拋出中斷異常,并返回
tryAcquireNanos() 在acquireInterruptibly()基礎上增加超時限制,如果當前線程在超時時間內沒有獲取同步狀態,那么將返回false,如果獲取到了返回true
acquireSharedInterruptibly() 響應中斷的共享式獲取同步狀態
tryAcquireSharedNanos() 在acquireSharedInterruptibly()的基礎上增加超時限制

以上就是AQS的一些基本成員和方法,下面主要從現實的角度分析這些方法,理解這些方法的實現,能剛好的幫助我們去理解鎖。
三、AQS的方法實現分析
1)、獨占系列的方法
①、acquire()獨占式獲取同步狀態,表示只會有一個線程獲取,其它線程進入同步隊列。
源代碼如下:

// 獲取鎖的方法(獨占模式)
    public final void acquire(int arg) {
        // tryAcquire(arg) 這個方法需要我們自己去實現,如果獲取失敗,
        // 調用addWaiter構造節點
        // acquireQueued
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我們可以看見acquire方法內部調用了tryAcquire(arg)方法,這個方法需要構造同步組件的類自己去實現,不過返回值已經被AQS定義好了,返回true代表獲取同步狀態成功,返回false代表失敗,需要將線程構造節點加入同步隊列,就是調用acquireQueued這個方法。
acquireQueued這個方法實際是先去調用了addWaiter方法。
addWaiter(),這個方法其實就是把當前節點加入到同步隊列,加入成功才返回,其實隊列初始化時會制造一個空的節點,然后在空的節點后面設置同步節點(可以理解為每次獲取獲取鎖的那個線程其實就是頭結點,它是一個空的節點,結合acquireQueued方法,每次獲取鎖之后,該節點就會升級為頭節點,并且變成一個空節點。)

private Node addWaiter(Node mode) {
        // 構造一個節點
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 獲取尾節點
        Node pred = tail;
        // 判斷尾節點是否為空
        if (pred != null) {
            // 不為空,設置node節點的上一個節點為pred(也就是尾節點)
            node.prev = pred;
            // cas設置尾節點
            if (compareAndSetTail(pred, node)) {
                // 成功后,設置pred節點的next節點為node,返回
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

acquireQueued()方法,通過上面的addWaiter方法我們已經把這個節點加入同步隊列,接下來需要處理這個節點。首先判斷自己的前節點是否是頭結點,自己是否獲取到同步狀態,如果滿足,把自己設置尾頭結點,返回,如果不是,進入shouldParkAfterFailedAcquire(詳情見后面方法分析)方法主要作用是判斷自己的前置節點是否是SIGNAL狀態,是的話自己就可以阻塞自己了,調用parkAndCheckInterrupt(詳情見后面方法分析)方法,直到被喚醒或者中斷。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 獲取前一個節點
                final Node p = node.predecessor();
                // p是頭結點,自己獲取鎖成功
                if (p == head && tryAcquire(arg)) {
                                // 設置自己為頭節點,變成一個空節點
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }

                // 找到前節點為signal,然后阻塞自己
                // 清理等待超時或者中斷的節點
                // 嘗試設置線程的狀態為signal
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 出現異常
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire方法,這個方法主要做三件事,判斷自己的前置節點是否是SIGNAL,返回true,就可以阻塞了,不是如果狀態大于0,證明前面的節點被中斷或者超時了,需要從隊列清理了,不是大于0,就利用cas設置前置節點為SIGNAL,返回false。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 獲取node前節點的狀態
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            // 如果pred節點釋放了狀態,會通知自己
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                // 大于0證明,前面的線程等待超時或者已經被中斷,需要從節點中移除
                // 需要找到不大于0的那個節點
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 找到小于等于0的前節點,設置為SIGNAL
            // 這個地方ws值只會為PROPAGATE或者0
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt方法,這個方法需要前面的方法返回true才會執行,它會阻塞node的這個線程,返回線程的中斷中斷狀態并清理Thread.interrupted(),所以獨占式獲取同步狀態對中斷不響應的。

private final boolean parkAndCheckInterrupt() {
        // 阻塞線程
        LockSupport.park(this);
        return Thread.interrupted();
    }

cancelAcquire方法,在finally塊里面,出現異常就會執行這個方法,做一些處理當前node的操作。

// 異常后,finally里面執行的方法
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        // node是為節點,設置尾節點是pred
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            // 不是頭結點和尾節點,前節點是SIGNAL
            if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                            (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

附上一張acquire獨占式獲取同步狀態的流程圖:
AbstractQueuedSynchroizer(AQS) 同步器詳解詳解

②、release()獨占式釋放同步狀態,釋放線程后喚醒節點。
源代碼:
其中tryRelease也需要同步組件自己去實現,語義也被AQS所定義,true代表釋放成功,false代表失敗,如果為true,就需要決定是否去喚醒節點,首先獲取同步隊列的頭節點,判斷頭結點不是空,證明有同步對別有節點才需要喚醒,判斷頭結點不是剛剛初始化,如果是剛剛初始化,就還沒有阻塞,請參考acquire的acquireQueued處理節點的邏輯,都為true執行unparkSuccessor方法,false返回。

 public final boolean release(int arg) {
        // 釋放鎖
        if (tryRelease(arg)) {
            // 獲取頭結點
            Node h = head;
            // 頭結點不為空,證明初始化了
            // 證明頭結點不是剛剛創建
            // 那就可以去喚醒頭結點或者它的后繼節點
                        // 為0就證明沒有其他節點了,不需要喚醒
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unparkSuccessor()方法,喚醒節點,喚醒的node的后置節點,因為在獲取同步狀態是我們阻塞的也是后置節點,喚醒后置節點后,會去找到前節點,也就是當前的結點去獲取同步狀態,然后再把自己變成頭結點。

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
                // 沒有這個節點或者超時或者被中斷了,查找一個可以用的節點
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
                // 證明有這個節點
        if (s != null)
            LockSupport.unpark(s.thread);
    }

獨占式釋放同步狀態的流程圖:
AbstractQueuedSynchroizer(AQS) 同步器詳解詳解
③、acquireInterruptibly()響應中斷的獨占式獲取同步狀態
可以看出如果線程中斷立馬返回異常,然后再去執行tryAcquire()獲取同步狀態,獲取失敗執行doAcquireInterruptibly方法。

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

doAcquireInterruptibly()方法,和acquire的acquireQueued的方法差不多,區別就是在parkAndCheckInterrupt這個方法如果返回true,就會拋異常InterruptedException,說明這個方法響應異常。

private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

④、tryAcquireNanos()帶時間的獲取同步狀態,在時間內獲取到,返回true,超時返回false,首先判斷線程中斷狀態,為true就拋異常,為false就嘗試獲取同步狀態tryAcquire,獲取失敗執行doAcquireNanos方法。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
    }

doAcquireNanos()方法,其實這個都是在acquire方法上的改進,我們看看這個方法,首先算下時間也就是deadline,然后加入同步隊列addWaiter方法,然后判斷node的前節點是否為頭結點,是就嘗試獲取同步狀態,都為true就返回,為false就接著算下時間,判斷node前節點是否為SIGNAL,也就是shouldParkAfterFailedAcquire這個方法,為true,線程阻塞計算的時間,然后true(等待阻塞時間到)和false都判斷線程中斷狀態,中斷就拋出異常,執行異常方法,不為true,繼續循環,直到獲取鎖或者超時。

 private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

2)、共享系列的方法
①、acquireShared共享式獲取同步狀態,獲取失敗就加入同步隊列
AQS也把語義指定好了,返貨負數證明沒有了,就執行doAcquireShared方法

 public final void acquireShared(int arg) {
        // 返回負數就證明沒有鎖了,加入同步隊列
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

doAcquireShared()方法,首先構造節點加入隊列addWaiter,然后獲取node的前節點,判斷node的前節點是否為頭結點,如果是,獲取資源的個數,如果資源大于等于0,調用setHeadAndPropagate方法,然后返回,不滿足,調用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法和獨占式一樣。

private void doAcquireShared(int arg) {
        // 構建共享節點
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 獲取node的前節點
                final Node p = node.predecessor();
                // 前節點是否是頭節點
                if (p == head) {
                    // 獲取鎖的個數
                    int r = tryAcquireShared(arg);
                    // 大于等于0,獲取鎖成功
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); // 設置頭結點,如果有多余資源接著喚醒
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

setHeadAndPropagate()方法,設置頭結點,設置waitStatus為Propagate,為如果還有資源,喚醒后面的節點,調用doReleaseShared方法(這個方法會在共享式釋放同步狀態詳解)

private void setHeadAndPropagate(Node node, int propagate) {
        // 頭結點
        Node h = head; // Record old head for check below
        // 設置頭結點為node
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        // 還有資源
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 當前節點的next節點是共享或者沒有next節點
            if (s == null || s.isShared())
                // 喚醒后置節點
                doReleaseShared();
        }
    }

②、releaseShared()共享式釋放狀態
tryReleaseShared是需要同步組件自己去實現,釋放成功調用doReleaseShared喚醒節點

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

doReleaseShared()方法,方法有些復雜,不好理解,我們主要來分析三個if的含義,第一個 if (ws == Node.SIGNAL) 表示當前node需要被喚醒,然后后面利用cas設置waitStatus為0,因為是共享模式可能有多個線程同時來釋放同步狀態,所以只能有一個釋放成功,另外一個重試;第二個else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)),其實也是用來處理并發的,當第一次并發失敗的線程第二次進入時,可能會看到ws等于0(因為成功的線程設置的),所以利用cas設置為PROPAGATE,表示傳遞,這里補充一下不管是0或者PROPAGATE,都會被喚醒的線程利用cas設置為SIGNAL(參考shouldParkAfterFailedAcquire方法);第三個(h == head)

  1. head.waitStatus的初始值必然為SIGNAL,因此在并發時,必然只有一個線程A能將等待狀態由 SIGNAL CAS更新為 0,
    該線程A會喚醒其他線程B
  2. 被喚醒的線程B會首先執行setHead
    因此如果最后h!=head,說明新一輪的喚醒競爭已經開始,當前線程c已經覺察到,因此繼續參與競爭,加快喚醒
    因此如果最后h==head,說明新一輪的喚醒競爭尚未開始,而被喚醒的線程B必然會開啟新一輪的喚醒競爭,而當前線程c可以安心退出喚醒競選

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            // 頭結點
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 表示后面節點需要喚醒
                if (ws == Node.SIGNAL) {
                    // 多線程控制并發 ,可能存在多個線程同時來修改
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                // 如果ws等于0,嘗試把cas設置waitStatus為PROPAGATE,傳遞下去
                // 請聯系shouldParkAfterFailedAcquire方法一起看
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //如果頭結點沒有發生變化,表示設置完成,退出循環
            // 如果發生變化,加入喚醒的過程(加速喚醒,可能存在多個線程在喚醒這些node,速度比一個接一個要快)
            if (h == head)                   // loop if head changed
                break;
        }
    }

    ③、acquireSharedInterruptibly()和tryAcquireSharedNanos()一個響應中斷,一個響應中斷支持添加獲取的超時時間(參考獨占模式的這些方法)
    3)、ConditionObject系列方法
    ①、await()方法,類似Object的await方法,阻塞線程釋放鎖。
    我們可以看見await的第一步是調用addConditionWaiter方法,它的作用是構建等待節點加入隊列的尾部,使用的也是AQS的Node,隊列里面順便也會清理清除Node不為CONDITION的節點;第二步需要釋放線程獲取的同步狀態fullyRelease方法;第三步:阻塞線程,找到線程中斷時機,也就是調用signal方法的前后順序;第四步:調用acquireQueued方法處理節點(阻塞還是其它);第五步:清理節點unlinkCancelledWaiters方法(清除Node不為CONDITION的節點);第六步:響應await語義,await阻塞線程時調用interrupt方法會拋異常reportInterruptAfterWait方法。

    public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 加入等待隊列,清除節點
            Node node = addConditionWaiter();
            // 釋放狀態
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                // 阻塞線程
                LockSupport.park(this);
                // 線程是被中斷喚醒的
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 加入同步隊列
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                // 清理節點
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                // 響應中斷  await語義
                reportInterruptAfterWait(interruptMode);
        }

    addConditionWaiter方法每次都是加入隊列的尾部

    private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
                private void unlinkCancelledWaiters() {
            // 獲取第一個
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                // 獲取第一個的下一個
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    // t需要斷開連接
                    t.nextWaiter = null;
                    // 第一次trail = null
                    // firstWaiter = next
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

    isOnSyncQueue方法判斷node是否在同步隊列中

    final boolean isOnSyncQueue(Node node) {
        // 節點狀態為CONDITION ,或者node.prev == null 等待節點沒有前置節點
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 等待節點沒有next節點
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        // 循環查找
        return findNodeFromTail(node);

    checkInterruptWhileWaiting 方法,判斷是否是中斷喚醒,這方法就是為了確認中斷的時機是在signal的前面還是后面signal,因為需要響應中斷

    private int checkInterruptWhileWaiting(Node node) {
            // 判斷是否是線程中斷喚醒
            return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
        }
                final boolean transferAfterCancelledWait(Node node) {
        // 設置成功表示在signal 執行之前
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        // 設置成功表示在signal 執行之后
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    acquireQueued和unlinkCancelledWaiters方法前面都介紹過了,一個是加入同步隊列,一個是清理節點,介紹下reportInterruptAfterWait方法,它是我為了響應線程Interrupt方法,interruptMode == THROW_IE只在在signal方法后調用Interrupt方法才滿足,線程阻塞時調用Interrupt方法會拋異常,這是Object.await里面滿足的,請參考checkInterruptWhileWaiting方法里面的transferAfterCancelledWait方法理解其實現。

    private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

    ②、awaitNanos(long nanosTimeout)和awaitUntil(Date deadline)都是提供了超時時間,和await方法類似,只是加入了時間機制。
    ③、awaitUninterruptibly不響應中斷方法,發現里面都沒有判斷是都發生中斷的標記,只有調用signal喚醒node,循環才會結束,然后調用acquireQueued處理這個節點(阻塞還是其它)

    public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

    ④、signal方法,喚醒第一個等待隊列的node。

    public final void signal() {
            // 判斷是否獲取鎖
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 初始化
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

    doSignal方法,首先把這個first節點和等待隊列斷開連接,然后把調用transferForSignal方法把節點從等待隊列加入同步隊列,喚醒節點的線程,然后被喚醒的線程就會在await方法里面執行acquireQueued這個方法。

    private void doSignal(Node first) {
            do {
                // 隊列里面即將沒有節點,所以首尾都要為null
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                // 把first 斷開連接
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
        }
                final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
    
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    ⑤、signalAll喚醒所有等待隊列的節點加入同步隊列,并且清空等待隊列

    public final void signalAll() {
            // 判斷是否獲取鎖
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 獲取第一個節點
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
                private void doSignalAll(Node first) {
            // 隊列設置為null
            lastWaiter = firstWaiter = null;
            // 從首節點開始加入同步隊列,知道隊列為空
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

    四、總結

    我們學習AQS其實我覺得主要從三個方面,也就是本文的第三部分,從獨占式獲取和釋放同步狀態、共享式獲取和釋放同步狀態和ConditionObject里面的等待/通知機制;這里在說一下獨占式釋放鎖和共享式釋放鎖,獨占式因為只會有一個線程獲取同步狀態,所以釋放時也只會有一個,但是在共享這一塊,我們在釋放同步同步狀態時可能會有多個線程同時來釋放,可能出現并發的情況,理解doReleaseShared是理解共享式釋放的重點;學習獲取和釋放同步狀態,理解同步隊列節點的變化是重點;學習等待/通知理解等待隊列和同步隊列的關系和節點的轉換;只有學習好了AQS才能更好的學習后面JUC的那些鎖。
    最后感慨下AQS里面的邏輯是真心有些繞,本人有些理解的可能有些不夠。
    參考《Java 并發編程的藝術》

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

龙陵县| 二手房| 青铜峡市| 尼勒克县| 泰安市| 炎陵县| 龙海市| 安义县| 昌乐县| 老河口市| 双鸭山市| 长岭县| 崇左市| 芷江| 如皋市| 姚安县| 镇赉县| 宜城市| 高台县| 平遥县| 垦利县| 普定县| 屯门区| 葵青区| 马鞍山市| 崇州市| 靖边县| 高唐县| 林西县| 固镇县| 洛阳市| 鹿邑县| 彭水| 天全县| 鸡泽县| 古丈县| 浦北县| 兴国县| 西乌珠穆沁旗| 莲花县| 礼泉县|