您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關怎么在Java多線程中實現一個通信模式,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
首先先介紹下官方的一個正式的介紹:
等待/通知機制,是指一個線程A調用了對象object的wait()方法進入等待狀態,而另一個線程B調用了對象object的notify或者notifyAll()方法,線程A收到通知后從對象O的wait()方法返回,進而還行后續操作。
而我的理解是(舉例說明):
假設工廠里有兩條流水線,某個工作流程需要這兩個流水線配合完成,這兩個流水線分別是A和B,其中A負責準備各種配件,B負責租裝配件之后產出輸出到工作臺。B的工作需要A的配件準備充分,否則就會一直等待A準備好配件,并且A準備好配件后會通過一個開頭通知告訴B我已經準備好了,你那邊不用一直等待了,可以繼續執行任務了。流程A與流程B就是對應的線程A與線程B之間的通信,即可以理解為相互配合,具體也就是“”通知/等待“”機制!
那么,我們都知道超類Object有wait()方法與notify()/notifyAll()方法,在進行正式代碼舉例之前,應該先加深下對這三個方法的理解與一些細節(有一些細節確實容易被忽略)
調用wait()方法,會釋放鎖(這一點我想大部分人都知道),線程狀態由RUNNING->WAITNG,當前線程進入對象等待隊列中;
調用notify()/notifyAll()方法不會立馬釋放鎖(這一點我大家人也應該知道,但是什么時候釋放鎖呢?--------請看下一條),notify()方法是將等待隊列中的線程移到同步隊列中,而notifyAll()則是全部移到同步隊列中,被移出的線程狀態WAITING-->BLOCKED;
當前調用notify()/notifyAll()的線程釋放鎖了才算釋放鎖,才有機會喚醒wait線程返回(為什么有才有機會返回呢?------繼續看下一條)
從wait()返回的前提是必須獲得調用對象鎖,也就是說notify()與notifyAll()釋放鎖之后,wait()進入BLOCKED狀態,如果其他線程有競爭當前鎖的話,wait線程繼續爭取鎖資格(不好理解的話,請看下面的代碼舉例)
使用wait()、notify()、notifyAll()方法時需要先調對象加鎖(這可能是最容易忽視的點了,至于為什么,請先看了代碼之后,看本篇博文最后補充:wait()、notify()、notifyAll()加鎖的原因----防止線程即饑餓)
結合上述的“工廠流程裝配配件并產出的例子”,我們有兩個線程(流水線)WaitThread與NotifyThread、其中WaitThread是被通知的任務,完成主要的工作(組裝配件完成產品),需要時刻判斷標志位(開關);NotifyThread是需要通知的任務,需要對WaitThread進行“監督通知”,兩個配合才能更好完成產品的組裝并輸出。
public class WaitNotify { static Object lock = new Object(); static boolean flag = false; public static void main(String[] args) { new Thread(new WaitThread(), "WaitThread").start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(new NotifyThread(), "NotifyThread").start(); } /** * 流水線A,完成主要任務 */ static class WaitThread implements Runnable{ @Override public void run() { // 獲取object對象鎖 synchronized (lock){ // 條件不滿足時一直在等,等另外的線程改變該條件,并通知該wait線程 while (!flag){ try { System.out.println(Thread.currentThread() + " is waiting, flag is "+flag); // wait()方法調用就會釋放鎖,當前線程進入等待隊列。 lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // TODO 條件已經滿足,不繼續while,完成任務 System.out.println(Thread.currentThread() + " is running, flag is "+flag); } } } /** * 流水線B,對開關進行控制,并通知流水線A */ static class NotifyThread implements Runnable{ @Override public void run() { // 獲取等wait線程同一個object對象鎖 synchronized (lock){ flag = true; // 通知wait線程,我已經改變了條件,你可以繼續返回執行了(返回之后繼續判斷while) // 但是此時通知notify()操作并立即不會釋放鎖,而是要等當前線程釋放鎖 // TODO 我準備好配件了,我需要通知全部的組裝流水線A..... lock.notifyAll(); System.out.println(Thread.currentThread() + " hold lock, notify waitThread and flag is "+flag); } } } }
運行main函數,輸出:
Thread[WaitThread,5,main] is waiting, flag is false
Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true
Thread[WaitThread,5,main] is running, flag is true
車床流水工作開啟,流水線的開關一開始是關閉的(flag=false),流水線B(NotifyThread)去開啟后,開始自動喚醒流水線A(WaitThread),整個流水線開始工作了......
Thread[WaitThread,5,main] is waiting, flag is false: 一開始流水線A發現自己沒有配件可租裝,所以等流水線A準備好配件(這樣是不是覺得特別傻,哈哈哈,真正的流水線不會浪費時間等的,而且會有很多條流水線B準備配件的,這里只是舉例說明,望理解!);
Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true:流水線B準備好了配件,開啟開關(flag=ture),并通知流水線A,讓流水線A開始工作;
Thread[WaitThread,5,main] is running, flag is true,流水線B收到了通知,再次檢查開關是否開啟了,開啟的話就開始返回繼續完成工作了。
其實結合上述我舉的例子還是很好理解的,下面是大概的一個粗略時序圖:
上述已經表達了這個注意的細節:從wait()返回的前提是必須獲得調用對象鎖,我們再增加能競爭lock的同步代碼塊(紅字部分)。
public class WaitNotify { static Object lock = new Object(); static boolean flag = false; public static void main(String[] args) { new Thread(new WaitThread(), "WaitThread").start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(new NotifyThread(), "NotifyThread").start(); } /** * 流水線A,完成主要任務 */ static class WaitThread implements Runnable{ @Override public void run() { // 獲取object對象鎖 synchronized (lock){ // 條件不滿足時一直在等,等另外的線程改變該條件,并通知該wait線程 while (!flag){ try { System.out.println(Thread.currentThread() + " is waiting, flag is "+flag); // wait()方法調用就會釋放鎖,當前線程進入等待隊列。 lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // TODO 條件已經滿足,不繼續while,完成任務 System.out.println(Thread.currentThread() + " is running, flag is "+flag); } } } /** * 流水線B,對開關進行控制,并通知流水線A */ static class NotifyThread implements Runnable{ @Override public void run() { // 獲取等wait線程同一個object對象鎖 synchronized (lock){ flag = true; // 通知wait線程,我已經改變了條件,你可以繼續返回執行了(返回之后繼續判斷while) // 但是此時通知notify()操作并立即不會釋放鎖,而是要等當前線程釋放鎖 // TODO 我準備好配件了,我需要通知全部的組裝流水線A..... lock.notifyAll(); System.out.println(Thread.currentThread() + " hold lock, notify waitThread and flag is "+flag); } // 模擬跟流水線B競爭 synchronized (lock){ System.out.println(Thread.currentThread() + " hold lock again"); } } } }
輸出結果:
Thread[WaitThread,5,main] is waiting, flag is false
Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true
Thread[NotifyThread,5,main] hold lock again
Thread[WaitThread,5,main] is running, flag is true
其中第三條跟第四條順序可能會反著來的,這就是因為lock鎖可能被紅字部分的synchronized代碼塊競爭獲取(這樣wait()方法可能獲取不到lock鎖,不會返回),也可能被waitThread獲取從wait()方法返回。
Thread[WaitThread,5,main] is waiting, flag is false
Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true
Thread[WaitThread,5,main] is running, flag is true
Thread[NotifyThread,5,main] hold lock again
Thread.join()作用:當線程A等待thread線程終止之后才從thread.join()返回, 每個線程終止的前提是前驅線程終止,每個線程等待前驅線程終止后,才從join方法返回,這里涉及了等待/通知機制(等待前驅線程結束,接收前驅線程結束通知)。
Thread.join()源碼中,使用while選好判斷前驅線程是否活著,如果前驅線程還活著就一直wait等待,當然如果超時的話就直接返回。
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } // 這里的while(){wait(millis)} 就是利用等待/通知中的等待模式,只不過加上了超時設置 if (millis == 0) { // while循環,當線程還活著的時候就一直循環等待,直到線程終止 while (isAlive()) { // wait等待 wait(0); } // 條件滿足時返回 } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
線程池的本質是使用一個線程安全的工作隊列連接工作者線程和客戶端線程,客戶端線程將任務放入工作隊列后便返回,而工作者線程則不斷地從工作隊列中取出工作并執行。那么,在這里的等待/通知模式的應用就是:
工作隊列中線程job沒有的話也就是工作隊列為空的情況下,等待客戶端放入工作隊列線程任務,并通知工作線程繼續從工作隊列中獲取線程執行。
注:關于線程池的應用源碼這里不做介紹,因為一時也講不完(自己也還沒有完全消化),先簡單介紹下應用到的地方還有概念。
補充:其實數據庫的連接池也類似線程池這種工作流程,也會涉及等待/通知模式。
介紹了那么多應用,這種模式應該有個統一的范式來套用。對的,必然是有的:
對于等待者(也可以稱之為消費者):
synchronized (對象lock) { while (條件不滿足) { 對象.wait(); } // TODO 處理邏輯 }
對于通知者(也可以稱之為生產者):
synchronized (對象lock) { while (條件滿足) { 改變條件 對象.notify(); } }
注意:實際開發中最好采用的是超時等待/通知模式,在thread.join()源碼方法中完美體現
(1)其實根據wait()注意事項也能明白,wait()是釋放鎖的,那么不加鎖哪來釋放鎖!
(2)wait()與notify()或者notifyAll()必須是搭配一起使用的,否則線程調用object.wait()之后,沒有超時機制,也沒有調用notify()或者notifyAll()喚醒的話,就一直處于WAITING狀態,造成調用wait()的線程一直都是饑餓狀態。
(3)由于第2條的,我們已知:即便我們使用了notify()或者notifyAll()去喚醒線程,但是沒有在適當的時機喚醒(比如調用wait()之前就喚醒了),那么仍然調用wait()線程處于WAITING狀態,所以我們必須保證wait()方法要么不執行,要么就執行完在被喚醒。也就是下列代碼中1那里不能允許插入調用notify/notifyAll,自然而然就增加synchronized關鍵字,保證wait()操作整體執行不被破壞!
synchronized (對象lock) { while (條件不滿足) { // 1 這里如果先執行了notify/notifyAll方法,那么2執行之后,該線程就一直WAITING 對象.wait(); // 2 } // TODO 處理邏輯 }
用圖片展示執行順序就是:
(4)注意synchronized代碼塊中,代碼錯誤或者其它原因線程終止的話,沒有執行到wait()方法的話,是會自動釋放鎖的,不必擔心會死鎖。
關于怎么在Java多線程中實現一個通信模式就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。