您好,登錄后才能下訂單哦!
Java中怎樣實現線程同步,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
但其并發編程的根本,就是使線程間進行正確的通信。其中兩個比較重要的關鍵點,如下:
線程通信:重點關注線程同步的幾種方式; 正確通信:重點關注是否有線程安全問題; Java中提供了很多線程同步操作,比如:synchronized關鍵字、wait/notifyAll、ReentrantLock、Condition、一些并發包下的工具類、Semaphore,ThreadLocal、AbstractQueuedSynchronizer等。本文主要說明一下這幾種同步方式的使用及優劣。
自JDK5開始,新增了Lock接口以及它的一個實現類ReentrantLock。ReentrantLock可重入鎖是J.U.C包內置的一個鎖對象,可以用來實現同步,基本使用方法如下:
`public class ReentrantLockTest {
private ReentrantLock lock = new ReentrantLock(); public void execute() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " do something synchronize"); try { Thread.sleep(5000l); } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " interrupted"); Thread.currentThread().interrupt(); } } finally { lock.unlock(); } } public static void main(String[] args) { ReentrantLockTest reentrantLockTest = new ReentrantLockTest(); Thread thread1 = new Thread(new Runnable() { [@Override](https://my.oschina.net/u/1162528) public void run() { reentrantLockTest.execute(); } }); Thread thread2 = new Thread(new Runnable() { [@Override](https://my.oschina.net/u/1162528) public void run() { reentrantLockTest.execute(); } }); thread1.start(); thread2.start(); }
}` 上面例子表示 同一時間段只能有1個線程執行execute方法,輸出如下:
Thread-0 do something synchronize // 隔了5秒鐘 輸入下面 Thread-1 do something synchronize
可重入鎖中可重入表示的意義在于 對于同一個線程,可以繼續調用加鎖的方法,而不會被掛起。可重入鎖內部維護一個計數器,對于同一個線程調用lock方法,計數器+1,調用unlock方法,計數器-1。
舉個例子再次說明一下可重入的意思:在一個加鎖方法execute中調用另外一個加鎖方法anotherLock并不會被掛起,可以直接調用(調用execute方法時計數器+1,然后內部又調用了anotherLock方法,計數器+1,變成了2):
`public void execute() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " do something synchronize"); try { anotherLock(); Thread.sleep(5000l); } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " interrupted"); Thread.currentThread().interrupt(); } } finally { lock.unlock(); } } public void anotherLock() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " invoke anotherLock"); } finally { lock.unlock(); } }` 輸出: `Thread-0 do something synchronize Thread-0 invoke anotherLock // 隔了5秒鐘 輸入下面 Thread-1 do something synchronize Thread-1 invoke anotherLock`
synchronized跟ReentrantLock一樣,也支持可重入鎖。但是它是 一個關鍵字,是一種語法級別的同步方式,稱為內置鎖:
`public class SynchronizedKeyWordTest {
public synchronized void execute() { System.out.println(Thread.currentThread().getName() + " do something synchronize"); try { anotherLock(); Thread.sleep(5000l); } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " interrupted"); Thread.currentThread().interrupt(); } } public synchronized void anotherLock() { System.out.println(Thread.currentThread().getName() + " invoke anotherLock"); } public static void main(String[] args) { SynchronizedKeyWordTest reentrantLockTest = new SynchronizedKeyWordTest(); Thread thread1 = new Thread(new Runnable() { [@Override](https://my.oschina.net/u/1162528) public void run() { reentrantLockTest.execute(); } }); Thread thread2 = new Thread(new Runnable() { [@Override](https://my.oschina.net/u/1162528) public void run() { reentrantLockTest.execute(); } }); thread1.start(); thread2.start(); }
}` 輸出結果跟ReentrantLock一樣,這個例子說明內置鎖可以作用在方法上。synchronized關鍵字也可以修飾靜態方法,此時如果調用該靜態方法,將會鎖住整個類。
同步是一種高開銷的操作,因此應該盡量減少同步的內容。通常沒有必要同步整個方法,使用synchronized代碼塊同步關鍵代碼即可。
synchronized跟ReentrantLock相比,有幾點局限性:
加鎖的時候不能設置超時。ReentrantLock有提供tryLock方法,可以設置超時時間,如果超過了這個時間并且沒有獲取到鎖,就會放棄,而synchronized卻沒有這種功能; ReentrantLock可以使用多個Condition,而synchronized卻只能有1個 不能中斷一個試圖獲得鎖的線程; ReentrantLock可以選擇公平鎖和非公平鎖; ReentrantLock可以獲得正在等待線程的個數,計數器等; 所以,Lock的操作與synchronized相比,靈活性更高,而且Lock提供多種方式獲取鎖,有Lock、ReadWriteLock接口,以及實現這兩個接口的ReentrantLock類、ReentrantReadWriteLock類。
關于Lock對象和synchronized關鍵字選擇的考量:
最好兩個都不用,使用一種java.util.concurrent包提供的機制,能夠幫助用戶處理所有與鎖相關的代碼。 如果synchronized關鍵字能滿足用戶的需求,就用synchronized,因為它能簡化代碼。 如果需要更高級的功能,就用ReentrantLock類,此時要注意及時釋放鎖,否則會出現死鎖,通常在finally代碼釋放鎖。 在性能考量上來說,如果競爭資源不激烈,兩者的性能是差不多的,而當競爭資源非常激烈時(即有大量線程同時競爭),此時Lock的性能要遠遠優于synchronized。所以說,在具體使用時要根據適當情況選擇。
Condition條件對象的意義在于 對于一個已經獲取Lock鎖的線程,如果還需要等待其他條件才能繼續執行的情況下,才會使用Condition條件對象。
Condition可以替代傳統的線程間通信,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll()。
為什么方法名不直接叫wait()/notify()/nofityAll()?因為Object的這幾個方法是final的,不可重寫!
public class ConditionTest {
public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); Thread thread1 = new Thread(new Runnable() { [@Override](https://my.oschina.net/u/1162528) public void run() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " run"); System.out.println(Thread.currentThread().getName() + " wait for condition"); try { condition.await(); System.out.println(Thread.currentThread().getName() + " continue"); } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " interrupted"); Thread.currentThread().interrupt(); } } finally { lock.unlock(); } } }); Thread thread2 = new Thread(new Runnable() { @Override public void run() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " run"); System.out.println(Thread.currentThread().getName() + " sleep 5 secs"); try { Thread.sleep(5000l); } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " interrupted"); Thread.currentThread().interrupt(); } condition.signalAll(); } finally { lock.unlock(); } } }); thread1.start(); thread2.start(); }
} 這個例子中thread1執行到condition.await()時,當前線程會被掛起,直到thread2調用了condition.signalAll()方法之后,thread1才會重新被激活執行。
這里需要注意的是thread1調用Condition的await方法之后,thread1線程釋放鎖,然后馬上加入到Condition的等待隊列,由于thread1釋放了鎖,thread2獲得鎖并執行,thread2執行signalAll方法之后,Condition中的等待隊列thread1被取出并加入到AQS中,接下來thread2執行完畢之后釋放鎖,由于thread1已經在AQS的等待隊列中,所以thread1被喚醒,繼續執行。
傳統線程的通信方式,Condition都可以實現。Condition的強大之處在于它可以為多個線程間建立不同的Condition。
注意,Condition是被綁定到Lock上的,要創建一個Lock的Condition必須用newCondition()方法。
Java線程的狀態轉換圖與相關方法,如下:
在圖中,紅框標識的部分方法,可以認為已過時,不再使用。上圖中的方法能夠參與到線程同步中的方法,如下:
wait/notifyAll方式跟ReentrantLock/Condition方式的原理是一樣的。 Java中每個對象都擁有一個內置鎖,在內置鎖中調用wait,notify方法相當于調用鎖的Condition條件對象的await和signalAll方法。
public class WaitNotifyAllTest {
public synchronized void doWait() { System.out.println(Thread.currentThread().getName() + " run"); System.out.println(Thread.currentThread().getName() + " wait for condition"); try { this.wait(); System.out.println(Thread.currentThread().getName() + " continue"); } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " interrupted"); Thread.currentThread().interrupt(); } } public synchronized void doNotify() { try { System.out.println(Thread.currentThread().getName() + " run"); System.out.println(Thread.currentThread().getName() + " sleep 5 secs"); Thread.sleep(5000l); this.notifyAll(); } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " interrupted"); Thread.currentThread().interrupt(); } } public static void main(String[] args) { WaitNotifyAllTest waitNotifyAllTest = new WaitNotifyAllTest(); Thread thread1 = new Thread(new Runnable() { @Override public void run() { waitNotifyAllTest.doWait(); } }); Thread thread2 = new Thread(new Runnable() { @Override public void run() { waitNotifyAllTest.doNotify(); } }); thread1.start(); thread2.start(); }
} 這里需要注意的是 調用wait/notifyAll方法的時候一定要獲得當前線程的鎖,否則會發生IllegalMonitorStateException異常。
線程中調用了wait方法,則進入阻塞狀態,只有等另一個線程調用與wait同一個對象的notify方法。這里有個特殊的地方,調用wait或者notify,前提是需要獲取鎖,也就是說,需要在同步塊中做以上操作。
該方法主要作用是在該線程中的run方法結束后,才往下執行。
`package com.thread.simple; public class ThreadJoin { public static void main(String[] args) { Thread thread= new Thread(new Runnable() { @Override public void run() { System.err.println("線程"+Thread.currentThread().getId()+" 打印信息"); } }); thread.start();`
try { thread.join(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.err.println("主線程打印信息"); }
}
線程本身的調度方法,使用時線程可以在run方法執行完畢時,調用該方法,告知線程已可以出讓CPU資源。
public class Test1 { public static void main(String[] args) throws InterruptedException { new MyThread("低級", 1).start(); new MyThread("中級", 5).start(); new MyThread("高級", 10).start(); } } class MyThread extends Thread { public MyThread(String name, int pro) { super(name);// 設置線程的名稱 this.setPriority(pro);// 設置優先級 } @Override public void run() { for (int i = 0; i < 30; i++) { System.out.println(this.getName() + "線程第" + i + "次執行!"); if (i % 5 == 0) Thread.yield(); } } }
5.sleep方法: 通過sleep(millis)使線程進入休眠一段時間,該方法在指定的時間內無法被喚醒,同時也不會釋放對象鎖;
/**
可以明顯看到打印的數字在時間上有些許的間隔 */ public class Test1 {
public static void main(String[] args) throws InterruptedException {
for(int i=0;i<100;i++){
System.out.println("main"+i);
Thread.sleep(100);
}
}
} sleep方法告訴操作系統 至少在指定時間內不需為線程調度器為該線程分配執行時間片,并不釋放鎖(如果當前已經持有鎖)。實際上,調用sleep方法時并不要求持有任何鎖。
所以,sleep方法并不需要持有任何形式的鎖,也就不需要包裹在synchronized中。
ThreadLocal是一種把變量放到線程本地的方式來實現線程同步的。比如:SimpleDateFormat不是一個線程安全的類,可以使用ThreadLocal實現同步,如下:
`public class ThreadLocalTest {
private static ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new ThreadLocal<SimpleDateFormat>() { @Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); } }; public static void main(String[] args) { Thread thread1 = new Thread(new Runnable() { @Override public void run() { Date date = new Date(); System.out.println(dateFormatThreadLocal.get().format(date)); } }); Thread thread2 = new Thread(new Runnable() { @Override public void run() { Date date = new Date(); System.out.println(dateFormatThreadLocal.get().format(date)); } }); thread1.start(); thread2.start(); }
}` ThreadLocal與同步機制的對比選擇:
ThreadLocal與同步機制都是 為了解決多線程中相同變量的訪問沖突問題。 前者采用以 "空間換時間" 的方法,后者采用以 "時間換空間" 的方式。
volatile關鍵字為域變量的訪問提供了一種免鎖機制,使用volatile修飾域相當于告訴虛擬機該域可能會被其他線程更新,因此每次使用該域就要重新計算,而不是使用寄存器中的值,volatile不會提供任何原子操作,它也不能用來修飾final類型的變量。
//只給出要修改的代碼,其余代碼與上同 public class Bank { //需要同步的變量加上volatile private volatile int account = 100; public int getAccount() { return account; } //這里不再需要synchronized public void save(int money) { account += money; } }
多線程中的非同步問題主要出現在對域的讀寫上,如果讓域自身避免這個問題,則就不需要修改操作該域的方法。用final域,有鎖保護的域和volatile域可以避免非同步的問題。
Semaphore信號量被用于控制特定資源在同一個時間被訪問的個數。類似連接池的概念,保證資源可以被合理的使用。可以使用構造器初始化資源個數:
public class SemaphoreTest {
private static Semaphore semaphore = new Semaphore(2); public static void main(String[] args) { for(int i = 0; i < 5; i ++) { new Thread(new Runnable() { @Override public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " " + new Date()); Thread.sleep(5000l); semaphore.release(); } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " interrupted"); } } }).start(); } }
} 輸出:
Thread-1 Mon Apr 18 18:03:46 CST 2016 Thread-0 Mon Apr 18 18:03:46 CST 2016 Thread-3 Mon Apr 18 18:03:51 CST 2016 Thread-2 Mon Apr 18 18:03:51 CST 2016 Thread-4 Mon Apr 18 18:03:56 CST 2016
CountDownLatch是一個計數器,它的構造方法中需要設置一個數值,用來設定計數的次數。每次調用countDown()方法之后,這個計數器都會減去1,CountDownLatch會一直阻塞著調用await()方法的線程,直到計數器的值變為0。
public class CountDownLatchTest {
public static void main(String[] args) { CountDownLatch countDownLatch = new CountDownLatch(5); for(int i = 0; i < 5; i ++) { new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " " + new Date() + " run"); try { Thread.sleep(5000l); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); } }).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("all thread over"); }
} 輸出:
Thread-2 Mon Apr 18 18:18:30 CST 2016 run Thread-3 Mon Apr 18 18:18:30 CST 2016 run Thread-4 Mon Apr 18 18:18:30 CST 2016 run Thread-0 Mon Apr 18 18:18:30 CST 2016 run Thread-1 Mon Apr 18 18:18:30 CST 2016 run all thread over
CyclicBarrier阻塞調用的線程,直到條件滿足時,阻塞的線程同時被打開。
調用await()方法的時候,這個線程就會被阻塞,當調用await()的線程數量到達屏障數的時候,主線程就會取消所有被阻塞線程的狀態。
在CyclicBarrier的構造方法中,還可以設置一個barrierAction。在所有的屏障都到達之后,會啟動一個線程來運行這里面的代碼。
public class CyclicBarrierTest {
public static void main(String[] args) { Random random = new Random(); CyclicBarrier cyclicBarrier = new CyclicBarrier(5); for(int i = 0; i < 5; i ++) { new Thread(new Runnable() { @Override public void run() { int secs = random.nextInt(5); System.out.println(Thread.currentThread().getName() + " " + new Date() + " run, sleep " + secs + " secs"); try { Thread.sleep(secs * 1000); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " " + new Date() + " runs over"); } }).start(); } }
} 相比CountDownLatch,CyclicBarrier是可以被循環使用的,而且遇到線程中斷等情況時,還可以利用reset()方法,重置計數器,從這些方面來說,CyclicBarrier會比CountDownLatch更加靈活一些。
有時需要使用線程同步的根本原因在于 對普通變量的操作不是原子的。那么什么是原子操作呢?
原子操作就是指將讀取變量值、修改變量值、保存變量值看成一個整體來操作 即-這幾種行為要么同時完成,要么都不完成。
在java.util.concurrent.atomic包中提供了創建原子類型變量的工具類,使用該類可以簡化線程同步。比如:其中AtomicInteger以原子方式更新int的值:
class Bank { private AtomicInteger account = new AtomicInteger(100);
public AtomicInteger getAccount() { return account; } public void save(int money) { account.addAndGet(money); }
}
AQS是很多同步工具類的基礎,比如:ReentrantLock里的公平鎖和非公平鎖,Semaphore里的公平鎖和非公平鎖,CountDownLatch里的鎖等他們的底層都是使用AbstractQueuedSynchronizer完成的。
基于AbstractQueuedSynchronizer自定義實現一個獨占鎖:
public class MySynchronizer extends AbstractQueuedSynchronizer {
@Override protected boolean tryAcquire(int arg) { if(compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { setState(0); setExclusiveOwnerThread(null); return true; } public void lock() { acquire(1); } public void unlock() { release(1); } public static void main(String[] args) { MySynchronizer mySynchronizer = new MySynchronizer(); Thread thread1 = new Thread(new Runnable() { @Override public void run() { mySynchronizer.lock(); try { System.out.println(Thread.currentThread().getName() + " run"); System.out.println(Thread.currentThread().getName() + " will sleep 5 secs"); try { Thread.sleep(5000l); System.out.println(Thread.currentThread().getName() + " continue"); } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " interrupted"); Thread.currentThread().interrupt(); } } finally { mySynchronizer.unlock(); } } }); Thread thread2 = new Thread(new Runnable() { @Override public void run() { mySynchronizer.lock(); try { System.out.println(Thread.currentThread().getName() + " run"); } finally { mySynchronizer.unlock(); } } }); thread1.start(); thread2.start(); }
}
前面幾種同步方式都是基于底層實現的線程同步,但是在實際開發當中,應當盡量遠離底層結構。本節主要是使用LinkedBlockingQueue來實現線程的同步。
LinkedBlockingQueue是一個基于鏈表的隊列,先進先出的順序(FIFO),范圍任意的blocking queue。
package com.xhj.thread;
import java.util.Random; import java.util.concurrent.LinkedBlockingQueue;
/**
用阻塞隊列實現線程同步 LinkedBlockingQueue的使用 / public class BlockingSynchronizedThread { /*
定義一個阻塞隊列用來存儲生產出來的商品 / private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(); /*
定義生產商品個數 / private static final int size = 10; /*
定義啟動線程的標志,為0時,啟動生產商品的線程;為1時,啟動消費商品的線程 */
package com.xhj.thread; import java.util.Random; import java.util.concurrent.LinkedBlockingQueue; /** * 用阻塞隊列實現線程同步 LinkedBlockingQueue的使用 */ public class BlockingSynchronizedThread { /** * 定義一個阻塞隊列用來存儲生產出來的商品 */ private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(); /** * 定義生產商品個數 */ private static final int size = 10; /** * 定義啟動線程的標志,為0時,啟動生產商品的線程;為1時,啟動消費商品的線程 */ private int flag = 0; private class LinkBlockThread implements Runnable { @Override public void run() { int new_flag = flag++; System.out.println("啟動線程 " + new_flag); if (new_flag == 0) { for (int i = 0; i < size; i++) { int b = new Random().nextInt(255); System.out.println("生產商品:" + b + "號"); try { queue.put(b); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("倉庫中還有商品:" + queue.size() + "個"); try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } else { for (int i = 0; i < size / 2; i++) { try { int n = queue.take(); System.out.println("消費者買去了" + n + "號商品"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("倉庫中還有商品:" + queue.size() + "個"); try { Thread.sleep(100); } catch (Exception e) { // TODO: handle exception } } } } } public static void main(String[] args) { BlockingSynchronizedThread bst = new BlockingSynchronizedThread(); LinkBlockThread lbt = bst.new LinkBlockThread(); Thread thread1 = new Thread(lbt); Thread thread2 = new Thread(lbt); thread1.start(); thread2.start(); } }
看完上述內容,你們掌握Java中怎樣實現線程同步的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。