您好,登錄后才能下訂單哦!
(1)讀寫鎖是什么?
(2)讀寫鎖具有哪些特性?
(3)ReentrantReadWriteLock是怎么實現讀寫鎖的?
(4)如何使用ReentrantReadWriteLock實現高效安全的TreeMap?
讀寫鎖是一種特殊的鎖,它把對共享資源的訪問分為讀訪問和寫訪問,多個線程可以同時對共享資源進行讀訪問,但是同一時間只能有一個線程對共享資源進行寫訪問,使用讀寫鎖可以極大地提高并發量。
讀寫鎖具有以下特性:
是否互斥 | 讀 | 寫 |
---|---|---|
讀 | 否 | 是 |
寫 | 是 | 是 |
可以看到,讀寫鎖除了讀讀不互斥,讀寫、寫讀、寫寫都是互斥的。
那么,ReentrantReadWriteLock是怎么實現讀寫鎖的呢?
在看源碼之前,我們還是先來看一下ReentrantReadWriteLock這個類的主要結構。
ReentrantReadWriteLock中的類分成三個部分:
(1)ReentrantReadWriteLock本身實現了ReadWriteLock接口,這個接口只提供了兩個方法readLock()
和writeLock()
;
(2)同步器,包含一個繼承了AQS的Sync內部類,以及其兩個子類FairSync和NonfairSync;
(3)ReadLock和WriteLock兩個內部類實現了Lock接口,它們具有鎖的一些特性。
// 讀鎖
private final ReentrantReadWriteLock.ReadLock readerLock;
// 寫鎖
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步器
final Sync sync;
維護了讀鎖、寫鎖和同步器。
// 默認構造方法
public ReentrantReadWriteLock() {
this(false);
}
// 是否使用公平鎖的構造方法
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
它提供了兩個構造方法,默認構造方法使用的是非公平鎖模式,在構造方法中初始化了讀鎖和寫鎖。
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
屬性中的讀鎖和寫鎖是私有屬性,通過這兩個方法暴露出去。
下面我們主要分析讀鎖和寫鎖的加鎖、解鎖方法,且都是基于非公平模式的。
// ReentrantReadWriteLock.ReadLock.lock()
public void lock() {
sync.acquireShared(1);
}
// AbstractQueuedSynchronizer.acquireShared()
public final void acquireShared(int arg) {
// 嘗試獲取共享鎖(返回1表示成功,返回-1表示失敗)
if (tryAcquireShared(arg) < 0)
// 失敗了就可能要排隊
doAcquireShared(arg);
}
// ReentrantReadWriteLock.Sync.tryAcquireShared()
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 狀態變量的值
// 在讀寫鎖模式下,高16位存儲的是共享鎖(讀鎖)被獲取的次數,低16位存儲的是互斥鎖(寫鎖)被獲取的次數
int c = getState();
// 互斥鎖的次數
// 如果其它線程獲得了寫鎖,直接返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 讀鎖被獲取的次數
int r = sharedCount(c);
// 下面說明此時還沒有寫鎖,嘗試去更新state的值獲取讀鎖
// 讀者是否需要排隊(是否是公平模式)
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 獲取讀鎖成功
if (r == 0) {
// 如果之前還沒有線程獲取讀鎖
// 記錄第一個讀者為當前線程
firstReader = current;
// 第一個讀者重入的次數為1
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如果有線程獲取了讀鎖且是當前線程是第一個讀者
// 則把其重入次數加1
firstReaderHoldCount++;
} else {
// 如果有線程獲取了讀鎖且當前線程不是第一個讀者
// 則從緩存中獲取重入次數保存器
HoldCounter rh = cachedHoldCounter;
// 如果緩存不屬性當前線程
// 再從ThreadLocal中獲取
// readHolds本身是一個ThreadLocal,里面存儲的是HoldCounter
if (rh == null || rh.tid != getThreadId(current))
// get()的時候會初始化rh
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 如果rh的次數為0,把它放到ThreadLocal中去
readHolds.set(rh);
// 重入的次數加1(初始次數為0)
rh.count++;
}
// 獲取讀鎖成功,返回1
return 1;
}
// 通過這個方法再去嘗試獲取讀鎖(如果之前其它線程獲取了寫鎖,一樣返回-1表示失敗)
return fullTryAcquireShared(current);
}
// AbstractQueuedSynchronizer.doAcquireShared()
private void doAcquireShared(int arg) {
// 進入AQS的隊列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 當前節點的前一個節點
final Node p = node.predecessor();
// 如果前一個節點是頭節點(說明是第一個排隊的節點)
if (p == head) {
// 再次嘗試獲取讀鎖
int r = tryAcquireShared(arg);
// 如果成功了
if (r >= 0) {
// 頭節點后移并傳播
// 傳播即喚醒后面連續的讀節點
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 沒獲取到讀鎖,阻塞并等待被喚醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// AbstractQueuedSynchronizer.setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) {
// h為舊的頭節點
Node h = head;
// 設置當前節點為新頭節點
setHead(node);
// 如果舊的頭節點或新的頭節點為空或者其等待狀態小于0(表示狀態為SIGNAL/PROPAGATE)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 需要傳播
// 取下一個節點
Node s = node.next;
// 如果下一個節點為空,或者是需要獲取讀鎖的節點
if (s == null || s.isShared())
// 喚醒下一個節點
doReleaseShared();
}
}
// AbstractQueuedSynchronizer.doReleaseShared()
// 這個方法只會喚醒一個節點
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果頭節點狀態為SIGNAL,說明要喚醒下一個節點
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒下一個節點
unparkSuccessor(h);
}
else if (ws == 0 &&
// 把頭節點的狀態改為PROPAGATE成功才會跳到下面的if
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果喚醒后head沒變,則跳出循環
if (h == head) // loop if head changed
break;
}
}
看完【死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖】的分析再看這章的內容應該會比較簡單,中間一樣的方法我們這里直接跳過了。
我們來看看大致的邏輯:
(1)先嘗試獲取讀鎖;
(2)如果成功了直接結束;
(3)如果失敗了,進入doAcquireShared()方法;
(4)doAcquireShared()方法中首先會生成一個新節點并進入AQS隊列中;
(5)如果頭節點正好是當前節點的上一個節點,再次嘗試獲取鎖;
(6)如果成功了,則設置頭節點為新節點,并傳播;
(7)傳播即喚醒下一個讀節點(如果下一個節點是讀節點的話);
(8)如果頭節點不是當前節點的上一個節點或者(5)失敗,則阻塞當前線程等待被喚醒;
(9)喚醒之后繼續走(5)的邏輯;
在整個邏輯中是在哪里連續喚醒讀節點的呢?
答案是在doAcquireShared()方法中,在這里一個節點A獲取了讀鎖后,會喚醒下一個讀節點B,這時候B也會獲取讀鎖,然后B繼續喚醒C,依次往復,也就是說這里的節點是一個喚醒一個這樣的形式,而不是一個節點獲取了讀鎖后一次性喚醒后面所有的讀節點。
// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock.unlock
public void unlock() {
sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared
public final boolean releaseShared(int arg) {
// 如果嘗試釋放成功了,就喚醒下一個節點
if (tryReleaseShared(arg)) {
// 這個方法實際是喚醒下一個節點
doReleaseShared();
return true;
}
return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// 如果第一個讀者(讀線程)是當前線程
// 就把它重入的次數減1
// 如果減到0了就把第一個讀者置為空
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 如果第一個讀者不是當前線程
// 一樣地,把它重入的次數減1
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// 共享鎖獲取的次數減1
// 如果減為0了說明完全釋放了,才返回true
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.doReleaseShared
// 行為跟方法名有點不符,實際是喚醒下一個節點
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果頭節點狀態為SIGNAL,說明要喚醒下一個節點
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒下一個節點
unparkSuccessor(h);
}
else if (ws == 0 &&
// 把頭節點的狀態改為PROPAGATE成功才會跳到下面的if
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果喚醒后head沒變,則跳出循環
if (h == head) // loop if head changed
break;
}
}
解鎖的大致流程如下:
(1)將當前線程重入的次數減1;
(2)將共享鎖總共被獲取的次數減1;
(3)如果共享鎖獲取的次數減為0了,說明共享鎖完全釋放了,那就喚醒下一個節點;
如下圖,ABC三個節點各獲取了一次共享鎖,三者釋放的順序分別為ACB,那么最后B釋放共享鎖的時候tryReleaseShared()才會返回true,進而才會喚醒下一個節點D。
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.lock()
public void lock() {
sync.acquire(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
// 先嘗試獲取鎖
// 如果失敗,則會進入隊列中排隊,后面的邏輯跟ReentrantLock一模一樣了
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// 狀態變量state的值
int c = getState();
// 互斥鎖被獲取的次數
int w = exclusiveCount(c);
if (c != 0) {
// 如果c!=0且w==0,說明共享鎖被獲取的次數不為0
// 這句話整個的意思就是
// 如果共享鎖被獲取的次數不為0,或者被其它線程獲取了互斥鎖(寫鎖)
// 那么就返回false,獲取寫鎖失敗
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 溢出檢測
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 到這里說明當前線程已經獲取過寫鎖,這里是重入了,直接把state加1即可
setState(c + acquires);
// 獲取寫鎖成功
return true;
}
// 如果c等于0,就嘗試更新state的值(非公平模式writerShouldBlock()返回false)
// 如果失敗了,說明獲取寫鎖失敗,返回false
// 如果成功了,說明獲取寫鎖成功,把自己設置為占有者,并返回true
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
// 獲取寫鎖失敗了后面的邏輯跟ReentrantLock是一致的,進入隊列排隊,這里就不列源碼了
寫鎖獲取的過程大致如下:
(1)嘗試獲取鎖;
(2)如果有讀者占有著讀鎖,嘗試獲取寫鎖失敗;
(3)如果有其它線程占有著寫鎖,嘗試獲取寫鎖失敗;
(4)如果是當前線程占有著寫鎖,嘗試獲取寫鎖成功,state值加1;
(5)如果沒有線程占有著鎖(state==0),當前線程嘗試更新state的值,成功了表示嘗試獲取鎖成功,否則失敗;
(6)嘗試獲取鎖失敗以后,進入隊列排隊,等待被喚醒;
(7)后續邏輯跟ReentrantLock是一致;
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.unlock()
public void unlock() {
sync.release(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
// 如果嘗試釋放鎖成功(完全釋放鎖)
// 就嘗試喚醒下一個節點
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryRelease()
protected final boolean tryRelease(int releases) {
// 如果寫鎖不是當前線程占有著,拋出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 狀態變量的值減1
int nextc = getState() - releases;
// 是否完全釋放鎖
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
// 設置狀態變量的值
setState(nextc);
// 如果完全釋放了寫鎖,返回true
return free;
}
寫鎖釋放的過程大致為:
(1)先嘗試釋放鎖,即狀態變量state的值減1;
(2)如果減為0了,說明完全釋放了鎖;
(3)完全釋放了鎖才喚醒下一個等待的節點;
(1)ReentrantReadWriteLock采用讀寫鎖的思想,能提高并發的吞吐量;
(2)讀鎖使用的是共享鎖,多個讀鎖可以一起獲取鎖,互相不會影響,即讀讀不互斥;
(3)讀寫、寫讀和寫寫是會互斥的,前者占有著鎖,后者需要進入AQS隊列中排隊;
(4)多個連續的讀線程是一個接著一個被喚醒的,而不是一次性喚醒所有讀線程;
(5)只有多個讀鎖都完全釋放了才會喚醒下一個寫線程;
(6)只有寫鎖完全釋放了才會喚醒下一個等待者,這個等待者有可能是讀線程,也可能是寫線程;
(1)如果同一個線程先獲取讀鎖,再獲取寫鎖會怎樣?
分析上圖中的代碼,在tryAcquire()方法中,如果讀鎖被獲取的次數不為0(c != 0 && w == 0),返回false,返回之后外層方法會讓當前線程阻塞。
可以通過下面的方法驗證:
readLock.lock();
writeLock.lock();
writeLock.unlock();
readLock.unlock();
運行程序后會發現代碼停止在writeLock.lock();
,當然,你也可以打個斷點跟蹤進去看看。
(2)如果同一個線程先獲取寫鎖,再獲取讀鎖會怎樣?
分析上面的代碼,在tryAcquireShared()方法中,第一個紅框處并不會返回,因為不滿足getExclusiveOwnerThread() != current
;第二個紅框處如果原子更新成功就說明獲取了讀鎖,然后就會執行第三個紅框處的代碼把其重入次數更改為1。
可以通過下面的方法驗證:
writeLock.lock();
readLock.lock();
readLock.unlock();
writeLock.unlock();
你可以打個斷點跟蹤一下看看。
(3)死鎖了么?
通過上面的兩個例子,我們可以感受到同一個線程先讀后寫和先寫后讀是完全不一樣的,為什么不一樣呢?
先讀后寫,一個線程占有讀鎖后,其它線程還是可以占有讀鎖的,這時候如果在其它線程占有讀鎖之前讓自己占有了寫鎖,其它線程又不能占有讀鎖了,這段程序會非常難實現,邏輯也很奇怪,所以,設計成只要一個線程占有了讀鎖,其它線程包括它自己都不能再獲取寫鎖。
先寫后讀,一個線程占有寫鎖后,其它線程是不能占有任何鎖的,這時候,即使自己占有一個讀鎖,對程序的邏輯也不會有任何影響,所以,一個線程占有寫鎖后是可以再占有讀鎖的,只是這個時候其它線程依然無法獲取讀鎖。
如果你仔細思考上面的邏輯,你會發現一個線程先占有讀鎖后占有寫鎖,會有一個很大的問題——鎖無法被釋放也無法被獲取了。這個線程先占有了讀鎖,然后自己再占有寫鎖的時候會阻塞,然后它就自己把自己搞死了,進而把其它線程也搞死了,它無法釋放鎖,其它線程也無法獲得鎖了。
這是死鎖嗎?似乎不是,死鎖的定義是線程A占有著線程B需要的資源,線程B占有著線程A需要的資源,兩個線程相互等待對方釋放資源,經典的死鎖例子如下:
Object a = new Object();
Object b = new Object();
new Thread(()->{
synchronized (a) {
LockSupport.parkNanos(1000000);
synchronized (b) {
}
}
}).start();
new Thread(()->{
synchronized (b) {
synchronized (a) {
}
}
}).start();
簡單的死鎖用jstack是可以看到的:
"Thread-1":
at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$1(ReentrantReadWriteLockTest.java:40)
- waiting to lock <0x000000076baa9068> (a java.lang.Object)
- locked <0x000000076baa9078> (a java.lang.Object)
at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$2/1831932724.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
"Thread-0":
at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$0(ReentrantReadWriteLockTest.java:32)
- waiting to lock <0x000000076baa9078> (a java.lang.Object)
- locked <0x000000076baa9068> (a java.lang.Object)
at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$1/1096979270.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
Found 1 deadlock.
(4)如何使用ReentrantReadWriteLock實現一個高效安全的TreeMap?
class SafeTreeMap {
private final Map<String, Object> m = new TreeMap<String, Object>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Object get(String key) {
readLock.lock();
try {
return m.get(key);
} finally {
readLock.unlock();
}
}
public Object put(String key, Object value) {
writeLock.lock();
try {
return m.put(key, value);
} finally {
writeLock.unlock();
}
}
}
死磕 java同步系列之ReentrantLock VS synchronized
死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖
死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖
死磕 java同步系列之AQS起篇
死磕 java同步系列之自己動手寫一個鎖Lock
死磕 java魔法類之Unsafe解析
死磕 java同步系列之JMM(Java Memory Model)
死磕 java同步系列之volatile解析
歡迎關注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。