您好,登錄后才能下訂單哦!
1.讀寫鎖的介紹
在并發場景中用于解決線程安全的問題,我們幾乎會高頻率的使用到獨占式鎖,通常使用java提供的關鍵字synchronized或者concurrents包中實現了Lock接口的。它們都是獨占式獲取鎖,也就是在同一時刻只有一個線程能夠獲取鎖。而在一些業務場景中,大部分只是讀數據,寫數據很少,如果僅僅是讀數據的話并不會影響數據正確性(出現臟讀),而如果在這種業務場景下,依然使用獨占鎖的話,很顯然這將是出現性能瓶頸的地方。
針對這種讀多寫少的情況,java還提供了另外一個實現Lock接口的ReentrantReadWriteLock(讀寫鎖)。讀寫所允許同一時刻被多個讀線程訪問,但是在寫線程訪問時,所有的讀線程和其他的寫線程都會被阻塞。在分析WirteLock和ReadLock的互斥性時可以按照WriteLock與WriteLock之間,WriteLock與ReadLock之間以及ReadLock與ReadLock之間進行分析。更多關于讀寫鎖特性介紹大家可以看源碼上的介紹(閱讀源碼時最好的一種學習方式,我也正在學習中,與大家共勉),這里做一個歸納總結:
公平性選擇:支持非公平性(默認)和公平的鎖獲取方式,吞吐量還是非公平優于公平;
重入性:支持重入,讀鎖獲取后能再次獲取,寫鎖獲取之后能夠再次獲取寫鎖,同時也能夠獲取讀鎖;
鎖降級:遵循獲取寫鎖,獲取讀鎖再釋放寫鎖的次序,寫鎖能夠降級成為讀鎖
要想能夠徹底的理解讀寫鎖必須能夠理解這樣幾個問題:1. 讀寫鎖是怎樣實現分別記錄讀寫狀態的?2. 寫鎖是怎樣獲取和釋放的?3.讀鎖是怎樣獲取和釋放的?我們帶著這樣的三個問題,再去了解下讀寫鎖。
同步組件的實現聚合了同步器(AQS),并通過重寫重寫同步器(AQS)中的方法實現同步組件的同步語義,AQS的底層實現分析可以。因此,寫鎖的實現依然也是采用這種方式。在同一時刻寫鎖是不能被多個線程所獲取,很顯然寫鎖是獨占式鎖,而實現寫鎖的同步語義是通過重寫AQS中的tryAcquire方法實現的。源碼為:
protected?final?boolean?tryAcquire(int?acquires)?{ ????/* ?????*?Walkthrough: ?????*?1\.?If?read?count?nonzero?or?write?count?nonzero ?????*????and?owner?is?a?different?thread,?fail. ?????*?2\.?If?count?would?saturate,?fail.?(This?can?only ?????*????happen?if?count?is?already?nonzero.) ?????*?3\.?Otherwise,?this?thread?is?eligible?for?lock?if ?????*????it?is?either?a?reentrant?acquire?or ?????*????queue?policy?allows?it.?If?so,?update?state ?????*????and?set?owner. ?????*/ ????Thread?current?=?Thread.currentThread(); ????//?1\.?獲取寫鎖當前的同步狀態 ????int?c?=?getState(); ????//?2\.?獲取寫鎖獲取的次數 ????int?w?=?exclusiveCount(c); ????if?(c?!=?0)?{ ????????//?(Note:?if?c?!=?0?and?w?==?0?then?shared?count?!=?0) ????????//?3.1?當讀鎖已被讀線程獲取或者當前線程不是已經獲取寫鎖的線程的話 ????????//?當前線程獲取寫鎖失敗 ????????if?(w?==?0?||?current?!=?getExclusiveOwnerThread()) ????????????return?false; ????????if?(w?+?exclusiveCount(acquires)?>?MAX_COUNT) ????????????throw?new?Error("Maximum?lock?count?exceeded"); ????????//?Reentrant?acquire ????????//?3.2?當前線程獲取寫鎖,支持可重復加鎖 ????????setState(c?+?acquires); ????????return?true; ????} ????//?3.3?寫鎖未被任何線程獲取,當前線程可獲取寫鎖 ????if?(writerShouldBlock()?|| ????????!compareAndSetState(c,?c?+?acquires)) ????????return?false; ????setExclusiveOwnerThread(current); ????return?true; }
這段代碼的邏輯請看注釋,這里有一個地方需要重點關注,exclusiveCount(c)方法,該方法源碼為:
static?int?exclusiveCount(int?c)?{?return?c?&?EXCLUSIVE_MASK;?}
其中EXCLUSIVE_MASK為:?static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
EXCLUSIVE _MASK為1左移16位然后減1,即為0x0000FFFF。而exclusiveCount方法是將同步狀態(state為int類型)與0x0000FFFF相與,即取同步狀態的低16位。那么低16位代表什么呢?根據exclusiveCount方法的注釋為獨占式獲取的次數即寫鎖被獲取的次數,現在就可以得出來一個結論同步狀態的低16位用來表示寫鎖的獲取次數。同時還有一個方法值得我們注意:
static?int?sharedCount(int?c)????{?return?c?>>>?SHARED_SHIFT;?}
該方法是獲取讀鎖被獲取的次數,是將同步狀態(int c)右移16次,即取同步狀態的高16位,現在我們可以得出另外一個結論同步狀態的高16位用來表示讀鎖被獲取的次數。現在還記得我們開篇說的需要弄懂的第一個問題嗎?讀寫鎖是怎樣實現分別記錄讀鎖和寫鎖的狀態的,現在這個問題的答案就已經被我們弄清楚了,其示意圖如下圖所示:
現在我們回過頭來看寫鎖獲取方法tryAcquire,其主要邏輯為:當讀鎖已經被讀線程獲取或者寫鎖已經被其他寫線程獲取,則寫鎖獲取失敗;否則,獲取成功并支持重入,增加寫狀態。
寫鎖釋放通過重寫AQS的tryRelease方法,源碼為:
protected?final?boolean?tryRelease(int?releases)?{ ????if?(!isHeldExclusively()) ????????throw?new?IllegalMonitorStateException(); ????//1\.?同步狀態減去寫狀態 ????int?nextc?=?getState()?-?releases; ????//2\.?當前寫狀態是否為0,為0則釋放寫鎖 ????boolean?free?=?exclusiveCount(nextc)?==?0; ????if?(free) ????????setExclusiveOwnerThread(null); ????//3\.?不為0則更新同步狀態 ????setState(nextc); ????return?free; }
源碼的實現邏輯請看注釋,不難理解與ReentrantLock基本一致,這里需要注意的是,減少寫狀態int nextc = getState() - releases;
只需要用當前同步狀態直接減去寫狀態的原因正是我們剛才所說的寫狀態是由同步狀態的低16位表示的。
看完了寫鎖,現在來看看讀鎖,讀鎖不是獨占式鎖,即同一時刻該鎖可以被多個讀線程獲取也就是一種共享式鎖。按照之前對AQS介紹,實現共享式同步組件的同步語義需要通過重寫AQS的tryAcquireShared方法和tryReleaseShared方法。讀鎖的獲取實現方法為:
protected?final?int?tryAcquireShared(int?unused)?{ ????/* ?????*?Walkthrough: ?????*?1\.?If?write?lock?held?by?another?thread,?fail. ?????*?2\.?Otherwise,?this?thread?is?eligible?for ?????*????lock?wrt?state,?so?ask?if?it?should?block ?????*????because?of?queue?policy.?If?not,?try ?????*????to?grant?by?CASing?state?and?updating?count. ?????*????Note?that?step?does?not?check?for?reentrant ?????*????acquires,?which?is?postponed?to?full?version ?????*????to?avoid?having?to?check?hold?count?in ?????*????the?more?typical?non-reentrant?case. ?????*?3\.?If?step?2?fails?either?because?thread ?????*????apparently?not?eligible?or?CAS?fails?or?count ?????*????saturated,?chain?to?version?with?full?retry?loop. ?????*/ ????Thread?current?=?Thread.currentThread(); ????int?c?=?getState(); ????//1\.?如果寫鎖已經被獲取并且獲取寫鎖的線程不是當前線程的話,當前 ????//?線程獲取讀鎖失敗返回-1 ????if?(exclusiveCount(c)?!=?0?&& ????????getExclusiveOwnerThread()?!=?current) ????????return?-1; ????int?r?=?sharedCount(c); ????if?(!readerShouldBlock()?&& ????????r?<?MAX_COUNT?&& ????????//2\.?當前線程獲取讀鎖 ????????compareAndSetState(c,?c?+?SHARED_UNIT))?{ ????????//3\.?下面的代碼主要是新增的一些功能,比如getReadHoldCount()方法 ????????//返回當前獲取讀鎖的次數 ????????if?(r?==?0)?{ ????????????firstReader?=?current; ????????????firstReaderHoldCount?=?1; ????????}?else?if?(firstReader?==?current)?{ ????????????firstReaderHoldCount++; ????????}?else?{ ????????????HoldCounter?rh?=?cachedHoldCounter; ????????????if?(rh?==?null?||?rh.tid?!=?getThreadId(current)) ????????????????cachedHoldCounter?=?rh?=?readHolds.get(); ????????????else?if?(rh.count?==?0) ????????????????readHolds.set(rh); ????????????rh.count++; ????????} ????????return?1; ????} ????//4\.?處理在第二步中CAS操作失敗的自旋已經實現重入性 ????return?fullTryAcquireShared(current); }
代碼的邏輯請看注釋,需要注意的是?當寫鎖被其他線程獲取后,讀鎖獲取失敗,否則獲取成功利用CAS更新同步狀態。另外,當前同步狀態需要加上SHARED_UNIT((1 << SHARED_SHIFT)
即0x00010000)的原因這是我們在上面所說的同步狀態的高16位用來表示讀鎖被獲取的次數。如果CAS失敗或者已經獲取讀鎖的線程再次獲取讀鎖時,是靠fullTryAcquireShared方法實現的,這段代碼就不展開說了,有興趣可以看看。
讀鎖釋放的實現主要通過方法tryReleaseShared,源碼如下,主要邏輯請看注釋:
protected?final?boolean?tryReleaseShared(int?unused)?{ ????Thread?current?=?Thread.currentThread(); ????//?前面還是為了實現getReadHoldCount等新功能 ????if?(firstReader?==?current)?{ ????????//?assert?firstReaderHoldCount?>?0; ????????if?(firstReaderHoldCount?==?1) ????????????firstReader?=?null; ????????else ????????????firstReaderHoldCount--; ????}?else?{ ????????HoldCounter?rh?=?cachedHoldCounter; ????????if?(rh?==?null?||?rh.tid?!=?getThreadId(current)) ????????????rh?=?readHolds.get(); ????????int?count?=?rh.count; ????????if?(count?<=?1)?{ ????????????readHolds.remove(); ????????????if?(count?<=?0) ????????????????throw?unmatchedUnlockException(); ????????} ????????--rh.count; ????} ????for?(;;)?{ ????????int?c?=?getState(); ????????//?讀鎖釋放?將同步狀態減去讀狀態即可 ????????int?nextc?=?c?-?SHARED_UNIT; ????????if?(compareAndSetState(c,?nextc)) ????????????//?Releasing?the?read?lock?has?no?effect?on?readers, ????????????//?but?it?may?allow?waiting?writers?to?proceed?if ????????????//?both?read?and?write?locks?are?now?free. ????????????return?nextc?==?0; ????} }
讀寫鎖支持鎖降級,遵循按照獲取寫鎖,獲取讀鎖再釋放寫鎖的次序,寫鎖能夠降級成為讀鎖,不支持鎖升級,關于鎖降級下面的示例代碼摘自ReentrantWriteReadLock源碼中:
void?processCachedData()?{ ????????rwl.readLock().lock(); ????????if?(!cacheValid)?{ ????????????//?Must?release?read?lock?before?acquiring?write?lock ????????????rwl.readLock().unlock(); ????????????rwl.writeLock().lock(); ????????????try?{ ????????????????//?Recheck?state?because?another?thread?might?have ????????????????//?acquired?write?lock?and?changed?state?before?we?did. ????????????????if?(!cacheValid)?{ ????????????????????data?=?... ????????????cacheValid?=?true; ??????????} ??????????//?Downgrade?by?acquiring?read?lock?before?releasing?write?lock ??????????rwl.readLock().lock(); ????????}?finally?{ ??????????rwl.writeLock().unlock();?//?Unlock?write,?still?hold?read ????????} ??????} ??????try?{ ????????use(data); ??????}?finally?{ ????????rwl.readLock().unlock(); ??????} ????} }
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。