中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java多線程同步器代碼詳解

發布時間:2020-08-30 20:02:00 來源:腳本之家 閱讀:263 作者:-Thinker 欄目:編程語言

同步器

為每種特定的同步問題提供了解決方案,同步器是一些使線程能夠等待另一個線程的對象,允許它們協調動作。最常用的同步器是CountDownLatch和Semaphore,不常用的是Barrier 和Exchanger

Semaphore

Semaphore【信號標;旗語】,通過計數器控制對共享資源的訪問。

測試類:

package concurrent;
import concurrent.thread.SemaphoreThread;
import java.util.concurrent.Semaphore;
/**
  * 拿客
  * www.coderknock.com
  * QQ群:213732117
  * 創建時間:2016年08月08日
  * 描述:
  */
public class SemaphoreTest {
	public static void main(String[] args) {
		//在Thread里聲明并不是同一個對象
		Semaphore semaphore = new Semaphore(3);
		SemaphoreThread testA = new SemaphoreThread("A", semaphore);
		SemaphoreThread testB = new SemaphoreThread("B", semaphore);
		SemaphoreThread testC = new SemaphoreThread("C", semaphore);
		SemaphoreThread testD = new SemaphoreThread("D", semaphore);
		SemaphoreThread testE = new SemaphoreThread("E", semaphore);
		SemaphoreThread testF = new SemaphoreThread("F", semaphore);
		SemaphoreThread testG = new SemaphoreThread("G", semaphore);
		testA.start();
		testB.start();
		testC.start();
		testD.start();
		testE.start();
		testF.start();
		testG.start();
	}
}

線程寫法:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Semaphore;
/**
  * 拿客
  * www.coderknock.com
  * QQ群:213732117
  * 創建時間:2016年08月08日
  * 描述:
  */
public class SemaphoreThread extends Thread {
	private static final Logger logger = LogManager.getLogger(SemaphoreThread.class);
	//創建有3個信號量的信號量計數器
	public Semaphore semaphore;
	public SemaphoreThread(String name, Semaphore semaphore) {
		setName(name);
		this.semaphore = semaphore;
	}
	@Override
	    public void run() {
		try {
			logger.debug(getName() + " 取號等待... " + System.currentTimeMillis());
			//取出一個信號
			semaphore.acquire();
			logger.debug(getName() + " 提供服務... " + System.currentTimeMillis());
			sleep(1000);
			logger.debug(getName() + " 完成服務... " + System.currentTimeMillis());
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}
		logger.debug(getName() + " 釋放... " + System.currentTimeMillis());
		//釋放一個信號
		semaphore.release();
	}
}

執行結果【以下所有輸出結果中[]中為線程名稱- 后為輸出的內容】:

  [C] - C 取號等待... 1470642024037
  [F] - F 取號等待... 1470642024036
  [E] - E 取號等待... 1470642024036
  [B] - B 取號等待... 1470642024037
  [D] - D 取號等待... 1470642024037
  [A] - A 取號等待... 1470642023965
  [D] - D 提供服務... 1470642024039
  [C] - C 提供服務... 1470642024039
  [G] - G 取號等待... 1470642024036
  [F] - F 提供服務... 1470642024040
  [D] - D 完成服務... 1470642025039
  [C] - C 完成服務... 1470642025039
  [D] - D 釋放... 1470642025040
  [F] - F 完成服務... 1470642025040
  [C] - C 釋放... 1470642025041
  [B] - B 提供服務... 1470642025042
  [A] - A 提供服務... 1470642025042
  [F] - F 釋放... 1470642025043
  [E] - E 提供服務... 1470642025043
  [A] - A 完成服務... 1470642026043
  [B] - B 完成服務... 1470642026043
  [B] - B 釋放... 1470642026043
  [A] - A 釋放... 1470642026043
  [G] - G 提供服務... 1470642026044
  [E] - E 完成服務... 1470642026045
  [E] - E 釋放... 1470642026045
  [G] - G 完成服務... 1470642027045
  [G] - G 釋放... 1470642027046

