您好,登錄后才能下訂單哦!
本篇內容主要講解“Java并發知識點有哪些”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Java并發知識點有哪些”吧!
從操作系統的角度來看,線程是CPU分配的最小單位。
并行就是同一時刻,兩個線程都在執行。這就要求有兩個CPU去分別執行兩個線程。
并發就是同一時刻,只有一個執行,但是一個時間段內,兩個線程都執行了。并發的實現依賴于CPU切換線程,因為切換的時間特別短,所以基本對于用戶是無感知的。
就好像我們去食堂打飯,并行就是我們在多個窗口排隊,幾個阿姨同時打菜;并發就是我們擠在一個窗口,阿姨給這個打一勺,又手忙腳亂地給那個打一勺。
要說線程,必須得先說說進程。
進程:進程是代碼在數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位。
線程:線程是進程的一個執行路徑,一個進程中至少有一個線程,進程中的多個線程共享進程的資源。
操作系統在分配資源時是把資源分配給進程的, 但是 CPU 資源比較特殊,它是被分配到線程的,因為真正要占用CPU運行的是線程,所以也說線程是 CPU分配的基本單位。
比如在Java中,當我們啟動 main 函數其實就啟動了一個JVM進程,而 main 函數在的線程就是這個進程中的一個線程,也稱主線程。
一個進程中有多個線程,多個線程共用進程的堆和方法區資源,但是每個線程有自己的程序計數器和棧。
Java中創建線程主要有三種方式,分別為繼承Thread類、實現Runnable接口、實現Callable接口。
繼承Thread類,重寫run()方法,調用start()方法啟動線程
public class ThreadTest { /** * 繼承Thread類 */ public static class MyThread extends Thread { @Override public void run() { System.out.println("This is child thread"); } } public static void main(String[] args) { MyThread thread = new MyThread(); thread.start(); }}
實現 Runnable 接口,重寫run()方法
public class RunnableTask implements Runnable { public void run() { System.out.println("Runnable!"); } public static void main(String[] args) { RunnableTask task = new RunnableTask(); new Thread(task).start(); }}
上面兩種都是沒有返回值的,但是如果我們需要獲取線程的執行結果,該怎么辦呢?
實現Callable接口,重寫call()方法,這種方式可以通過FutureTask獲取任務執行的返回值
public class CallerTask implements Callable<String> { public String call() throws Exception { return "Hello,i am running!"; } public static void main(String[] args) { //創建異步任務 FutureTask<String> task=new FutureTask<String>(new CallerTask()); //啟動線程 new Thread(task).start(); try { //等待執行完成,并獲取返回結果 String result=task.get(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }}
JVM執行start方法,會先創建一條線程,由創建出來的新線程去執行thread的run方法,這才起到多線程的效果。
**為什么我們不能直接調用run()方法?**也很清楚, 如果直接調用Thread的run()方法,那么run方法還是運行在主線程中,相當于順序執行,就起不到多線程的效果。
線程等待與通知
在Object類中有一些函數可以用于線程的等待與通知。
wait():當一個線程A調用一個共享變量的 wait()方法時, 線程A會被阻塞掛起, 發生下面幾種情況才會返回 :
(1) 線程A調用了共享對象 notify()或者 notifyAll()方法;
(2)其他線程調用了線程A的 interrupt() 方法,線程A拋出InterruptedException異常返回。
wait(long timeout) :這個方法相比 wait() 方法多了一個超時參數,它的不同之處在于,如果線程A調用共享對象的wait(long timeout)方法后,沒有在指定的 timeout ms時間內被其它線程喚醒,那么這個方法還是會因為超時而返回。
wait(long timeout, int nanos),其內部調用的是 wait(long timout)函數。
上面是線程等待的方法,而喚醒線程主要是下面兩個方法:
notify() : 一個線程A調用共享對象的 notify() 方法后,會喚醒一個在這個共享變量上調用 wait 系列方法后被掛起的線程。 一個共享變量上可能會有多個線程在等待,具體喚醒哪個等待的線程是隨機的。
notifyAll() :不同于在共享變量上調用 notify() 函數會喚醒被阻塞到該共享變量上的一個線程,notifyAll()方法則會喚醒所有在該共享變量上由于調用 wait 系列方法而被掛起的線程。
Thread類也提供了一個方法用于等待的方法:
join():如果一個線程A執行了thread.join()語句,其含義是:當前線程A等待thread線程終止之后才
從thread.join()返回。
線程休眠
sleep(long millis) :Thread類中的靜態方法,當一個執行中的線程A調用了Thread 的sleep方法后,線程A會暫時讓出指定時間的執行權,但是線程A所擁有的監視器資源,比如鎖還是持有不讓出的。指定的睡眠時間到了后該函數會正常返回,接著參與 CPU 的調度,獲取到 CPU 資源后就可以繼續運行。
讓出優先權
yield() :Thread類中的靜態方法,當一個線程調用 yield 方法時,實際就是在暗示線程調度器當前線程請求讓出自己的CPU ,但是線程調度器可以無條件忽略這個暗示。
線程中斷
Java 中的線程中斷是一種線程間的協作模式,通過設置線程的中斷標志并不能直接終止該線程的執行,而是被中斷的線程根據中斷狀態自行處理。
void interrupt() :中斷線程,例如,當線程A運行時,線程B可以調用錢程interrupt() 方法來設置線程的中斷標志為true 并立即返回。設置標志僅僅是設置標志, 線程A實際并沒有被中斷, 會繼續往下執行。
boolean isInterrupted() 方法: 檢測當前線程是否被中斷。
boolean interrupted() 方法: 檢測當前線程是否被中斷,與 isInterrupted 不同的是,該方法如果發現當前線程被中斷,則會清除中斷標志。
在Java中,線程共有六種狀態:
狀態 | 說明 |
---|---|
NEW | 初始狀態:線程被創建,但還沒有調用start()方法 |
RUNNABLE | 運行狀態:Java線程將操作系統中的就緒和運行兩種狀態籠統的稱作“運行” |
BLOCKED | 阻塞狀態:表示線程阻塞于鎖 |
WAITING | 等待狀態:表示線程進入等待狀態,進入該狀態表示當前線程需要等待其他線程做出一些特定動作(通知或中斷) |
TIME_WAITING | 超時等待狀態:該狀態不同于 WAITIND,它是可以在指定的時間自行返回的 |
TERMINATED | 終止狀態:表示當前線程已經執行完畢 |
線程在自身的生命周期中, 并不是固定地處于某個狀態,而是隨著代碼的執行在不同的狀態之間進行切換,Java線程狀態變化如圖示:
使用多線程的目的是為了充分利用CPU,但是我們知道,并發其實是一個CPU來應付多個線程。
為了讓用戶感覺多個線程是在同時執行的, CPU 資源的分配采用了時間片輪轉也就是給每個線程分配一個時間片,線程在時間片內占用 CPU 執行任務。當線程使用完時間片后,就會處于就緒狀態并讓出 CPU 讓其他線程占用,這就是上下文切換。
Java中的線程分為兩類,分別為 daemon 線程(守護線程)和 user 線程(用戶線程)。
在JVM 啟動時會調用 main 函數,main函數所在的錢程就是一個用戶線程。其實在 JVM 內部同時還啟動了很多守護線程, 比如垃圾回收線程。
那么守護線程和用戶線程有什么區別呢?區別之一是當最后一個非守護線程束時, JVM會正常退出,而不管當前是否存在守護線程,也就是說守護線程是否結束并不影響 JVM退出。換而言之,只要有一個用戶線程還沒結束,正常情況下JVM就不會退出。
volatile和synchronized關鍵字
關鍵字volatile可以用來修飾字段(成員變量),就是告知程序任何對該變量的訪問均需要從共享內存中獲取,而對它的改變必須同步刷新回共享內存,它能保證所有線程對變量訪問的可見性。
關鍵字synchronized可以修飾方法或者以同步塊的形式來進行使用,它主要確保多個線程在同一個時刻,只能有一個線程處于方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性。
等待/通知機制
可以通過Java內置的等待/通知機制(wait()/notify())實現一個線程修改一個對象的值,而另一個線程感知到了變化,然后進行相應的操作。
管道輸入/輸出流
管道輸入/輸出流和普通的文件輸入/輸出流或者網絡輸入/輸出流不同之處在于,它主要用于線程之間的數據傳輸,而傳輸的媒介為內存。
管道輸入/輸出流主要包括了如下4種具體實現:PipedOutputStream、PipedInputStream、 PipedReader和PipedWriter,前兩種面向字節,而后兩種面向字符。
使用Thread.join()
如果一個線程A執行了thread.join()語句,其含義是:當前線程A等待thread線程終止之后才從thread.join()返回。。線程Thread除了提供join()方法之外,還提供了join(long millis)和join(long millis,int nanos)兩個具備超時特性的方法。
使用ThreadLocal
ThreadLocal,即線程變量,是一個以ThreadLocal對象為鍵、任意對象為值的存儲結構。這個結構被附帶在線程上,也就是說一個線程可以根據一個ThreadLocal對象查詢到綁定在這個線程上的一個值。
可以通過set(T)方法來設置一個值,在當前線程下再通過get()方法獲取到原先設置的值。
關于多線程,其實很大概率還會出一些筆試題,比如交替打印、銀行轉賬、生產消費模型等等,后面老三會單獨出一期來盤點一下常見的多線程筆試題。
ThreadLocal其實應用場景不是很多,但卻是被炸了千百遍的面試老油條,涉及到多線程、數據結構、JVM,可問的點比較多,一定要拿下。
ThreadLocal,也就是線程本地變量。如果你創建了一個ThreadLocal變量,那么訪問這個變量的每個線程都會有這個變量的一個本地拷貝,多個線程操作這個變量的時候,實際是操作自己本地內存里面的變量,從而起到線程隔離的作用,避免了線程安全問題。
創建
創建了一個ThreadLoca變量localVariable,任何一個線程都能并發訪問localVariable。
//創建一個ThreadLocal變量public static ThreadLocal<String> localVariable = new ThreadLocal<>();
寫入
線程可以在任何地方使用localVariable,寫入變量。
localVariable.set("鄙人三某”);
讀取
線程在任何地方讀取的都是它寫入的變量。
localVariable.get();
有用到過的,用來做用戶信息上下文的存儲。
我們的系統應用是一個典型的MVC架構,登錄后的用戶每次訪問接口,都會在請求頭中攜帶一個token,在控制層可以根據這個token,解析出用戶的基本信息。那么問題來了,假如在服務層和持久層都要用到用戶信息,比如rpc調用、更新用戶獲取等等,那應該怎么辦呢?
一種辦法是顯式定義用戶相關的參數,比如賬號、用戶名……這樣一來,我們可能需要大面積地修改代碼,多少有點瓜皮,那該怎么辦呢?
這時候我們就可以用到ThreadLocal,在控制層攔截請求把用戶信息存入ThreadLocal,這樣我們在任何一個地方,都可以取出ThreadLocal中存的用戶數據。
很多其它場景的cookie、session等等數據隔離也都可以通過ThreadLocal去實現。
我們常用的數據庫連接池也用到了ThreadLocal:
數據庫連接池的連接交給ThreadLoca進行管理,保證當前線程的操作都是同一個Connnection。
我們看一下ThreadLocal的set(T)方法,發現先獲取到當前線程,再獲取ThreadLocalMap
,然后把元素存到這個map中。
public void set(T value) { //獲取當前線程 Thread t = Thread.currentThread(); //獲取ThreadLocalMap ThreadLocalMap map = getMap(t); //講當前元素存入map if (map != null) map.set(this, value); else createMap(t, value); }
ThreadLocal實現的秘密都在這個ThreadLocalMap
了,可以Thread類中定義了一個類型為ThreadLocal.ThreadLocalMap
的成員變量threadLocals
。
public class Thread implements Runnable { //ThreadLocal.ThreadLocalMap是Thread的屬性 ThreadLocal.ThreadLocalMap threadLocals = null;}
ThreadLocalMap既然被稱為Map,那么毫無疑問它是<key,value>型的數據結構。我們都知道map的本質是一個個<key,value>形式的節點組成的數組,那ThreadLocalMap的節點是什么樣的呢?
static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; //節點類 Entry(ThreadLocal<?> k, Object v) { //key賦值 super(k); //value賦值 value = v; } }
這里的節點,key可以簡單低視作ThreadLocal,value為代碼中放入的值,當然實際上key并不是ThreadLocal本身,而是它的一個弱引用,可以看到Entry的key繼承了 WeakReference(弱引用),再來看一下key怎么賦值的:
public WeakReference(T referent) { super(referent); }
key的賦值,使用的是WeakReference的賦值。
所以,怎么回答ThreadLocal原理?要答出這幾個點:
Thread類有一個類型為ThreadLocal.ThreadLocalMap的實例變量threadLocals,每個線程都有一個屬于自己的ThreadLocalMap。
ThreadLocalMap內部維護著Entry數組,每個Entry代表一個完整的對象,key是ThreadLocal的弱引用,value是ThreadLocal的泛型值。
每個線程在往ThreadLocal里設置值的時候,都是往自己的ThreadLocalMap里存,讀也是以某個ThreadLocal作為引用,在自己的map里找對應的key,從而實現了線程隔離。
ThreadLocal本身不存儲值,它只是作為一個key來讓線程往ThreadLocalMap里存取值。
我們先來分析一下使用ThreadLocal時的內存,我們都知道,在JVM中,棧內存線程私有,存儲了對象的引用,堆內存線程共享,存儲了對象實例。
所以呢,棧中存儲了ThreadLocal、Thread的引用,堆中存儲了它們的具體實例。
ThreadLocalMap中使用的 key 為 ThreadLocal 的弱引用。
“弱引用:只要垃圾回收機制一運行,不管JVM的內存空間是否充足,都會回收該對象占用的內存。”
那么現在問題就來了,弱引用很容易被回收,如果ThreadLocal(ThreadLocalMap的Key)被垃圾回收器回收了,但是ThreadLocalMap生命周期和Thread是一樣的,它這時候如果不被回收,就會出現這種情況:ThreadLocalMap的key沒了,value還在,這就會造成了內存泄漏問題。
那怎么解決內存泄漏問題呢?
很簡單,使用完ThreadLocal后,及時調用remove()方法釋放內存空間。
ThreadLocal<String> localVariable = new ThreadLocal();try { localVariable.set("鄙人三某”); ……} finally { localVariable.remove();}
那為什么key還要設計成弱引用?
key設計成弱引用同樣是為了防止內存泄漏。
假如key被設計成強引用,如果ThreadLocal Reference被銷毀,此時它指向ThreadLoca的強引用就沒有了,但是此時key還強引用指向ThreadLoca,就會導致ThreadLocal不能被回收,這時候就發生了內存泄漏的問題。
ThreadLocalMap雖然被叫做Map,其實它是沒有實現Map接口的,但是結構還是和HashMap比較類似的,主要關注的是兩個要素:元素數組
和散列方法
。
元素數組
一個table數組,存儲Entry類型的元素,Entry是ThreaLocal弱引用作為key,Object作為value的結構。
private Entry[] table;
散列方法
散列方法就是怎么把對應的key映射到table數組的相應下標,ThreadLocalMap用的是哈希取余法,取出key的threadLocalHashCode,然后和table數組長度減一&運算(相當于取余)。
int i = key.threadLocalHashCode & (table.length - 1);
這里的threadLocalHashCode計算有點東西,每創建一個ThreadLocal對象,它就會新增0x61c88647
,這個值很特殊,它是斐波那契數 也叫 黃金分割數。hash
增量為 這個數字,帶來的好處就是 hash
分布非常均勻。
private static final int HASH_INCREMENT = 0x61c88647; private static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT); }
我們可能都知道HashMap使用了鏈表來解決沖突,也就是所謂的鏈地址法。
ThreadLocalMap沒有使用鏈表,自然也不是用鏈地址法來解決沖突了,它用的是另外一種方式——開放定址法。開放定址法是什么意思呢?簡單來說,就是這個坑被人占了,那就接著去找空著的坑。
如上圖所示,如果我們插入一個value=27的數據,通過 hash計算后應該落入第 4 個槽位中,而槽位 4 已經有了 Entry數據,而且Entry數據的key和當前不相等。此時就會線性向后查找,一直找到 Entry為 null的槽位才會停止查找,把元素放到空的槽中。
在get的時候,也會根據ThreadLocal對象的hash值,定位到table中的位置,然后判斷該槽位Entry對象中的key是否和get的key一致,如果不一致,就判斷下一個位置。
在ThreadLocalMap.set()方法的最后,如果執行完啟發式清理工作后,未清理到任何數據,且當前散列數組中Entry
的數量已經達到了列表的擴容閾值(len*2/3)
,就開始執行rehash()
邏輯:
if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash();
再著看rehash()具體實現:這里會先去清理過期的Entry,然后還要根據條件判斷size >= threshold - threshold / 4
也就是size >= threshold* 3/4
來決定是否需要擴容。
private void rehash() { //清理過期Entry expungeStaleEntries(); //擴容 if (size >= threshold - threshold / 4) resize();}//清理過期Entryprivate void expungeStaleEntries() { Entry[] tab = table; int len = tab.length; for (int j = 0; j < len; j++) { Entry e = tab[j]; if (e != null && e.get() == null) expungeStaleEntry(j); }}
接著看看具體的resize()
方法,擴容后的newTab
的大小為老數組的兩倍,然后遍歷老的table數組,散列方法重新計算位置,開放地址解決沖突,然后放到新的newTab
,遍歷完成之后,oldTab
中所有的entry
數據都已經放入到newTab
中了,然后table引用指向newTab
具體代碼:
父線程能用ThreadLocal來給子線程傳值嗎?毫無疑問,不能。那該怎么辦?
這時候可以用到另外一個類——InheritableThreadLocal
。
使用起來很簡單,在主線程的InheritableThreadLocal實例設置值,在子線程中就可以拿到了。
public class InheritableThreadLocalTest { public static void main(String[] args) { final ThreadLocal threadLocal = new InheritableThreadLocal(); // 主線程 threadLocal.set("不擅技術"); //子線程 Thread t = new Thread() { @Override public void run() { super.run(); System.out.println("鄙人三某 ," + threadLocal.get()); } }; t.start(); }}
那原理是什么呢?
原理很簡單,在Thread類里還有另外一個變量:
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
在Thread.init的時候,如果父線程的inheritableThreadLocals
不為空,就把它賦給當前線程(子線程)的inheritableThreadLocals
。
if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals)
Java內存模型(Java Memory Model,JMM),是一種抽象的模型,被定義出來屏蔽各種硬件和操作系統的內存訪問差異。
JMM定義了線程和主內存之間的抽象關系:線程之間的共享變量存儲在主內存
(Main Memory)中,每個線程都有一個私有的本地內存
(Local Memory),本地內存中存儲了該線程以讀/寫共享變量的副本。
Java內存模型的抽象圖:
本地內存是JMM的 一個抽象概念,并不真實存在。它其實涵蓋了緩存、寫緩沖區、寄存器以及其他的硬件和編譯器優化。
圖里面的是一個雙核 CPU 系統架構 ,每個核有自己的控制器和運算器,其中控制器包含一組寄存器和操作控制器,運算器執行算術邏輔運算。每個核都有自己的一級緩存,在有些架構里面還有一個所有 CPU 共享的二級緩存。 那么 Java 內存模型里面的工作內存,就對應這里的 Ll 緩存或者 L2 緩存或者 CPU 寄存器。
原子性、有序性、可見性是并發編程中非常重要的基礎概念,JMM的很多技術都是圍繞著這三大特性展開。
原子性:原子性指的是一個操作是不可分割、不可中斷的,要么全部執行并且執行的過程不會被任何因素打斷,要么就全不執行。
可見性:可見性指的是一個線程修改了某一個共享變量的值時,其它線程能夠立即知道這個修改。
有序性:有序性指的是對于一個線程的執行代碼,從前往后依次執行,單線程下可以認為程序是有序的,但是并發時有可能會發生指令重排。
分析下面幾行代碼的原子性?
int i = 2;int j = i;i++;i = i + 1;
第1句是基本類型賦值,是原子性操作。
第2句先讀i的值,再賦值到j,兩步操作,不能保證原子性。
第3和第4句其實是等效的,先讀取i的值,再+1,最后賦值到i,三步操作了,不能保證原子性。
原子性、可見性、有序性都應該怎么保證呢?
原子性:JMM只能保證基本的原子性,如果要保證一個代碼塊的原子性,需要使用synchronized
。
可見性:Java是利用volatile
關鍵字來保證可見性的,除此之外,final
和synchronized
也能保證可見性。
有序性:synchronized
或者volatile
都可以保證多線程之間操作的有序性。
在執行程序時,為了提高性能,編譯器和處理器常常會對指令做重排序。重排序分3種類型。
編譯器優化的重排序。編譯器在不改變單線程程序語義的前提下,可以重新安排語句的執行順序。
指令級并行的重排序。現代處理器采用了指令級并行技術(Instruction-Level Parallelism,ILP)來將多條指令重疊執行。如果不存在數據依賴性,處理器可以改變語句對應 機器指令的執行順序。
內存系統的重排序。由于處理器使用緩存和讀/寫緩沖區,這使得加載和存儲操作看上去可能是在亂序執行。
從Java源代碼到最終實際執行的指令序列,會分別經歷下面3種重排序,如圖:
我們比較熟悉的雙重校驗單例模式就是一個經典的指令重排的例子,Singleton instance=new Singleton();
對應的JVM指令分為三步:分配內存空間–>初始化對象—>對象指向分配的內存空間,但是經過了編譯器的指令重排序,第二步和第三步就可能會重排序。
JMM屬于語言級的內存模型,它確保在不同的編譯器和不同的處理器平臺之上,通過禁止特定類型的編譯器重排序和處理器重排序,為程序員提供一致的內存可見性保證。
指令重排也是有一些限制的,有兩個規則happens-before
和as-if-serial
來約束。
happens-before的定義:
如果一個操作happens-before另一個操作,那么第一個操作的執行結果將對第二個操作可見,而且第一個操作的執行順序排在第二個操作之前。
兩個操作之間存在happens-before關系,并不意味著Java平臺的具體實現必須要按照 happens-before關系指定的順序來執行。如果重排序之后的執行結果,與按happens-before關系來執行的結果一致,那么這種重排序并不非法
happens-before和我們息息相關的有六大規則:
程序順序規則:一個線程中的每個操作,happens-before于該線程中的任意后續操作。
監視器鎖規則:對一個鎖的解鎖,happens-before于隨后對這個鎖的加鎖。
volatile變量規則:對一個volatile域的寫,happens-before于任意后續對這個volatile域的讀。
傳遞性:如果A happens-before B,且B happens-before C,那么A happens-before C。
start()規則:如果線程A執行操作ThreadB.start()(啟動線程B),那么A線程的 ThreadB.start()操作happens-before于線程B中的任意操作。
join()規則:如果線程A執行操作ThreadB.join()并成功返回,那么線程B中的任意操作 happens-before于線程A從ThreadB.join()操作成功返回。
as-if-serial語義的意思是:不管怎么重排序(編譯器和處理器為了提高并行度),單線程程序的執行結果不能被改變。編譯器、runtime和處理器都必須遵守as-if-serial語義。
為了遵守as-if-serial語義,編譯器和處理器不會對存在數據依賴關系的操作做重排序,因為這種重排序會改變執行結果。但是,如果操作之間不存在數據依賴關系,這些操作就可能被編譯器和處理器重排序。為了具體說明,請看下面計算圓面積的代碼示例。
double pi = 3.14; // Adouble r = 1.0; // B double area = pi * r * r; // C
上面3個操作的數據依賴關系:
A和C之間存在數據依賴關系,同時B和C之間也存在數據依賴關系。因此在最終執行的指令序列中,C不能被重排序到A和B的前面(C排到A和B的前面,程序的結果將會被改變)。但A和B之間沒有數據依賴關系,編譯器和處理器可以重排序A和B之間的執行順序。
所以最終,程序可能會有兩種執行順序:
as-if-serial語義把單線程程序保護了起來,遵守as-if-serial語義的編譯器、runtime和處理器共同編織了這么一個“楚門的世界”:單線程程序是按程序的“順序”來執行的。as- if-serial語義使單線程情況下,我們不需要擔心重排序的問題,可見性的問題。
volatile有兩個作用,保證可見性和有序性。
volatile怎么保證可見性的呢?
相比synchronized的加鎖方式來解決共享變量的內存可見性問題,volatile就是更輕量的選擇,它沒有上下文切換的額外開銷成本。
volatile可以確保對某個變量的更新對其他線程馬上可見,一個變量被聲明為volatile 時,線程在寫入變量時不會把值緩存在寄存器或者其他地方,而是會把值刷新回主內存 當其它線程讀取該共享變量 ,會從主內存重新獲取最新值,而不是使用當前線程的本地內存中的值。
例如,我們聲明一個 volatile 變量 volatile int x = 0,線程A修改x=1,修改完之后就會把新的值刷新回主內存,線程B讀取x的時候,就會清空本地內存變量,然后再從主內存獲取最新值。
volatile怎么保證有序性的呢?
重排序可以分為編譯器重排序和處理器重排序,valatile保證有序性,就是通過分別限制這兩種類型的重排序。
為了實現volatile的內存語義,編譯器在生成字節碼時,會在指令序列中插入內存屏障來禁止特定類型的處理器重排序。
在每個volatile寫操作的前面插入一個StoreStore
屏障
在每個volatile寫操作的后面插入一個StoreLoad
屏障
在每個volatile讀操作的后面插入一個LoadLoad
屏障
在每個volatile讀操作的后面插入一個LoadStore
屏障
synchronized經常用的,用來保證代碼的原子性。
synchronized主要有三種用法:
修飾實例方法: 作用于當前對象實例加鎖,進入同步代碼前要獲得 當前對象實例的鎖
synchronized void method() { //業務代碼}
修飾靜態方法:也就是給當前類加鎖,會作?于類的所有對象實例 ,進?同步代碼前要獲得當前 class 的鎖。因為靜態成員不屬于任何?個實例對象,是類成員( static 表明這是該類的?個靜態資源,不管 new 了多少個對象,只有?份)。
如果?個線程 A 調??個實例對象的?靜態 synchronized ?法,?線程 B 需要調?這個實例對象所屬類的靜態 synchronized ?法,是允許的,不會發?互斥現象,因為訪問靜態 synchronized ?法占?的鎖是當前類的鎖,?訪問?靜態 synchronized ?法占?的鎖是當前實例對象鎖。
synchronized void staic method() { //業務代碼}
修飾代碼塊 :指定加鎖對象,對給定對象/類加鎖。 synchronized(this|object) 表示進?同步代碼庫前要獲得給定對象的鎖。 synchronized(類.class) 表示進?同步代碼前要獲得 當前 class 的鎖
synchronized(this) { //業務代碼}
synchronized是怎么加鎖的呢?
我們使用synchronized的時候,發現不用自己去lock和unlock,是因為JVM幫我們把這個事情做了。
synchronized修飾代碼塊時,JVM采用monitorenter
、monitorexit
兩個指令來實現同步,monitorenter
指令指向同步代碼塊的開始位置, monitorexit
指令則指向同步代碼塊的結束位置。
反編譯一段synchronized修飾代碼塊代碼,javap -c -s -v -l SynchronizedDemo.class
,可以看到相應的字節碼指令。
synchronized修飾同步方法時,JVM采用ACC_SYNCHRONIZED
標記符來實現同步,這個標識指明了該方法是一個同步方法。
同樣可以寫段代碼反編譯看一下。
synchronized鎖住的是什么呢?
monitorenter、monitorexit或者ACC_SYNCHRONIZED都是基于Monitor實現的。
實例對象結構里有對象頭,對象頭里面有一塊結構叫Mark Word,Mark Word指針指向了monitor。
所謂的Monitor其實是一種同步工具,也可以說是一種同步機制。在Java虛擬機(HotSpot)中,Monitor是由ObjectMonitor實現的,可以叫做內部鎖,或者Monitor鎖。
ObjectMonitor的工作原理:
ObjectMonitor有兩個隊列:_WaitSet、_EntryList,用來保存ObjectWaiter 對象列表。
_owner,獲取 Monitor 對象的線程進入 _owner 區時, _count + 1。如果線程調用了 wait() 方法,此時會釋放 Monitor 對象, _owner 恢復為空, _count - 1。同時該等待線程進入 _WaitSet 中,等待被喚醒。
ObjectMonitor() { _header = NULL; _count = 0; // 記錄線程獲取鎖的次數 _waiters = 0, _recursions = 0; //鎖的重入次數 _object = NULL; _owner = NULL; // 指向持有ObjectMonitor對象的線程 _WaitSet = NULL; // 處于wait狀態的線程,會被加入到_WaitSet _WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ; FreeNext = NULL ; _EntryList = NULL ; // 處于等待鎖block狀態的線程,會被加入到該列表 _SpinFreq = 0 ; _SpinClock = 0 ; OwnerIsThread = 0 ; }
可以類比一個去醫院就診的例子[18]:
首先,患者在門診大廳前臺或自助掛號機進行掛號;
隨后,掛號結束后患者找到對應的診室就診:
診室每次只能有一個患者就診;
如果此時診室空閑,直接進入就診;
如果此時診室內有其它患者就診,那么當前患者進入候診室,等待叫號;
就診結束后,走出就診室,候診室的下一位候診患者進入就診室。
這個過程就和Monitor機制比較相似:
門診大廳:所有待進入的線程都必須先在入口Entry Set掛號才有資格;
就診室:就診室**_Owner**里里只能有一個線程就診,就診完線程就自行離開
候診室:就診室繁忙時,進入等待區(Wait Set),就診室空閑的時候就從**等待區(Wait Set)**叫新的線程
所以我們就知道了,同步是鎖住的什么東西:
monitorenter,在判斷擁有同步標識 ACC_SYNCHRONIZED 搶先進入此方法的線程會優先擁有 Monitor 的 owner ,此時計數器 +1。
monitorexit,當執行完退出后,計數器 -1,歸 0 后被其他進入的線程獲得。
synchronized怎么保證可見性?
線程加鎖前,將清空工作內存中共享變量的值,從而使用共享變量時需要從主內存中重新讀取最新的值。
線程加鎖后,其它線程無法獲取主內存中的共享變量。
線程解鎖前,必須把共享變量的最新值刷新到主內存中。
synchronized怎么保證有序性?
synchronized同步的代碼塊,具有排他性,一次只能被一個線程擁有,所以synchronized保證同一時刻,代碼是單線程執行的。
因為as-if-serial語義的存在,單線程的程序能保證最終結果是有序的,但是不保證不會指令重排。
所以synchronized保證的有序是執行結果的有序性,而不是防止指令重排的有序性。
synchronized怎么實現可重入的呢?
synchronized 是可重入鎖,也就是說,允許一個線程二次請求自己持有對象鎖的臨界資源,這種情況稱為可重入鎖。
synchronized 鎖對象的時候有個計數器,他會記錄下線程獲取鎖的次數,在執行完對應的代碼塊之后,計數器就會-1,直到計數器清零,就釋放鎖了。
之所以,是可重入的。是因為 synchronized 鎖對象有個計數器,會隨著線程獲取鎖后 +1 計數,當線程執行完畢后 -1,直到清零釋放鎖。
了解鎖升級,得先知道,不同鎖的狀態是什么樣的。這個狀態指的是什么呢?
Java對象頭里,有一塊結構,叫Mark Word
標記字段,這塊結構會隨著鎖的狀態變化而變化。
64 位虛擬機 Mark Word 是 64bit,我們來看看它的狀態變化:
Mark Word存儲對象自身的運行數據,如哈希碼、GC分代年齡、鎖狀態標志、偏向時間戳(Epoch) 等。
synchronized做了哪些優化?
在JDK1.6之前,synchronized的實現直接調用ObjectMonitor的enter和exit,這種鎖被稱之為重量級鎖。從JDK6開始,HotSpot虛擬機開發團隊對Java中的鎖進行優化,如增加了適應性自旋、鎖消除、鎖粗化、輕量級鎖和偏向鎖等優化策略,提升了synchronized的性能。
偏向鎖:在無競爭的情況下,只是在Mark Word里存儲當前線程指針,CAS操作都不做。
輕量級鎖:在沒有多線程競爭時,相對重量級鎖,減少操作系統互斥量帶來的性能消耗。但是,如果存在鎖競爭,除了互斥量本身開銷,還額外有CAS操作的開銷。
自旋鎖:減少不必要的CPU上下文切換。在輕量級鎖升級為重量級鎖時,就使用了自旋加鎖的方式
鎖粗化:將多個連續的加鎖、解鎖操作連接在一起,擴展成一個范圍更大的鎖。
鎖消除:虛擬機即時編譯器在運行時,對一些代碼上要求同步,但是被檢測到不可能存在共享數據競爭的鎖進行消除。
鎖升級的過程是什么樣的?
鎖升級方向:無鎖–>偏向鎖—> 輕量級鎖---->重量級鎖,這個方向基本上是不可逆的。
我們看一下升級的過程:
偏向鎖的獲取:
判斷是否為可偏向狀態–MarkWord中鎖標志是否為‘01’,是否偏向鎖是否為‘1’
如果是可偏向狀態,則查看線程ID是否為當前線程,如果是,則進入步驟’5’,否則進入步驟‘3’
通過CAS操作競爭鎖,如果競爭成功,則將MarkWord中線程ID設置為當前線程ID,然后執行‘5’;競爭失敗,則執行‘4’
CAS獲取偏向鎖失敗表示有競爭。當達到safepoint時獲得偏向鎖的線程被掛起,偏向鎖升級為輕量級鎖,然后被阻塞在安全點的線程繼續往下執行同步代碼塊
執行同步代碼
偏向鎖的撤銷:
偏向鎖不會主動釋放(撤銷),只有遇到其他線程競爭時才會執行撤銷,由于撤銷需要知道當前持有該偏向鎖的線程棧狀態,因此要等到safepoint時執行,此時持有該偏向鎖的線程(T)有‘2’,‘3’兩種情況;
撤銷----T線程已經退出同步代碼塊,或者已經不再存活,則直接撤銷偏向鎖,變成無鎖狀態----該狀態達到閾值20則執行批量重偏向
升級----T線程還在同步代碼塊中,則將T線程的偏向鎖升級為輕量級鎖,當前線程執行輕量級鎖狀態下的鎖獲取步驟----該狀態達到閾值40則執行批量撤銷
輕量級鎖的獲取:
進行加鎖操作時,jvm會判斷是否已經時重量級鎖,如果不是,則會在當前線程棧幀中劃出一塊空間,作為該鎖的鎖記錄,并且將鎖對象MarkWord復制到該鎖記錄中
復制成功之后,jvm使用CAS操作將對象頭MarkWord更新為指向鎖記錄的指針,并將鎖記錄里的owner指針指向對象頭的MarkWord。如果成功,則執行‘3’,否則執行‘4’
更新成功,則當前線程持有該對象鎖,并且對象MarkWord鎖標志設置為‘00’,即表示此對象處于輕量級鎖狀態
更新失敗,jvm先檢查對象MarkWord是否指向當前線程棧幀中的鎖記錄,如果是則執行‘5’,否則執行‘4’
表示鎖重入;然后當前線程棧幀中增加一個鎖記錄第一部分(Displaced Mark Word)為null,并指向Mark Word的鎖對象,起到一個重入計數器的作用。
表示該鎖對象已經被其他線程搶占,則進行自旋等待(默認10次),等待次數達到閾值仍未獲取到鎖,則升級為重量級鎖
大體上省簡的升級過程:
完整的升級過程:
可以從鎖的實現、功能特點、性能等幾個維度去回答這個問題:
鎖的實現: synchronized是Java語言的關鍵字,基于JVM實現。而ReentrantLock是基于JDK的API層面實現的(一般是lock()和unlock()方法配合try/finally 語句塊來完成。)
性能: 在JDK1.6鎖優化以前,synchronized的性能比ReenTrantLock差很多。但是JDK6開始,增加了適應性自旋、鎖消除等,兩者性能就差不多了。
功能特點: ReentrantLock 比 synchronized 增加了一些高級功能,如等待可中斷、可實現公平鎖、可實現選擇性通知。
ReentrantLock提供了一種能夠中斷等待鎖的線程的機制,通過lock.lockInterruptibly()來實現這個機制
ReentrantLock可以指定是公平鎖還是非公平鎖。而synchronized只能是非公平鎖。所謂的公平鎖就是先等待的線程先獲得鎖。
synchronized與wait()和notify()/notifyAll()方法結合實現等待/通知機制,ReentrantLock類借助Condition接口與newCondition()方法實現。
ReentrantLock需要手工聲明來加鎖和釋放鎖,一般跟finally配合釋放鎖。而synchronized不用手動釋放鎖。
下面的表格列出出了兩種鎖之間的區別:
AbstractQueuedSynchronizer 抽象同步隊列,簡稱 AQS ,它是Java并發包的根基,并發包中的鎖就是基于AQS實現的。
AQS是基于一個FIFO的雙向隊列,其內部定義了一個節點類Node,Node 節點內部的 SHARED 用來標記該線程是獲取共享資源時被阻掛起后放入AQS 隊列的, EXCLUSIVE 用來標記線程是 取獨占資源時被掛起后放入AQS 隊列
AQS 使用一個 volatile 修飾的 int 類型的成員變量 state 來表示同步狀態,修改同步狀態成功即為獲得鎖,volatile 保證了變量在多線程之間的可見性,修改 State 值時通過 CAS 機制來保證修改的原子性
獲取state的方式分為兩種,獨占方式和共享方式,一個線程使用獨占方式獲取了資源,其它線程就會在獲取失敗后被阻塞。一個線程使用共享方式獲取了資源,另外一個線程還可以通過CAS的方式進行獲取。
如果共享資源被占用,需要一定的阻塞等待喚醒機制來保證鎖的分配,AQS 中會將競爭共享資源失敗的線程添加到一個變體的 CLH 隊列中。
先簡單了解一下CLH:Craig、Landin and Hagersten 隊列,是 單向鏈表實現的隊列。申請線程只在本地變量上自旋,它不斷輪詢前驅的狀態,如果發現 前驅節點釋放了鎖就結束自旋
AQS 中的隊列是 CLH 變體的虛擬雙向隊列,通過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配:
AQS 中的 CLH 變體等待隊列擁有以下特性:
AQS 中隊列是個雙向鏈表,也是 FIFO 先進先出的特性
通過 Head、Tail 頭尾兩個節點來組成隊列結構,通過 volatile 修飾保證可見性
Head 指向節點為已獲得鎖的節點,是一個虛擬節點,節點本身不持有具體線程
獲取不到同步狀態,會將節點進行自旋獲取鎖,自旋一定次數失敗后會將線程阻塞,相對于 CLH 隊列性能較好
ps:AQS源碼里面有很多細節可問,建議有時間好好看看AQS源碼。
ReentrantLock 是可重入的獨占鎖,只能有一個線程可以獲取該鎖,其它獲取該鎖的線程會被阻塞而被放入該鎖的阻塞隊列里面。
看看ReentrantLock的加鎖操作:
// 創建非公平鎖 ReentrantLock lock = new ReentrantLock(); // 獲取鎖操作 lock.lock(); try { // 執行代碼邏輯 } catch (Exception ex) { // ... } finally { // 解鎖操作 lock.unlock(); }
new ReentrantLock()
構造函數默認創建的是非公平鎖 NonfairSync。
公平鎖 FairSync
公平鎖是指多個線程按照申請鎖的順序來獲取鎖,線程直接進入隊列中排隊,隊列中的第一個線程才能獲得鎖
公平鎖的優點是等待鎖的線程不會餓死。缺點是整體吞吐效率相對非公平鎖要低,等待隊列中除第一個線程以外的所有線程都會阻塞,CPU 喚醒阻塞線程的開銷比非公平鎖大
非公平鎖 NonfairSync
非公平鎖是多個線程加鎖時直接嘗試獲取鎖,獲取不到才會到等待隊列的隊尾等待。但如果此時鎖剛好可用,那么這個線程可以無需阻塞直接獲取到鎖
非公平鎖的優點是可以減少喚起線程的開銷,整體的吞吐效率高,因為線程有幾率不阻塞直接獲得鎖,CPU 不必喚醒所有線程。缺點是處于等待隊列中的線程可能會餓死,或者等很久才會獲得鎖
默認創建的對象lock()的時候:
如果鎖當前沒有被其它線程占用,并且當前線程之前沒有獲取過該鎖,則當前線程會獲取到該鎖,然后設置當前鎖的擁有者為當前線程,并設置 AQS 的狀態值為1 ,然后直接返回。如果當前線程之前己經獲取過該鎖,則這次只是簡單地把 AQS 的狀態值加1后返回。
如果該鎖己經被其他線程持有,非公平鎖會嘗試去獲取鎖,獲取失敗的話,則調用該方法線程會被放入 AQS 隊列阻塞掛起。
new ReentrantLock()
構造函數默認創建的是非公平鎖 NonfairSync
public ReentrantLock() { sync = new NonfairSync();}
同時也可以在創建鎖構造函數中傳入具體參數創建公平鎖 FairSync
ReentrantLock lock = new ReentrantLock(true);--- ReentrantLock// true 代表公平鎖,false 代表非公平鎖public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();}
FairSync、NonfairSync 代表公平鎖和非公平鎖,兩者都是 ReentrantLock 靜態內部類,只不過實現不同鎖語義。
非公平鎖和公平鎖的兩處不同:
非公平鎖在調用 lock 后,首先就會調用 CAS 進行一次搶鎖,如果這個時候恰巧鎖沒有被占用,那么直接就獲取到鎖返回了。
非公平鎖在 CAS 失敗后,和公平鎖一樣都會進入到 tryAcquire 方法,在 tryAcquire 方法中,如果發現鎖這個時候被釋放了(state == 0),非公平鎖會直接 CAS 搶鎖,但是公平鎖會判斷等待隊列是否有線程處于等待狀態,如果有則不去搶鎖,乖乖排到后面。
相對來說,非公平鎖會有更好的性能,因為它的吞吐量比較大。當然,非公平鎖讓獲取鎖的時間變得更加不確定,可能會導致在阻塞隊列中的線程長期處于饑餓狀態。
CAS叫做CompareAndSwap,?較并交換,主要是通過處理器的指令來保證操作的原?性的。
CAS 指令包含 3 個參數:共享變量的內存地址 A、預期的值 B 和共享變量的新值 C。
只有當內存中地址 A 處的值等于 B 時,才能將內存中地址 A 處的值更新為新值 C。作為一條 CPU 指令,CAS 指令本身是能夠保證原子性的 。
CAS的經典三大問題:
并發環境下,假設初始條件是A,去修改數據時,發現是A就會執行修改。但是看到的雖然是A,中間可能發生了A變B,B又變回A的情況。此時A已經非彼A,數據即使成功修改,也可能有問題。
怎么解決ABA問題?
加版本號
每次修改變量,都在這個變量的版本號上加1,這樣,剛剛A->B->A,雖然A的值沒變,但是它的版本號已經變了,再判斷版本號就會發現此時的A已經被改過了。參考樂觀鎖的版本號,這種做法可以給數據帶上了一種實效性的檢驗。
Java提供了AtomicStampReference類,它的compareAndSet方法首先檢查當前的對象引用值是否等于預期引用,并且當前印戳(Stamp)標志是否等于預期標志,如果全部相等,則以原子方式將引用值和印戳標志的值更新為給定的更新值。
自旋CAS,如果一直循環執行,一直不成功,會給CPU帶來非常大的執行開銷。
怎么解決循環性能開銷問題?
在Java中,很多使用自旋CAS的地方,會有一個自旋次數的限制,超過一定次數,就停止自旋。
CAS 保證的是對一個變量執行操作的原子性,如果對多個變量操作時,CAS 目前無法直接保證操作的原子性的。
怎么解決只能保證一個變量的原子操作問題?
可以考慮改用鎖來保證操作的原子性
可以考慮合并多個變量,將多個變量封裝成一個對象,通過AtomicReference來保證原子性。
使用循環原子類,例如AtomicInteger,實現i++原子操作
使用juc包下的鎖,如ReentrantLock ,對i++操作加鎖lock.lock()來實現原子性
使用synchronized,對i++操作加鎖
當程序更新一個變量時,如果多線程同時更新這個變量,可能得到期望之外的值,比如變量i=1,A線程更新i+1,B線程也更新i+1,經過兩個線程操作之后可能i不等于3,而是等于2。因為A和B線程在更新變量i的時候拿到的i都是1,這就是線程不安全的更新操作,一般我們會使用synchronized來解決這個問題,synchronized會保證多線程不會同時更新變量i。
其實除此之外,還有更輕量級的選擇,Java從JDK 1.5開始提供了java.util.concurrent.atomic包,這個包中的原子操作類提供了一種用法簡單、性能高效、線程安全地更新一個變量的方式。
因為變量的類型有很多種,所以在Atomic包里一共提供了13個類,屬于4種類型的原子更新方式,分別是原子更新基本類型、原子更新數組、原子更新引用和原子更新屬性(字段)。
Atomic包里的類基本都是使用Unsafe實現的包裝類。
使用原子的方式更新基本類型,Atomic包提供了以下3個類:
AtomicBoolean:原子更新布爾類型。
AtomicInteger:原子更新整型。
AtomicLong:原子更新長整型。
通過原子的方式更新數組里的某個元素,Atomic包提供了以下4個類:
AtomicIntegerArray:原子更新整型數組里的元素。
AtomicLongArray:原子更新長整型數組里的元素。
AtomicReferenceArray:原子更新引用類型數組里的元素。
AtomicIntegerArray類主要是提供原子的方式更新數組里的整型
原子更新基本類型的AtomicInteger,只能更新一個變量,如果要原子更新多個變量,就需要使用這個原子更新引用類型提供的類。Atomic包提供了以下3個類:
AtomicReference:原子更新引用類型。
AtomicReferenceFieldUpdater:原子更新引用類型里的字段。
AtomicMarkableReference:原子更新帶有標記位的引用類型。可以原子更新一個布爾類型的標記位和引用類型。構造方法是AtomicMarkableReference(V initialRef,boolean initialMark)。
如果需原子地更新某個類里的某個字段時,就需要使用原子更新字段類,Atomic包提供了以下3個類進行原子字段更新:
AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
AtomicLongFieldUpdater:原子更新長整型字段的更新器。
AtomicStampedReference:原子更新帶有版本號的引用類型。該類將整數值與引用關聯起來,可用于原子的更新數據和數據的版本號,可以解決使用CAS進行原子更新時可能出現的 ABA問題。
一句話概括:使用CAS實現。
以AtomicInteger的添加方法為例:
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
通過Unsafe
類的實例來進行添加操作,來看看具體的CAS操作:
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
compareAndSwapInt 是一個native方法,基于CAS來操作int類型變量。其它的原子操作類基本都是大同小異。
死鎖是指兩個或兩個以上的線程在執行過程中,因爭奪資源而造成的互相等待的現象,在無外力作用的情況下,這些線程會一直相互等待而無法繼續運行下去。
那么為什么會產生死鎖呢? 死鎖的產生必須具備以下四個條件:
互斥條件:指線程對己經獲取到的資源進行它性使用,即該資源同時只由一個線程占用。如果此時還有其它線程請求獲取獲取該資源,則請求者只能等待,直至占有資源的線程釋放該資源。
請求并持有條件:指一個 線程己經持有了至少一個資源,但又提出了新的資源請求,而新資源己被其它線程占有,所以當前線程會被阻塞,但阻塞 的同時并不釋放自己已經獲取的資源。
不可剝奪條件:指線程獲取到的資源在自己使用完之前不能被其它線程搶占,只有在自己使用完畢后才由自己釋放該資源。
環路等待條件:指在發生死鎖時,必然存在一個線程——資源的環形鏈,即線程集合 {T0,T1,T2,…… ,Tn} 中 T0 正在等待一 T1 占用的資源,Tl1正在等待 T2用的資源,…… Tn 在等待己被 T0占用的資源。
該如何避免死鎖呢?答案是至少破壞死鎖發生的一個條件。
其中,互斥這個條件我們沒有辦法破壞,因為用鎖為的就是互斥。不過其他三個條件都是有辦法破壞掉的,到底如何做呢?
對于“請求并持有”這個條件,可以一次性請求所有的資源。
對于“不可剝奪”這個條件,占用部分資源的線程進一步申請其他資源時,如果申請不到,可以主動釋放它占有的資源,這樣不可搶占這個條件就破壞掉了。
對于“環路等待”這個條件,可以靠按序申請資源來預防。所謂按序申請,是指資源是有線性順序的,申請的時候可以先申請資源序號小的,再申請資源序號大的,這樣線性化后就不存在環路了。
可以使用jdk自帶的命令行工具排查:
使用jps查找運行的Java進程:jps -l
使用jstack查看線程堆棧信息:jstack -l 進程id
基本就可以看到死鎖的信息。
還可以利用圖形化工具,比如JConsole。出現線程死鎖以后,點擊JConsole線程面板的檢測到死鎖
按鈕,將會看到線程的死鎖信息。
CountDownLatch,倒計數器,有兩個常見的應用場景[18]:
場景1:協調子線程結束動作:等待所有子線程運行結束
CountDownLatch允許一個或多個線程等待其他線程完成操作。
例如,我們很多人喜歡玩的王者榮耀,開黑的時候,得等所有人都上線之后,才能開打。
CountDownLatch模仿這個場景(參考[18]):
創建大喬、蘭陵王、安其拉、哪吒和鎧等五個玩家,主線程必須在他們都完成確認后,才可以繼續運行。
在這段代碼中,new CountDownLatch(5)
用戶創建初始的latch數量,各玩家通過countDownLatch.countDown()
完成狀態確認,主線程通過countDownLatch.await()
等待。
public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(5); Thread 大喬 = new Thread(countDownLatch::countDown); Thread 蘭陵王 = new Thread(countDownLatch::countDown); Thread 安其拉 = new Thread(countDownLatch::countDown); Thread 哪吒 = new Thread(countDownLatch::countDown); Thread 鎧 = new Thread(() -> { try { // 稍等,上個衛生間,馬上到... Thread.sleep(1500); countDownLatch.countDown(); } catch (InterruptedException ignored) {} }); 大喬.start(); 蘭陵王.start(); 安其拉.start(); 哪吒.start(); 鎧.start(); countDownLatch.await(); System.out.println("所有玩家已經就位!"); }
場景2. 協調子線程開始動作:統一各線程動作開始的時機
王者游戲中也有類似的場景,游戲開始時,各玩家的初始狀態必須一致。不能有的玩家都出完裝了,有的才降生。
所以大家得一塊出生,在
在這個場景中,仍然用五個線程代表大喬、蘭陵王、安其拉、哪吒和鎧等五個玩家。需要注意的是,各玩家雖然都調用了start()
線程,但是它們在運行時都在等待countDownLatch
的信號,在信號未收到前,它們不會往下執行。
public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Thread 大喬 = new Thread(() -> waitToFight(countDownLatch)); Thread 蘭陵王 = new Thread(() -> waitToFight(countDownLatch)); Thread 安其拉 = new Thread(() -> waitToFight(countDownLatch)); Thread 哪吒 = new Thread(() -> waitToFight(countDownLatch)); Thread 鎧 = new Thread(() -> waitToFight(countDownLatch)); 大喬.start(); 蘭陵王.start(); 安其拉.start(); 哪吒.start(); 鎧.start(); Thread.sleep(1000); countDownLatch.countDown(); System.out.println("敵方還有5秒達到戰場,全軍出擊!"); } private static void waitToFight(CountDownLatch countDownLatch) { try { countDownLatch.await(); // 在此等待信號再繼續 System.out.println("收到,發起進攻!"); } catch (InterruptedException e) { e.printStackTrace(); } }
CountDownLatch的核心方法也不多:
await()
:等待latch降為0;
boolean await(long timeout, TimeUnit unit)
:等待latch降為0,但是可以設置超時時間。比如有玩家超時未確認,那就重新匹配,總不能為了某個玩家等到天荒地老。
countDown()
:latch數量減1;
getCount()
:獲取當前的latch數量。
CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一 組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行。
它和CountDownLatch類似,都可以協調多線程的結束動作,在它們結束后都可以執行特定動作,但是為什么要有CyclicBarrier,自然是它有和CountDownLatch不同的地方。
不知道你聽沒聽過一個新人UP主小約翰可汗,小約翰生平有兩大恨——“想結衣結衣不依,迷愛理愛理不理。”我們來還原一下事情的經過:小約翰在親政后認識了新垣結衣,于是決定第一次選妃,向結衣表白,等待回應。然而新垣結衣回應嫁給了星野源,小約翰傷心欲絕,發誓生平不娶,突然發現了鈴木愛理,于是小約翰決定第二次選妃,求愛理搭理,等待回應。
我們拿代碼模擬這一場景,發現CountDownLatch無能為力了,因為CountDownLatch的使用是一次性的,無法重復利用,而這里等待了兩次。此時,我們用CyclicBarrier就可以實現,因為它可以重復利用。
運行結果:
CyclicBarrier最最核心的方法,仍然是await():
如果當前線程不是第一個到達屏障的話,它將會進入等待,直到其他線程都到達,除非發生被中斷、屏障被拆除、屏障被重設等情況;
上面的例子抽象一下,本質上它的流程就是這樣就是這樣:
兩者最核心的區別[18]:
CountDownLatch是一次性的,而CyclicBarrier則可以多次設置屏障,實現重復利用;
CountDownLatch中的各個子線程不可以等待其他線程,只能完成自己的任務;而CyclicBarrier中的各個線程可以等待其他線程
它們區別用一個表格整理:
CyclicBarrier | CountDownLatch |
---|---|
CyclicBarrier是可重用的,其中的線程會等待所有的線程完成任務。屆時,屏障將被拆除,并可以選擇性地做一些特定的動作。 | CountDownLatch是一次性的,不同的線程在同一個計數器上工作,直到計數器為0. |
CyclicBarrier面向的是線程數 | CountDownLatch面向的是任務數 |
在使用CyclicBarrier時,你必須在構造中指定參與協作的線程數,這些線程必須調用await()方法 | 使用CountDownLatch時,則必須要指定任務數,至于這些任務由哪些線程完成無關緊要 |
CyclicBarrier可以在所有的線程釋放后重新使用 | CountDownLatch在計數器為0時不能再使用 |
在CyclicBarrier中,如果某個線程遇到了中斷、超時等問題時,則處于await的線程都會出現問題 | 在CountDownLatch中,如果某個線程出現問題,其他線程不受影響 |
Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。
聽起來似乎很抽象,現在汽車多了,開車出門在外的一個老大難問題就是停車 。停車場的車位是有限的,只能允許若干車輛停泊,如果停車場還有空位,那么顯示牌顯示的就是綠燈和剩余的車位,車輛就可以駛入;如果停車場沒位了,那么顯示牌顯示的就是綠燈和數字0,車輛就得等待。如果滿了的停車場有車離開,那么顯示牌就又變綠,顯示空車位數量,等待的車輛就能進停車場。
我們把這個例子類比一下,車輛就是線程,進入停車場就是線程在執行,離開停車場就是線程執行完畢,看見紅燈就表示線程被阻塞,不能執行,Semaphore的本質就是協調多個線程對共享資源的獲取。
我們再來看一個Semaphore的用途:它可以用于做流量控制,特別是公用資源有限的應用場景,比如數據庫連接。
假如有一個需求,要讀取幾萬個文件的數據,因為都是IO密集型任務,我們可以啟動幾十個線程并發地讀取,但是如果讀到內存后,還需要存儲到數據庫中,而數據庫的連接數只有10個,這時我們必須控制只有10個線程同時獲取數據庫連接保存數據,否則會報錯無法獲取數據庫連接。這個時候,就可以使用Semaphore來做流量控制,如下:
public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); public static void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { s.acquire(); System.out.println("save data"); s.release(); } catch (InterruptedException e) { } } }); } threadPool.shutdown(); }}
在代碼中,雖然有30個線程在執行,但是只允許10個并發執行。Semaphore的構造方法Semaphore(int permits
)接受一個整型的數字,表示可用的許可證數量。Semaphore(10)
表示允許10個線程獲取許可證,也就是最大并發數是10。Semaphore的用法也很簡單,首先線程使用 Semaphore的acquire()方法獲取一個許可證,使用完之后調用release()方法歸還許可證。還可以用tryAcquire()方法嘗試獲取許可證。
Exchanger(交換者)是一個用于線程間協作的工具類。Exchanger用于進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。
這兩個線程通過 exchange方法交換數據,如果第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。
Exchanger可以用于遺傳算法,遺傳算法里需要選出兩個人作為交配對象,這時候會交換兩人的數據,并使用交叉規則得出2個交配結果。Exchanger也可以用于校對工作,比如我們需要將紙制銀行流水通過人工的方式錄入成電子銀行流水,為了避免錯誤,采用AB崗兩人進行錄入,錄入到Excel之后,系統需要加載這兩個Excel,并對兩個Excel數據進行校對,看看是否錄入一致。
public class ExchangerTest { private static final Exchanger<String> exgr = new Exchanger<String>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { threadPool.execute(new Runnable() { @Override public void run() { try { String A = "銀行流水A"; // A錄入銀行流水數據 exgr.exchange(A); } catch (InterruptedException e) { } } }); threadPool.execute(new Runnable() { @Override public void run() { try { String B = "銀行流水B"; // B錄入銀行流水數據 String A = exgr.exchange("B"); System.out.println("A和B數據是否一致:" + A.equals(B) + ",A錄入的是:" + A + ",B錄入是:" + B); } catch (InterruptedException e) { } } }); threadPool.shutdown(); }}
假如兩個線程有一個沒有執行exchange()方法,則會一直等待,如果擔心有特殊情況發生,避免一直等待,可以使用exchange(V x, long timeOut, TimeUnit unit)
設置最大等待時長
線程池: 簡單理解,它就是一個管理線程的池子。
它幫我們管理線程,避免增加創建線程和銷毀線程的資源損耗。因為線程其實也是一個對象,創建一個對象,需要經過類加載過程,銷毀一個對象,需要走GC垃圾回收流程,都是需要資源開銷的。
提高響應速度。 如果任務到達了,相對于從線程池拿線程,重新去創建一條線程執行,速度肯定慢很多。
重復利用。 線程用完,再放回池子,可以達到重復利用的效果,節省資源。
之前我們有一個和第三方對接的需求,需要向第三方推送數據,引入了多線程來提升數據推送的效率,其中用到了線程池來管理線程。
主要代碼如下:
完整可運行代碼地址:https://gitee.com/fighter3/thread-demo.git
線程池的參數如下:
corePoolSize:線程核心參數選擇了CPU數×2
maximumPoolSize:最大線程數選擇了和核心線程數相同
keepAliveTime:非核心閑置線程存活時間直接置為0
unit:非核心線程保持存活的時間選擇了 TimeUnit.SECONDS 秒
workQueue:線程池等待隊列,使用 LinkedBlockingQueue阻塞隊列
同時還用了synchronized 來加鎖,保證數據不會被重復推送:
synchronized (PushProcessServiceImpl.class) {}
ps:這個例子只是簡單地進行了數據推送,實際上還可以結合其他的業務,像什么數據清洗啊、數據統計啊,都可以套用。
用一個通俗的比喻:
有一個營業廳,總共有六個窗口,現在開放了三個窗口,現在有三個窗口坐著三個營業員小姐姐在營業。
老三去辦業務,可能會遇到什么情況呢?
老三發現有空間的在營業的窗口,直接去找小姐姐辦理業務。
老三發現沒有空閑的窗口,就在排隊區排隊等。
老三發現沒有空閑的窗口,等待區也滿了,蚌埠住了,經理一看,就讓休息的小姐姐趕緊回來上班,等待區號靠前的趕緊去新窗口辦,老三去排隊區排隊。小姐姐比較辛苦,假如一段時間發現他們可以不用接著營業,經理就讓她們接著休息。
老三一看,六個窗口都滿了,等待區也沒位置了。老三急了,要鬧,經理趕緊出來了,經理該怎么辦呢?
我們銀行系統已經癱瘓
誰叫你來辦的你找誰去
看你比較急,去隊里加個塞
今天沒辦法,不行你看改一天
上面的這個流程幾乎就跟 JDK 線程池的大致流程類似,
營業中的 3個窗口對應核心線程池數:corePoolSize
總的營業窗口數6對應:maximumPoolSize
打開的臨時窗口在多少時間內無人辦理則關閉對應:unit
排隊區就是等待隊列:workQueue
無法辦理的時候銀行給出的解決方法對應:RejectedExecutionHandler
threadFactory 該參數在 JDK 中是 線程工廠,用來創建線程對象,一般不會動。
所以我們線程池的工作流程也比較好理解了:
線程池剛創建時,里面沒有一個線程。任務隊列是作為參數傳進來的。不過,就算隊列里面有任務,線程池也不會馬上執行它們。
當調用 execute() 方法添加一個任務時,線程池會做如下判斷:
如果正在運行的線程數量小于 corePoolSize,那么馬上創建線程運行這個任務;
如果正在運行的線程數量大于或等于 corePoolSize,那么將這個任務放入隊列;
如果這時候隊列滿了,而且正在運行的線程數量小于 maximumPoolSize,那么還是要創建非核心線程立刻運行這個任務;
如果隊列滿了,而且正在運行的線程數量大于或等于 maximumPoolSize,那么線程池會根據拒絕策略來對應處理。
當一個線程完成任務時,它會從隊列中取下一個任務來執行。
當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數大于 corePoolSize,那么這個線程就被停掉。所以線程池的所有任務完成后,它最終會收縮到 corePoolSize 的大小。
線程池有七大參數,需要重點關注corePoolSize
、maximumPoolSize
、workQueue
、handler
這四個。
corePoolSize
此值是用來初始化線程池中核心線程數,當線程池中線程池數< corePoolSize
時,系統默認是添加一個任務才創建一個線程池。當線程數 = corePoolSize時,新任務會追加到workQueue中。
maximumPoolSize
maximumPoolSize
表示允許的最大線程數 = (非核心線程數+核心線程數),當BlockingQueue
也滿了,但線程池中總線程數 < maximumPoolSize
時候就會再次創建新的線程。
keepAliveTime
非核心線程 =(maximumPoolSize - corePoolSize ) ,非核心線程閑置下來不干活最多存活時間。
unit
線程池中非核心線程保持存活的時間的單位
TimeUnit.DAYS; 天
TimeUnit.HOURS; 小時
TimeUnit.MINUTES; 分鐘
TimeUnit.SECONDS; 秒
TimeUnit.MILLISECONDS; 毫秒
TimeUnit.MICROSECONDS; 微秒
TimeUnit.NANOSECONDS; 納秒
workQueue
線程池等待隊列,維護著等待執行的Runnable
對象。當運行當線程數= corePoolSize時,新的任務會被添加到workQueue
中,如果workQueue
也滿了則嘗試用非核心線程執行任務,等待隊列應該盡量用有界的。
threadFactory
創建一個新線程時使用的工廠,可以用來設定線程名、是否為daemon線程等等。
handler
corePoolSize
、workQueue
、maximumPoolSize
都不可用的時候執行的飽和策略。
類比前面的例子,無法辦理業務時的處理方式,幫助記憶:
AbortPolicy :直接拋出異常,默認使用此策略
CallerRunsPolicy:用調用者所在的線程來執行任務
DiscardOldestPolicy:丟棄阻塞隊列里最老的任務,也就是隊列里靠前的任務
DiscardPolicy :當前任務直接丟棄
想實現自己的拒絕策略,實現RejectedExecutionHandler接口即可。
常用的阻塞隊列主要有以下幾種:
ArrayBlockingQueue:ArrayBlockingQueue(有界隊列)是一個用數組實現的有界阻塞隊列,按FIFO排序量。
LinkedBlockingQueue:LinkedBlockingQueue(可設置容量隊列)是基于鏈表結構的阻塞隊列,按FIFO排序任務,容量可以選擇進行設置,不設置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool線程池使用了這個隊列
DelayQueue:DelayQueue(延遲隊列)是一個任務定時周期的延遲執行的隊列。根據指定的執行時間從小到大排序,否則根據插入到隊列的先后排序。newScheduledThreadPool線程池使用了這個隊列。
PriorityBlockingQueue:PriorityBlockingQueue(優先級隊列)是具有優先級的無界阻塞隊列
SynchronousQueue:SynchronousQueue(同步隊列)是一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool線程池使用了這個隊列。
execute 用于提交不需要返回值的任務
threadsPool.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } });
submit()方法用于提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個 future對象可以判斷任務是否執行成功,并且可以通過future的get()方法來獲取返回值
Future<Object> future = executor.submit(harReturnValuetask); try { Object s = future.get(); } catch (InterruptedException e) { // 處理中斷異常 } catch (ExecutionException e) { // 處理無法執行任務異常 } finally { // 關閉線程池 executor.shutdown();}
可以通過調用線程池的shutdown
或shutdownNow
方法來關閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。
shutdown() 將線程池狀態置為shutdown,并不會立即停止:
停止接收外部submit的任務
內部正在跑的任務和隊列里等待的任務,會執行完
等到第二步完成后,才真正停止
shutdownNow() 將線程池狀態置為stop。一般會立即停止,事實上不一定:
和shutdown()一樣,先停止接收外部提交的任務
忽略隊列里等待的任務
嘗試將正在跑的任務interrupt中斷
返回未執行的任務列表
shutdown 和shutdownnow簡單來說區別如下:
shutdownNow()能立即停止線程池,正在跑的和正在等待的任務都停下了。這樣做立即生效,但是風險也比較大。
shutdown()只是關閉了提交通道,用submit()是無效的;而內部的任務該怎么跑還是怎么跑,跑完再徹底停止線程池。
線程在Java中屬于稀缺資源,線程池不是越大越好也不是越小越好。任務分為計算密集型、IO密集型、混合型。
計算密集型:大部分都在用CPU跟內存,加密,邏輯操作業務處理等。
IO密集型:數據庫鏈接,網絡通訊傳輸等。
一般的經驗,不同類型線程池的參數配置:
計算密集型一般推薦線程池不要過大,一般是CPU數 + 1,+1是因為可能存在頁缺失(就是可能存在有些數據在硬盤中需要多來一個線程將數據讀入內存)。如果線程池數太大,可能會頻繁的 進行線程上下文切換跟任務調度。獲得當前CPU核心數代碼如下:
Runtime.getRuntime().availableProcessors();
IO密集型:線程數適當大一點,機器的Cpu核心數*2。
混合型:可以考慮根絕情況將它拆分成CPU密集型和IO密集型任務,如果執行時間相差不大,拆分可以提升吞吐量,反之沒有必要。
當然,實際應用中沒有固定的公式,需要結合測試和監控來進行調整。
面試常問,主要有四種,都是通過工具類Excutors創建出來的,需要注意,阿里巴巴《Java開發手冊》里禁止使用這種方式來創建線程池。
newFixedThreadPool (固定數目線程的線程池)
newCachedThreadPool (可緩存線程的線程池)
newSingleThreadExecutor (單線程的線程池)
newScheduledThreadPool (定時及周期執行的線程池)
前三種線程池的構造直接調用ThreadPoolExecutor的構造方法。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
線程池特點
核心線程數為1
最大線程數也為1
阻塞隊列是無界隊列LinkedBlockingQueue,可能會導致OOM
keepAliveTime為0
工作流程:
提交任務
線程池是否有一條線程在,如果沒有,新建線程執行任務
如果有,將任務加到阻塞隊列
當前的唯一線程,從隊列取任務,執行完一個,再繼續取,一個線程執行任務。
適用場景
適用于串行執行任務的場景,一個任務一個任務地執行。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
線程池特點:
核心線程數和最大線程數大小一樣
沒有所謂的非空閑時間,即keepAliveTime為0
阻塞隊列為無界隊列LinkedBlockingQueue,可能會導致OOM
工作流程:
提交任務
如果線程數少于核心線程,創建核心線程執行任務
如果線程數等于核心線程,把任務添加到LinkedBlockingQueue阻塞隊列
如果線程執行完任務,去阻塞隊列取任務,繼續執行。
使用場景
FixedThreadPool 適用于處理CPU密集型的任務,確保CPU在長期被工作線程使用的情況下,盡可能的少的分配線程,即適用執行長期的任務。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
線程池特點:
核心線程數為0
最大線程數為Integer.MAX_VALUE,即無限大,可能會因為無限創建線程,導致OOM
阻塞隊列是SynchronousQueue
非核心線程空閑存活時間為60秒
當提交任務的速度大于處理任務的速度時,每次提交一個任務,就必然會創建一個線程。極端情況下會創建過多的線程,耗盡 CPU 和內存資源。由于空閑 60 秒的線程會被終止,長時間保持空閑的 CachedThreadPool 不會占用任何資源。
工作流程:
提交任務
因為沒有核心線程,所以任務直接加到SynchronousQueue隊列。
判斷是否有空閑線程,如果有,就去取出任務執行。
如果沒有空閑線程,就新建一個線程執行。
執行完任務的線程,還可以存活60秒,如果在這期間,接到任務,可以繼續活下去;否則,被銷毀。
適用場景
用于并發執行大量短期的小任務。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
線程池特點
最大線程數為Integer.MAX_VALUE,也有OOM的風險
阻塞隊列是DelayedWorkQueue
keepAliveTime為0
scheduleAtFixedRate() :按某種速率周期執行
scheduleWithFixedDelay():在某個延遲后執行
工作機制
線程從DelayQueue中獲取已到期的ScheduledFutureTask(DelayQueue.take())。到期任務是指ScheduledFutureTask的time大于等于當前時間。
線程執行這個ScheduledFutureTask。
線程修改ScheduledFutureTask的time變量為下次將要被執行的時間。
線程把這個修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。
使用場景
周期性執行任務的場景,需要限制線程數量的場景
使用無界隊列的線程池會導致什么問題嗎?
例如newFixedThreadPool使用了無界的阻塞隊列LinkedBlockingQueue,如果線程獲取一個任務后,任務的執行時間比較長,會導致隊列的任務越積越多,導致機器內存使用不停飆升,最終導致OOM。
在使用線程池處理任務的時候,任務代碼可能拋出RuntimeException,拋出異常后,線程池可能捕獲它,也可能創建一個新的線程來代替異常的線程,我們可能無法感知任務出現了異常,因此我們需要考慮線程池異常情況。
常見的異常處理方式:
線程池有這幾個狀態:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
//線程池狀態 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
線程池各個狀態切換圖:
RUNNING
該狀態的線程池會接收新任務,并處理阻塞隊列中的任務;
調用線程池的shutdown()方法,可以切換到SHUTDOWN狀態;
調用線程池的shutdownNow()方法,可以切換到STOP狀態;
SHUTDOWN
該狀態的線程池不會接收新任務,但會處理阻塞隊列中的任務;
隊列為空,并且線程池中執行的任務也為空,進入TIDYING狀態;
STOP
該狀態的線程不會接收新任務,也不會處理阻塞隊列中的任務,而且會中斷正在運行的任務;
線程池中執行的任務為空,進入TIDYING狀態;
TIDYING
該狀態表明所有的任務已經運行終止,記錄的任務數量為0。
terminated()執行完畢,進入TERMINATED狀態
TERMINATED
該狀態表示線程池徹底終止
線程池提供了幾個 setter方法來設置線程池的參數。
這里主要有兩個思路:
在我們微服務的架構下,可以利用配置中心如Nacos、Apollo等等,也可以自己開發配置中心。業務服務讀取線程池配置,獲取相應的線程池實例來修改線程池的參數。
如果限制了配置中心的使用,也可以自己去擴展ThreadPoolExecutor,重寫方法,監聽線程池參數變化,來動態修改線程池參數。
線程池配置沒有固定的公式,通常事前會對線程池進行一定評估,常見的評估方案如下:
上線之前也要進行充分的測試,上線之后要建立完善的線程池監控機制。
事中結合監控告警機制,分析線程池的問題,或者可優化點,結合線程池動態參數配置機制來調整配置。
事后要注意仔細觀察,隨時調整。
具體的調優案例可以查看參考[7]美團技術博客。
這道題在阿里的面試中出現頻率比較高
線程池實現原理可以查看 要是以前有人這么講線程池,我早就該明白了! ,當然,我們自己實現, 只需要抓住線程池的核心流程-參考[6]:
我們自己的實現就是完成這個核心流程:
線程池中有N個工作線程
把任務提交給線程池運行
如果線程池已滿,把任務放入隊列
最后當有空閑時,獲取隊列中任務來執行
實現代碼[6]:
這樣,一個實現了線程池主要流程的類就完成了。
我們可以對正在處理和阻塞隊列的任務做事務管理或者對阻塞隊列中的任務持久化處理,并且當斷電或者系統崩潰,操作無法繼續下去的時候,可以通過回溯日志的方式來撤銷正在處理
的已經執行成功的操作。然后重新執行整個阻塞隊列。
也就是說,對阻塞隊列持久化;正在處理任務事務控制;斷電之后正在處理任務的回滾,通過日志恢復該次操作;服務器重啟后阻塞隊列中的數據再加載。
關于一些并發容器,可以去看看 面渣逆襲:Java集合連環三十問 ,里面有CopyOnWriteList
和ConcurrentHashMap
這兩種線程安全容器類的問答。。
Fork/Join框架是Java7提供的一個用于并行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。
要想掌握Fork/Join框架,首先需要理解兩個點,分而治之和工作竊取算法。
分而治之
Fork/Join框架的定義,其實就體現了分治思想:將一個規模為N的問題分解為K個規模較小的子問題,這些子問題相互獨立且與原問題性質相同。求出子問題的解,就可得到原問題的解。
工作竊取算法
大任務拆成了若干個小任務,把這些小任務放到不同的隊列里,各自創建單獨線程來執行隊列里的任務。
那么問題來了,有的線程干活塊,有的線程干活慢。干完活的線程不能讓它空下來,得讓它去幫沒干完活的線程干活。它去其它線程的隊列里竊取一個任務來執行,這就是所謂的工作竊取。
工作竊取發生的時候,它們會訪問同一個隊列,為了減少竊取任務線程和被竊取任務線程之間的競爭,通常任務會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
看一個Fork/Join框架應用的例子,計算1~n之間的和:1+2+3+…+n
設置一個分割閾值,任務大于閾值就拆分任務
任務有結果,所以需要繼承RecursiveTask
public class CountTask extends RecursiveTask<Integer> { private static final int THRESHOLD = 16; // 閾值 private int start; private int end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; // 如果任務足夠小就計算任務 boolean canCompute = (end - start) <= THRESHOLD; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { // 如果任務大于閾值,就分裂成兩個子任務計算 int middle = (start + end) / 2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); // 執行子任務 leftTask.fork(); rightTask.fork(); // 等待子任務執行完,并得到其結果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合并子任務 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); // 生成一個計算任務,負責計算1+2+3+4 CountTask task = new CountTask(1, 100); // 執行一個任務 Future<Integer> result = forkJoinPool.submit(task); try { System.out.println(result.get()); } catch (InterruptedException e) { } catch (ExecutionException e) { } } }
ForkJoinTask與一般Task的主要區別在于它需要實現compute方法,在這個方法里,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果比較大,就必須分割成兩個子任務,每個子任務在調用fork方法時,又會進compute方法,看看當前子任務是否需要繼續分割成子任務,如果不需要繼續分割,則執行當前子任務并返回結果。使用join方法會等待子任務執行完并得到其結果。
到此,相信大家對“Java并發知識點有哪些”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。