您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關AQS AbstractQueuedSynchronizer的同步框架是什么,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
隊列同步器AbstractQueuedSynchronizer(以下簡稱同步器),是用來構建鎖或者其他同步組件的基礎框架。
它主要的設計思想是使用一個名為state
的int類型成員變量來表示同步狀態,AQS里面大部分方法都是再對這個邊進行操作;再內置一個FIFO隊列來完成資源獲取線程的排隊工作。
AQS的使用方式主要使用繼承方式,并且推薦使用靜態內部類,這樣做的好處是隔離了使用者和實現者所關注的領域。
方法名稱 | 描述 |
---|---|
getState() | 獲取當前同步狀態。 |
setState(int newState) | 設置當前同步狀態。 |
compareAndSetState(int expect,int update) | 使用CAS設置當前狀態,該方法能夠保證狀態 設置的原子性。 |
方法名稱 | 描述 |
---|---|
tryAcquire(int arg) | 獨占獲取同步狀態,實現該方法需要查詢當前狀態,并判斷同步狀態是否符合預期狀態,然后再進行CAS設置同步狀態。 |
treRelease(int arg) | 獨占式釋放同步狀態,等待獲取同步狀態的線程將有機會獲取同步狀態 |
tryAcquireShared(int arg) | 共享式獲取同步狀態,返回大于等于0的值,表示獲取成功,反之失敗 |
tryReleaseShared(int arg) | 共享式釋放同步狀態 |
isHeldExclusively() | 當前同步器是否在獨占模式下被線程占用,一般該方法表示是否被當前線程所獨占 |
這些可以覆蓋的方法其實都是有默認實現的,默認實現直接拋出一個異常,如:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
沒有把這些方法直接定義成抽象方法的好處是,根據需要我們只覆寫其中一部分方法就行了。假如我們要實現一個排它鎖,我們只需要去覆寫tryAcquire、treRelease和isHeldExclusively就行了。
AQS的使用模板方法的設計模式,大致分為三類接口:獨占式獲取與釋放同步狀態、共享式獲取與釋放同步狀態和查詢同步隊列中的等待線程情況。
同步器依賴內部的同步隊列(一個FIFO雙向隊列)來完成同步狀態的管理,當前線程獲取 同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構造成為一個節點(Node)并將其 加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點中的線程喚醒,使其再 次嘗試獲取同步狀態。
同步隊列中的節點(Node)用來保存獲取同步狀態失敗的線程引用、等待狀態以及前驅和 后繼節點,他是一個靜態內部類。
static final class Node { /** * 標記表示節點正在共享模式下等待 */ static final Node SHARED = new Node(); /** * 標記表示節點正在獨占模式下等待 */ static final Node EXCLUSIVE = null; /** * 取消狀態 */ static final int CANCELLED = 1; /** * 后繼節點處于等待狀態 */ static final int SIGNAL = -1; /** * 表示線線程在等待隊列中 */ static final int CONDITION = -2; /** * 共享,表示狀態要往后面的節點傳播 */ static final int PROPAGATE = -3; /** * CANCELLED 值為1: 在同步隊列中等待的線程等待超時或者被中斷,需要從同步隊列中取消等待,節點進入該狀態后將不會發生變化 * SIGNAL 值為-1: 后繼節點的線程處于等待狀態,當前節點的線程如果釋放了同步狀態或者被取消,將會通知后繼節點,使后繼節點的線程得以運行 * CONDITION 值為-2: 節點在等待隊列中,節點線程等待在Condition上,當其他線程對Condition調用了signal()方法后,該節點會從等待隊列中轉移到同步隊列中,加入到對同步狀態的獲取中 * PROPAGATE 值為-3: 表示下一次共享同步狀態將會無條件地被傳播下去 * INITIAL 值為0: 節點的初始狀態 */ volatile int waitStatus; /** * 前驅結點,當節點加入同步隊列時被設置(尾部添加) */ volatile Node prev; /** * 后繼節點 */ volatile Node next; /** * 獲取同步狀態的線程 */ volatile Thread thread; /** * 等待隊列中的后繼節點。如果當前節點是共享的,那么這個字段將是一個SHAEED常量,也就是說節點類型(獨占和共享)和等待隊列中的后繼節點共用一個字段 */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
節點是構成同步隊列(等待隊列)的基礎,同步器擁有首節點(head) 和尾節點(tail),沒有成功獲取同步狀態的線程將會成為節點加入該隊列的尾部,同步隊列的 基本結構如圖:
為了保證線程安全所以設置尾部節點需要使用compareAndSetTail(Node expect,Node update)方法。
由于只有獲取同步狀態的線程才能設置頭結點,所以沒有競爭可以直接使用setHead(Node update)方法。
通過調用同步器的acquire(int arg)方法可以獲取同步狀態。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
上述代碼主要完成了同步狀態獲取、節點構造、加入同步隊列以及在同步隊列中自旋等待的相關工作,其主要邏輯是:首先調用自定義同步器實現的tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態,如果同步狀態獲取失敗,則構造同步節點(獨占式 Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態)并通過addWaiter(Node node) 方法將該節點加入到同步隊列的尾部,最后調用acquireQueued(Node node,int arg)方法,使得該 節點以“死循環”的方式獲取同步狀態。如果獲取不到則阻塞節點中的線程,而被阻塞線程的 喚醒主要依靠前驅節點的出隊或阻塞線程被中斷來實現。
下面分析一下相關工作。首先是節點的構造以及加入同步隊列:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 先假設沒有競爭,快速的嘗試一下將當前節點設置成尾節點,如果失敗再調用enq方法,自旋 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
上述代碼通過使用compareAndSetTail(Node expect,Node update)方法來確保節點能夠被線 程安全添加。
在enq(final Node node)方法中,同步器通過“死循環”來保證節點的正確添加,在“死循 環”中只有通過CAS將節點設置成為尾節點之后,當前線程才能從該方法返回,否則,當前線 程不斷地嘗試設置。可以看出,enq(final Node node)方法將并發添加節點的請求通過CAS變 得“串行化”了。
節點進入同步隊列之后,就進入了一個自旋的過程,每個節點(或者說每個線程)都在自 省地觀察,當條件滿足,獲取到了同步狀態,就可以從這個自旋過程中退出,否則依舊留在這 個自旋過程中(并會阻塞節點的線程),代碼如下:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 以自旋的方式來獲取同步狀態 for (;;) { final Node p = node.predecessor(); // 獲取同步狀態 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 檢查并設置同步狀態,然后掛起當前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
在acquireQueued(final Node node,int arg)方法中,當前線程在“死循環”中嘗試獲取同步狀態,而只有前驅節點是頭節點才能夠嘗試獲取同步狀態,原因有兩個,如下:
頭節點是成功獲取到同狀態的節點,而頭節點的線程釋放了同步狀態之后,將會喚醒其后繼節點,后繼節點的線程被喚醒后需要檢查自己的前驅節點是否是頭節點。
維護同步隊列的FIFO原則。
節點自旋獲取同步狀態結構圖:
獨占式同步狀態獲取流程,也就是acquire(int arg)方法調用流程,如圖:
獨占式超時獲取同步態起始就是在獨占式獲取同步狀態的自旋方法體加上了時間判斷,每次循環都會判斷是否超少,代碼如下:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 在非常短的超時等待無法做到十分精確,所以同步器會直接進入無條件的快速自旋 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 檢查是否超時 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
在非常短的超時等待無法做到十分精確,所以同步器會直接進入無條件的快速自旋。
當前線程獲取同步狀態并執行了相應邏輯之后,就需要釋放同步狀態,使得后續節點能 夠繼續獲取同步狀態。通過調用同步器的release(int arg)方法可以釋放同步狀態,該方法在釋 放了同步狀態之后,會喚醒其后繼節點(進而使后繼節點重新嘗試獲取同步狀態)。該方法代碼如下:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 喚醒頭結點的后繼節點 if (s != null) LockSupport.unpark(s.thread); }
package com.xiaolyuh; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * 互斥鎖 * * @author yuhao.wang3 * @since 2019/7/10 17:21 */ public class MutexLock implements Lock { // 使用靜態內部類的方式來自定義同步器,隔離使用者和實現者 static class Sync extends AbstractQueuedSynchronizer { // 我們定義狀態標志位是1時表示獲取到了鎖,為0時表示沒有獲取到鎖 @Override protected boolean tryAcquire(int arg) { // 獲取鎖有競爭所以需要使用CAS原子操作 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { // 只有獲取到鎖的線程才會解鎖,所以這里沒有競爭,直接使用setState方法在來改變同步狀態 setState(0); setExclusiveOwnerThread(null); return true; } @Override protected boolean isHeldExclusively() { // 如果貨物到鎖,當前線程獨占 return getState() == 1; } // 返回一個Condition,每個condition都包含了一個condition隊列 Condition newCondition() { return new ConditionObject(); } } // 僅需要將操作代理到Sync上即可 private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryRelease(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(0); } @Override public Condition newCondition() { return sync.newCondition(); } public static void main(String[] args) { MutexLock lock = new MutexLock(); final User user = new User(); for (int i = 0; i < 100; i++) { new Thread(() -> { lock.lock(); try { user.setAge(user.getAge() + 1); System.out.println(user.getAge()); } finally { lock.unlock(); } }).start(); } } }
總結: 在獲取同步狀態時,同步器會維護一個同步隊列,獲取同步狀態失敗的線程都會被加入到同步隊列中并在隊列中進行自旋;移出隊列 (或停止自旋)的條件是前驅節點為頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步 器調用tryRelease(int arg)方法釋放同步狀態,然后喚醒頭節點的后繼節點。
共享式獲取與獨占式獲取最主要的區別在于同一時刻能否有多個線程同時獲取到同步狀態。
左半部分,共享式訪問資源時,其他共享式的訪問均被允許,而獨占式訪問被 阻塞,右半部分是獨占式訪問資源時,同一時刻其他訪問均被阻塞。
通過調用同步器的acquireShared(int arg)方法可以共享式地獲取同步狀態,該方法代碼如下:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
在acquireShared(int arg)方法中,同步器調用tryAcquireShared(int arg)方法嘗試獲取同步狀態,tryAcquireShared(int arg)方法返回值為int類型,當返回值大于等于0時,表示能夠獲取到同步狀態。因此,在共享式獲取的自旋過程中,成功獲取到同步狀態并退出自旋的條件就是 tryAcquireShared(int arg)方法返回值大于等于0。可以看到,在doAcquireShared(int arg)方法的自旋過程中,如果當前節點的前驅為頭節點時,嘗試獲取同步狀態,如果返回值大于等于0,表示該次獲取同步狀態成功并從自旋過程中退出。
與獨占式一樣,共享式獲取也需要釋放同步狀態,通過調用releaseShared(int arg)方法可以 釋放同步狀態,代碼如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
該方法在釋放同步狀態之后,將會喚醒后續處于等待狀態的節點。對于能夠支持多個線 程同時訪問的并發組件(比如Semaphore),它和獨占式主要區別在于tryReleaseShared(int arg) 方法必須確保同步狀態(或者資源數)線程安全釋放,一般是通過循環和CAS來保證的,因為釋放同步狀態的操作會同時來自多個線程。
package com.xiaolyuh; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * 共享鎖,可以允許多個線程同時獲得鎖 * * @author yuhao.wang3 * @since 2019/7/12 10:00 */ public class SharedLock implements Lock { private static class Sync extends AbstractQueuedSynchronizer { public Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero."); } setState(count); } @Override protected int tryAcquireShared(int arg) { int count = getState(); if (count > 0 && compareAndSetState(count, count - arg)) { setExclusiveOwnerThread(Thread.currentThread()); return count; } return -1; } @Override protected boolean tryReleaseShared(int arg) { for (; ; ) { // 通過循環和CAS來保證安全的釋放鎖 int count = getState(); if (compareAndSetState(count, count + arg)) { setExclusiveOwnerThread(null); return true; } } } @Override protected boolean isHeldExclusively() { return getState() <= 0; } public Condition newCondition() { return new ConditionObject(); } } private Sync sync; /** * @param count 能同時獲取到鎖的線程數 */ public SharedLock(int count) { this.sync = new Sync(count); } @Override public void lock() { sync.acquireShared(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } @Override public boolean tryLock() { try { return sync.tryAcquireSharedNanos(1, 100L); } catch (InterruptedException e) { return false; } } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.releaseShared(1); } @Override public Condition newCondition() { return sync.newCondition(); } public static void main(String[] args) { final Lock lock = new SharedLock(5); // 啟動10個線程 for (int i = 0; i < 100; i++) { new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName()); Thread.sleep(1000); } catch (Exception e) { } finally { lock.unlock(); } }).start(); } // 每隔1秒換行 for (int i = 0; i < 20; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println(); } } }
關于AQS AbstractQueuedSynchronizer的同步框架是什么就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。