可以看到,當3個信號量被領取完之后,之后的線程會阻塞在領取信號的位置,當有信號量釋放之后才會繼續執行。

CountDownLatch

CountDownLatch【倒計時鎖】,線程中調用countDownLatch.await()使進程進入阻塞狀態,當達成指定次數后(通過countDownLatch.countDown())繼續執行每個線程中剩余的內容。

一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。

  用給定的計數 初始化 CountDownLatch。由于調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。

測試類:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CountDownLatch;

public class package concurrent;
import concurrent.thread.CountDownLatchThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 創建時間:2016年08月08日
 * 描述:
 */
public class CountDownLatchTest {
	private static final Logger logger = LogManager.getLogger(CountDownLatchTest.class);
	public static void main(String[] args) throws InterruptedException {
		//設定當達成三個計數時觸發
		CountDownLatch countDownLatch = new CountDownLatch(3);
		new CountDownLatchThread("A", countDownLatch).start();
		new CountDownLatchThread("B", countDownLatch).start();
		new CountDownLatchThread("C", countDownLatch).start();
		new CountDownLatchThread("D", countDownLatch).start();
		new CountDownLatchThread("E", countDownLatch).start();
		for (int i = 3; i > 0; i--) {
			Thread.sleep(1000);
			logger.debug(i);
			countDownLatch.countDown();
		}
	}
}

線程類:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchThread extends Thread {
	private static final Logger logger = LogManager.getLogger(CountDownLatchThread.class);
	//計數器
	private CountDownLatch countDownLatch;
	public CountDownLatchThread(String name, CountDownLatch countDownLatch) {
		setName(name);
		this.countDownLatch = countDownLatch;
	}
	@Override
	  public void run() {
		logger.debug("執行操作...");
		try {
			sleep(1000);
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}
		logger.debug("等待計數器達到標準...");
		try {
			//讓線程進入阻塞狀態,等待計數達成后釋放
			countDownLatch.await();
			logger.debug("計數達成,繼續執行...");
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

執行結果:

 [E] - 執行操作...
 [B] - 執行操作...
 [A] - 執行操作...
 [C] - 執行操作...
 [D] - 執行操作...
 [main] DEBUG concurrent.CountDownLatchTest - 3
 [B] - 等待計數器達到標準...
 [E] - 等待計數器達到標準...
 [C] - 等待計數器達到標準...
 [D] - 等待計數器達到標準...
 [A] - 等待計數器達到標準...
 [main] DEBUG concurrent.CountDownLatchTest - 2
 [main] DEBUG concurrent.CountDownLatchTest - 1
 [E] - 計數達成,繼續執行...
 [C] - 計數達成,繼續執行...
 [B] - 計數達成,繼續執行...
 [D] - 計數達成,繼續執行...
 [A] - 計數達成,繼續執行...

CyclicBarrier

CyclicBarrier【Cyclic周期,循環的 Barrier屏障,障礙】循環的等待阻塞的線程個數到達指定數量后使參與計數的線程繼續執行并可執行特定線程(使用不同構造函數可以不設定到達后執行),其他線程仍處于阻塞等待再一次達成指定個數。

測試類:

package concurrent;
import concurrent.thread.CyclicBarrierThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
	private static final Logger logger = LogManager.getLogger(CyclicBarrierTest.class);
	public static void main(String[] args) {
		//可以使用CyclicBarrier(int parties)不設定到達后執行的內容
		CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
			logger.debug("---計數到達后執行的內容----");
		}
		);
		new CyclicBarrierThread("A", cyclicBarrier).start();
		new CyclicBarrierThread("B", cyclicBarrier).start();
		new CyclicBarrierThread("C", cyclicBarrier).start();
		new CyclicBarrierThread("D", cyclicBarrier).start();
		new CyclicBarrierThread("E", cyclicBarrier).start();
		new CyclicBarrierThread("A2", cyclicBarrier).start();
		new CyclicBarrierThread("B2", cyclicBarrier).start();
		new CyclicBarrierThread("C2", cyclicBarrier).start();
		new CyclicBarrierThread("D2", cyclicBarrier).start();
		new CyclicBarrierThread("E2", cyclicBarrier).start();
		//需要注意的是,如果線程數不是上面設置的等待數量的整數倍,比如這個程序中又加了個線程,
		// 那么當達到5個數量時,只會執行達到時的五個線程的內容,
		// 剩余一個線程會出于阻塞狀態導致主線程無法退出,程序無法結束
		// new CyclicBarrierThread("F", cyclicBarrier).start();//將這行注釋去掉程序無法自動結束
	}
}

