您好,登錄后才能下訂單哦!
這篇文章主要講解了“Java編程生產者消費者實現的方法有哪些”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Java編程生產者消費者實現的方法有哪些”吧!
實現生產者消費者的四種方式
一、最基礎的
二、java.util.concurrent.lock 中的 Lock 框架
三、阻塞隊列BlockingQueue的實現
Blockqueue 接口的一些方法
四、信號量 Semaphore 的實現
利用 wait() 和 notify() 方法實現,當緩沖區滿或為空時都調用 wait() 方法等待,當生產者生產了一個產品或消費者消費了一個產品后會喚醒所有線程;
package com.practice; public class testMain { private static Integer count = 0; private static final Integer FULL = 10; private static String LOCK = "lock"; public static void main(String[] args) { testMain testMain = new testMain(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i < 10; i++) { try{ Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); } synchronized (LOCK){ while(count == FULL){//緩存空間滿了 try{ LOCK.wait();//線程阻塞 }catch (Exception e){ e.printStackTrace(); } } count++;//生產者 System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有"+count); LOCK.notifyAll();//喚醒所有線程 } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i < 10; i++) { try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } synchronized (LOCK){ while(count == 0){ try{ LOCK.wait(); }catch (Exception e){ } } count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有 "+count); LOCK.notifyAll();//喚醒所有線程 } } } } }
通過對 lock 的 lock() 方法和 unlock() 方法實現對鎖的顯示控制,而 synchronize()
則是對鎖的隱形控制,可重入鎖也叫做遞歸鎖,指的是同一個線程外層函數獲得鎖之后,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響;
簡單來說,該鎖維護這一個與獲取鎖相關的計數器,如果擁有鎖的某個線程再次得到鎖,那么獲計數器就加1,函數調用結束計數器就減1,然后鎖需要釋放兩次才能獲得真正釋放,已經獲取鎖的線程進入其他需要相同鎖的同步代碼塊不會被阻塞
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockTest { private static Integer count = 0; private static Integer FULL = 10; //創建一個鎖對象 private Lock lock = new ReentrantLock(); //創建兩個條件變量,一個為緩沖非滿,一個緩沖區非空 private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public static void main(String[] args){ ReentrantLockTest testMain = new ReentrantLockTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } // 獲取鎖 lock.lock(); try { while (count == FULL) { try{ notFull.await(); }catch(InterruptedException e){ e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" + count); }finally { lock.unlock(); } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); } catch (Exception e){ e.printStackTrace(); } lock.lock(); try{ while(count==0){ try{ notEmpty.await(); }catch (InterruptedException e){ e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有 " + count); }finally { lock.unlock();//解鎖 } } } } }
被阻塞的情況主要分為如下兩種,BlockingQueue 是線程安全的
1,當隊列滿了的時候進行入隊操作;
2,當隊列空的時候進行出隊操作
四類方法分別對應于:
1,ThrowsException,如果操作不能馬上進行,則拋出異常;
2,SpecialValue 如果操作不能馬上進行,將會返回一個特殊的值,true或false;
3,Blocks 操作被阻塞;
4,TimeOut 指定時間未執行返回一個特殊值 true 或 false
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * 使用 BlockQueue 實現生產者消費模型 */ public class BlockQueueTest { public static Integer count = 0; //創建一個阻塞隊列 final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10); public static void main(String[] args) { BlockQueueTest testMain = new BlockQueueTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); } try{ blockingQueue.put(1); count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有 " + count); }catch (InterruptedException e){ e.printStackTrace(); } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } try{ blockingQueue.take();//消費 count--; System.out.println(Thread.currentThread().getName() + " 消費者消費,目前總共有 "+ count); }catch (InterruptedException e){ e.printStackTrace(); } } } } }
Semaphore (信號量) 用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。Java中的 Semaphone 維護了一個許可集,一開始設定這個許可集的數量,使用 acquire()
方法獲得一個許可,當許可不足時會被阻塞,release()
添加一個許可。
下面代碼中,還加入了 mutex
信號量,維護消費者和生產者之間的同步關系,保證生產者消費者之間的交替進行
import java.util.concurrent.Semaphore; public class SemaphoreTest { private static Integer count = 0; //創建三個信號量 final Semaphore notFull = new Semaphore(10); final Semaphore notEmpty = new Semaphore(0); final Semaphore mutex = new Semaphore(1);//互斥鎖,控制共享數據的互斥訪問 public static void main(String[] args) { SemaphoreTest testMain = new SemaphoreTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } try{ notFull.acquire();//獲取一個信號量 mutex.acquire(); count++; System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有 "+count); } catch (InterruptedException e){ e.printStackTrace(); } finally { mutex.release();//添加 notEmpty.release(); } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch(InterruptedException e){ e.printStackTrace(); } try{ notEmpty.acquire(); mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有"+count); }catch (InterruptedException e){ e.printStackTrace(); }finally { mutex.release(); notFull.release(); } } } } }
感謝各位的閱讀,以上就是“Java編程生產者消費者實現的方法有哪些”的內容了,經過本文的學習后,相信大家對Java編程生產者消費者實現的方法有哪些這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。