線程類:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierThread extends Thread {
	private static final Logger logger = LogManager.getLogger(CyclicBarrierThread.class);
	private CyclicBarrier cyclicBarrier;
	public CyclicBarrierThread(String name, CyclicBarrier cyclicBarrier) {
		super(name);
		this.cyclicBarrier = cyclicBarrier;
	}
	@Override
	  public void run() {
		logger.debug("執行操作...");
		try {
			int time = new Random().nextint(10) * 1000;
			logger.debug("休眠" + time/1000 + "秒");
			sleep(time);
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}
		logger.debug("等待計數器達到標準...");
		try {
			//讓線程進入阻塞狀態,等待計數達成后釋放
			cyclicBarrier.await();
			logger.debug("計數達成,繼續執行...");
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}
		catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}

執行結果:

 [A] - 執行操作...
 [A] - 休眠0秒
 [E2] - 執行操作...
 [E2] - 休眠5秒
 [D2] - 執行操作...
 [D2] - 休眠4秒
 [C2] - 執行操作...
 [C2] - 休眠4秒
 [B2] - 執行操作...
 [B2] - 休眠6秒
 [A2] - 執行操作...
 [A2] - 休眠8秒
 [E] - 執行操作...
 [E] - 休眠5秒
 [D] - 執行操作...
 [D] - 休眠0秒
 [C] - 執行操作...
 [C] - 休眠3秒
 [B] - 執行操作...
 [B] - 休眠7秒
 [A] - 等待計數器達到標準...
 [D] - 等待計數器達到標準...
 [C] - 等待計數器達到標準...
 [D2] - 等待計數器達到標準...
 [C2] - 等待計數器達到標準...
 [C2] DEBUG concurrent.CyclicBarrierTest - ---計數到達后執行的內容----
 [C2] - 計數達成,繼續執行...
 [A] - 計數達成,繼續執行...
 [C] - 計數達成,繼續執行...
 [D2] - 計數達成,繼續執行...
 [D] - 計數達成,繼續執行...
 [E2] - 等待計數器達到標準...
 [E] - 等待計數器達到標準...
 [B2] - 等待計數器達到標準...
 [B] - 等待計數器達到標準...
 [A2] - 等待計數器達到標準...
 [A2] DEBUG concurrent.CyclicBarrierTest - ---計數到達后執行的內容----
 [E] - 計數達成,繼續執行...
 [B2] - 計數達成,繼續執行...
 [E2] - 計數達成,繼續執行...
 [B] - 計數達成,繼續執行...
 [A2] - 計數達成,繼續執行...

可以想象成以前不正規的長途汽車站的模式:

不正規的長途汽車站會等待座位坐滿之后才發車,到達目的地之后繼續等待然后循環進行。每個人都是一個Thread,上車后觸發cyclicBarrier.await();,當坐滿時就是達到指定達成數的時候,車輛發車就是達成后統一執行的內容,發車后車上的人們就可以聊天之類的操作了【我們暫且理解為上車后人們就都不能動了O(∩_∩)O~】。

CountDownLatch與CyclicBarrier區別:

CountDownLatch是一個或多個線程等待計數達成后繼續執行,await()調用并沒有參與計數。

CyclicBarrier則是N個線程等待彼此執行到零界點之后再繼續執行,await()調用的同時參與了計數,并且CyclicBarrier支持條件達成后執行某個動作,而且這個過程是循環性的。

Exchanger

Exchanger 用于線程間進行數據交換

  可以在對中對元素進行配對和交換的線程的同步點。每個線程將條目上的某個方法呈現給 exchange 方法,與伙伴線程進行匹配,并且在返回時接收其伙伴的對象。Exchanger 可能被視為 SynchronousQueue 的雙向形式。  Exchanger 可能在應用程序(比如遺傳算法和管道設計)中很有用。

  用法示例:以下是重點介紹的一個類,該類使用 Exchanger 在線程間交換緩沖區,因此,在需要時,填充緩沖區的線程獲取一個新騰空的緩沖區,并將填滿的緩沖區傳遞給騰空緩沖區的線程。 測試類:

package concurrent;
import concurrent.pojo.ExchangerPojo;
import concurrent.thread.ExchangerThread;
import java.util.HashMap;
import java.util.concurrent.Exchanger;

public class ExchangerTest {
	public static void main(String[] args) {
		Exchanger<HashMap<String, ExchangerPojo>> exchanger = new Exchanger<>();
		new ExchangerThread("A", exchanger).start();
		new ExchangerThread("B", exchanger).start();
	}
}

實體類:

package concurrent.pojo;
import com.alibaba.fastjson.JSON;
import java.util.Date;
import java.util.List;

public class ExchangerPojo {
	private int intVal;
	private String strVal;
	private List<String> strList;
	private Date date;
	public ExchangerPojo(int intVal, String strVal, List<String> strList, Date date) {
		this.intVal = intVal;
		this.strVal = strVal;
		this.strList = strList;
		this.date = date;
	}
	public int getIntVal() {
		return intVal;
	}
	public void setIntVal(int intVal) {
		this.intVal = intVal;
	}
	public String getStrVal() {
		return strVal;
	}
	public void setStrVal(String strVal) {
		this.strVal = strVal;
	}
	public List<String> getStrList() {
		return strList;
	}
	public void setStrList(List<String> strList) {
		this.strList = strList;
	}
	public Date getDate() {
		return date;
	}
	public void setDate(Date date) {
		this.date = date;
	}
	@Override
	  public String toString() {
		return JSON.toJSONString(this);
	}
}

線程類:

package concurrent.thread;
import concurrent.pojo.ExchangerPojo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.concurrent.Exchanger;

public class ExchangerThread extends Thread {
	private Exchanger<HashMap<String, ExchangerPojo>> exchanger;
	private static final Logger logger = LogManager.getLogger(ExchangerThread.class);
	public ExchangerThread(String name, Exchanger<HashMap<String, ExchangerPojo>> exchanger) {
		super(name);
		this.exchanger = exchanger;
	}
	@Override
	  public void run() {
		HashMap<String, ExchangerPojo> map = new HashMap<>();
		logger.debug(getName() + "提供者提供數據...");
		Random random = new Random();
		for (int i = 0; i < 3; i++) {
			int index = random.nextint(10);
			List<String> list = new ArrayList<>();
			for (int j = 0; j < index; j++) {
				list.add("list ---> " + j);
			}
			ExchangerPojo pojo = new ExchangerPojo(index, getName() + "提供的數據", list, new Date());
			map.put("第" + i + "個數據", pojo);
		}
		try {
			int time = random.nextint(10);
			logger.debug(getName() + "等待" + time + "秒....");
			for (int i = time; i > 0; i--) {
				sleep(1000);
				logger.debug(getName() + "---->" + i);
			}
			//等待exchange是會進入阻塞狀態,可以在一個線程中與另一線程多次交互,此處就不寫多次了
			HashMap<String, ExchangerPojo> getMap = exchanger.exchange(map);
			time = random.nextint(10);
			logger.debug(getName() + "接受到數據等待" + time + "秒....");
			for (int i = time; i > 0; i--) {
				sleep(1000);
				logger.debug(getName() + "---->" + i);
			}
			getMap.forEach((x, y) -> {
				logger.debug(x + " -----> " + y.toString());
			}
			);
		}
		catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

執行結果:

 [B] - B提供者提供數據...
 [A] - A提供者提供數據...
 [A] - A等待2秒....
 [B] - B等待0秒....
 [A] - A---->2
 [A] - A---->1
 [B] - B接受到數據等待1秒....
 [A] - A接受到數據等待4秒....
 [B] - B---->1
 [A] - A---->4
 [B] - 第0個數據 -----> {"date":1470652252049,"intVal":5,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4"],"strVal":"A提供的數據"}
 [B] - 第1個數據 -----> {"date":1470652252049,"intVal":1,"strList":["list ---> 0"],"strVal":"A提供的數據"}
 [B] - 第2個數據 -----> {"date":1470652252049,"intVal":4,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3"],"strVal":"A提供的數據"}
 [A] - A---->3
 [A] - A---->2
 [A] - A---->1
 [A] - 第0個數據 -----> {"date":1470652252057,"intVal":1,"strList":["list ---> 0"],"strVal":"B提供的數據"}
 [A] - 第1個數據 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的數據"}
 [A] - 第2個數據 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的數據"}

Phaser

Phaser個人感覺兼具了CountDownLatch與CyclicBarrier的功能,并提供了分階段的能力。

實現分階段的CyclicBarrier的功能

測試代碼:

package concurrent;
import concurrent.thread.PhaserThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Phaser;

public class PhaserTest {
	private static final Logger logger = LogManager.getLogger(PhaserTest.class);
	public static void main(String[] args) {
		Phaser phaser = new Phaser() {
			/**此方法有2個作用:
       * 1、當每一個階段執行完畢,此方法會被自動調用,因此,重載此方法寫入的代碼會在每個階段執行完畢時執行,相當于CyclicBarrier的barrierAction。
       * 2、當此方法返回true時,意味著Phaser被終止,因此可以巧妙的設置此方法的返回值來終止所有線程。例如:若此方法返回值為 phase>=3,其含義為當整個線程執行了4個階段后,程序終止。
       * */
			@Override
			      protected Boolean onAdvance(int phase, int registeredParties) {
				logger.debug("階段--->" + phase);
				logger.debug("注冊的線程數量--->" + registeredParties);
				return super.onAdvance(phase, registeredParties);
			}
		}
		;
		for (int i = 3; i > 0; i--) {
			new PhaserThread("第" + i + "個", phaser).start();
		}
	}
}

線程代碼:

package concurrent.thread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Random;
import java.util.concurrent.Phaser;

public class PhaserThread extends Thread {
	private Phaser phaser;
	private static final Logger logger = LogManager.getLogger(PhaserThread.class);
	public PhaserThread(String name, Phaser phaser) {
		super(name);
		this.phaser = phaser;
		//把當前線程注冊到Phaser
		this.phaser.register();
		logger.debug("name為" + name + "的線程注冊了" + this.phaser.getRegisteredParties() + "個線程");
	}
	@Override
	  public void run() {
		logger.debug("進入...");
		phaser.arrive();
		for (int i = 6; i > 0; i--) {
			int time = new Random().nextint(5);
			try {
				logger.debug("睡眠" + time + "秒");
				sleep(time * 1000);
				if (i == 1) {
					logger.debug("未完成的線程數量:" + phaser.getUnarrivedParties());
					logger.debug("最后一次觸發,并注銷自身");
					phaser.arriveAndDeregister();
					logger.debug("未完成的線程數量:" + phaser.getUnarrivedParties());
				} else {
					logger.debug("未完成的線程數量:" + phaser.getUnarrivedParties());
					logger.debug(i + "--->觸發并阻塞...");
					phaser.arriveAndAwaitAdvance();
					//相當于CyclicBarrier.await();
					logger.debug("未完成的線程數量:" + phaser.getUnarrivedParties());
				}
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		logger.debug("注銷完成之后注冊的線程數量--->" + phaser.getRegisteredParties());
	}
}

執行結果:

 [main] - name為第3個的線程注冊了1個線程
 [main] - name為第2個的線程注冊了2個線程
 [main] - name為第1個的線程注冊了3個線程
 [第3個] - 進入...
 [第2個] - 進入...
 [第3個] - 睡眠2秒
 [第2個] - 睡眠1秒
 [第1個] - 進入...
 [第1個] - 階段--->0
 [第1個] - 注冊的線程數量--->3
 [第1個] - 睡眠4秒
 [第2個] - 未完成的線程數量:3
 [第2個] - 6--->觸發并阻塞...
 [第3個] - 未完成的線程數量:2
 [第3個] - 6--->觸發并阻塞...
 [第1個] - 未完成的線程數量:1
 [第1個] - 6--->觸發并阻塞...
 [第1個] - 階段--->1
 [第1個] - 注冊的線程數量--->3
 [第1個] - 未完成的線程數量:3
 [第3個] - 未完成的線程數量:3
 [第2個] - 未完成的線程數量:3
 [第1個] - 睡眠1秒
 [第3個] - 睡眠0秒
 [第2個] - 睡眠4秒
 [第3個] - 未完成的線程數量:3
 [第3個] - 5--->觸發并阻塞...
 [第1個] - 未完成的線程數量:2
 [第1個] - 5--->觸發并阻塞...
 [第2個] - 未完成的線程數量:1
 [第2個] - 5--->觸發并阻塞...
 [第2個] - 階段--->2
 [第2個] - 注冊的線程數量--->3
 [第2個] - 未完成的線程數量:3
 [第3個] - 未完成的線程數量:3
 [第1個] - 未完成的線程數量:3
 [第2個] - 睡眠0秒
 [第3個] - 睡眠2秒
 [第2個] - 未完成的線程數量:3
 [第1個] - 睡眠2秒
 [第2個] - 4--->觸發并阻塞...
 [第3個] - 未完成的線程數量:2
 [第1個] - 未完成的線程數量:2
 [第3個] - 4--->觸發并阻塞...
 [第1個] - 4--->觸發并阻塞...
 [第1個] - 階段--->3
 [第1個] - 注冊的線程數量--->3
 [第1個] - 未完成的線程數量:3
 [第3個] - 未完成的線程數量:3
 [第2個] - 未完成的線程數量:3
 [第1個] - 睡眠2秒
 [第3個] - 睡眠1秒
 [第2個] - 睡眠4秒
 [第3個] - 未完成的線程數量:3
 [第3個] - 3--->觸發并阻塞...
 [第1個] - 未完成的線程數量:2
 [第1個] - 3--->觸發并阻塞...
 [第2個] - 未完成的線程數量:1
 [第2個] - 3--->觸發并阻塞...
 [第2個] - 階段--->4
 [第2個] - 注冊的線程數量--->3
 [第2個] - 未完成的線程數量:3
 [第3個] - 未完成的線程數量:3
 [第1個] - 未完成的線程數量:3
 [第2個] - 睡眠2秒
 [第1個] - 睡眠2秒
 [第3個] - 睡眠4秒
 [第2個] - 未完成的線程數量:3
 [第1個] - 未完成的線程數量:3
 [第2個] - 2--->觸發并阻塞...
 [第1個] - 2--->觸發并阻塞...
 [第3個] - 未完成的線程數量:1
 [第3個] - 2--->觸發并阻塞...
 [第3個] - 階段--->5
 [第3個] - 注冊的線程數量--->3
 [第3個] - 未完成的線程數量:3
 [第1個] - 未完成的線程數量:3
 [第2個] - 未完成的線程數量:3
 [第3個] - 睡眠2秒
 [第1個] - 睡眠3秒
 [第2個] - 睡眠0秒
 [第2個] - 未完成的線程數量:3
 [第2個] - 最后一次觸發,并注銷自身
 [第2個] - 未完成的線程數量:2
 [第2個] - 注銷完成之后注冊的線程數量--->2
 [第3個] - 未完成的線程數量:2
 [第3個] - 最后一次觸發,并注銷自身
 [第3個] - 未完成的線程數量:1
 [第3個] - 注銷完成之后注冊的線程數量--->1
 [第1個] - 未完成的線程數量:1
 [第1個] - 最后一次觸發,并注銷自身
 [第1個] - 階段--->6
 [第1個] - 注冊的線程數量--->0
 [第1個] - 未完成的線程數量:0
 [第1個] - 注銷完成之后注冊的線程數量--->0

上面代碼中,當所有線程進行到arriveAndAwaitAdvance()時會觸發計數并且將線程阻塞,等計數數量等于注冊線程數量【即所有線程都執行到了約定的地方時,會放行,是所有線程得以繼續執行,并觸發onAction事件】。我們可以在onAction中根據不同階段執行不同內容的操作。

實現分階段的CountDownLatch的功能

只需將上面的測試類更改如下:

package concurrent;
import concurrent.thread.PhaserThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Phaser;
import static jodd.util.ThreadUtil.sleep;

public class PhaserTest {
	private static final Logger logger = LogManager.getLogger(PhaserTest.class);
	public static void main(String[] args) {
		//這里其實相當于已經注冊了3個線程,但是并沒有實際的線程
		int coutNum=3;
		Phaser phaser = new Phaser(coutNum) {
			/**此方法有2個作用:
       * 1、當每一個階段執行完畢,此方法會被自動調用,因此,重載此方法寫入的代碼會在每個階段執行完畢時執行,相當于CyclicBarrier的barrierAction。
       * 2、當此方法返回true時,意味著Phaser被終止,因此可以巧妙的設置此方法的返回值來終止所有線程。例如:若此方法返回值為 phase>=3,其含義為當整個線程執行了4個階段后,程序終止。
       * */
			@Override
			      protected Boolean onAdvance(int phase, int registeredParties) {
				logger.debug("階段--->" + phase);
				logger.debug("注冊的線程數量--->" + registeredParties);
				return registeredParties==coutNum;
				//當后只剩下coutNum個線程時說明所有真實的注冊的線程已經運行完成,測試可以終止Phaser
			}
		}
		;
		for (int i = 3; i > 0; i--) {
			new PhaserThread("第" + i + "個", phaser).start();
		}
		//當phaser未終止時循環注冊這塊兒可以使用實際的業務處理
		while (!phaser.isTerminated()) {
			sleep(1000);
			logger.debug("觸發一次");
			phaser.arrive();
			//相當于countDownLatch.countDown();
		}
	}
}

 總結

以上就是本文關于Java多線程同步器代碼詳解的全部內容,希望對大家有所幫助。感興趣的朋友可以繼續參閱本站:

Java多線程中斷機制三種方法及示例

淺談Java多線程處理中Future的妙用(附源碼)

Java通過賣票理解多線程

如有不足之處,歡迎留言指出。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

额济纳旗| 富源县| 旺苍县| 科技| 临沧市| 茌平县| 射阳县| 康平县| 蓬安县| 南江县| 子洲县| 和政县| 印江| 黎平县| 江川县| 重庆市| 曲沃县| 太仆寺旗| 德安县| 乐清市| 平山县| 榆社县| 永宁县| 岗巴县| 和平县| 舞阳县| 二连浩特市| 三江| 喜德县| 寿阳县| 长阳| 桃园县| 诸城市| 营山县| 兰考县| 阿城市| 偏关县| 祁东县| 永泰县| 虎林市| 饶河县|