中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java中怎么使用BlockingQueue實現并發

發布時間:2021-08-03 15:46:55 來源:億速云 閱讀:126 作者:Leah 欄目:編程語言

Java中怎么使用BlockingQueue實現并發,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。


1 概述

阻塞隊列(BlockingQueue)是一個支持兩種附加操作的隊列。支持附加阻塞的插入和移除操作。

  • 支持阻塞的插入:當隊列滿時,插入操作會被阻塞,直到隊列不滿。

  • 支持阻塞的移除:當隊列空時,移除操作會被阻塞,直到隊列不空。

阻塞隊列不可用時,操作處理方式

方法\處理方式拋出異常返回特殊值一直阻塞超時退出
插入方法add(e)offer(e)put(e)offer(e, time, unit)
移除方法remove()poll()take()poll(time, unit)
檢查方法element()peek()
  • 拋出異常:隊列滿時,若繼續插入元素會拋出IllegalStateException;當隊列為空時,若獲取元素則會拋出NoSuchElementException異常。

  • 返回特殊值:向隊列插入元素時,會返回是否插入成功true/false;獲取元素時,成功則返回元素,失敗則返回null。

  • 一直阻塞:當阻塞隊列滿時,若繼續使用put新增元素時會被阻塞,直到隊列不為空或者響應中斷退出;當阻塞隊列為空時,繼續使用take獲取元素時會被阻塞,直到隊列不為空。

  • 超時退出:當阻塞隊列滿時,使用offer(e, time, unit)新增元素會被阻塞至超時退出;當隊列為空時,使用poll(time, unit)獲取元素時會被阻塞至超時退出。

注意:

  • 阻塞隊列中不允許插入null,會拋出NPE異常。

  • 可以訪問阻塞隊列中的任意元素,調用remove(Object o)可以將隊列之中的特定對象移除,但會遍歷全部元素,并不高效。

2 阻塞隊列的實現

2.1 ArrayBlockingQueue

由數組構成的有界阻塞隊列,內部由數組final Object[] items實現。默認情況下不保證線程公平的訪問隊列,所謂公平訪問隊列指阻塞的線程,可以按照阻塞的先后順序訪問隊列。

public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);  // 使用公平鎖/非公平鎖
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}

隊列大小初始化后不可修改。參數fair控制內部ReentrantLock是否采用公平鎖。

2.2 LinkedBlockingQueue

鏈表實現的有界阻塞隊列。內部結構是單鏈表。默認大小為Integer.MAX_VALUE,可以指定大小。

public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
  // 指定隊列大小
  this.capacity = capacity;
  last = head = new Node<E>(null);
}

// 單鏈表節點Node
static class Node<E> {
  E item;
  Node<E> next;
  Node(E x) { item = x; }
}
2.3 PriorityBlockingQueue

支持優先級的無界阻塞隊列。默認情況下采取自然順序升序排列。也可以自定義compareTo()方法來指定元素的排列順序,或者初始化隊列時,指定構造參數Comparator來對元素進行排序。同優先級順序無法保證。

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
  if (initialCapacity < 1)
    throw new IllegalArgumentException();
  this.lock = new ReentrantLock();  // 非公平鎖
  this.notEmpty = lock.newCondition();
  this.comparator = comparator;
  this.queue = new Object[initialCapacity];
}


// offer方法部分代碼
Comparator<? super E> cmp = comparator;
if (cmp == null)
  siftUpComparable(n, e, array);
else
  siftUpUsingComparator(n, e, array, cmp);

由offer代碼可以看出,Comparator的優先級是大于Comparable.compareTo方法的。

注意:PriorityBlockingQueue不會阻塞數據生產者(隊列無界),只會在沒有數據時阻塞消費者。生產者生產數據的速度絕對不能快于消費者消費數據的速度,否則將有可能耗盡堆空間。

2.4 DelayQueue

支持延時獲取元素的無界隊列。隊列使用PriorityQueue實現。隊列中的元素必須實現java.util.concurrent.Delayed接口,在創建元素時指定多久才能才能從隊列中取到元素。

DelayQueue非常有用,可以將DelayQueu應用在以下應用場景。

  • 緩存系統的設計:用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能獲取到元素時,表示緩存有限期到了。

  • 定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行。比如TimerQueue就是使用DelayQueue實現的。

2.5 SynchronousQueue

不存儲元素的阻塞隊列。每個put操作都必須等待一個take操作,反之亦然。

// fair為true,等待線程將以FIFO的順序進行訪問
public SynchronousQueue(boolean fair) {
  transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

將生產者線程處理的數據直接傳遞給消費者線程。隊列本身不存儲任何元素,非常適合傳遞性場景。SynchronousQueue的吞吐量高于ArrayBlockingQueueLinkedBlockingQueue

3 阻塞隊列的原理

利用Lock鎖的多條件(Condition)阻塞控制。下面簡單分析下ArrayBlockingQueue部分代碼。

3.1 ArrayBlockingQueue屬性
/** The queued items */
// 數據元素數組
final Object[] items;

/** items index for next take, poll, peek or remove */
// 下一個待獲取元素索引
int takeIndex;

/** items index for next put, offer, or add */
// 下一個待插入元素索引
int putIndex;

/** Number of elements in the queue */
// 隊列中元素個數
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
// 所有訪問的主鎖
final ReentrantLock lock;

/** Condition for waiting takes */
// 消費者監視器
private final Condition notEmpty;

/** Condition for waiting puts */
// 生產者監視器
private final Condition notFull;


// 
public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}
3.2 put操作
// 在隊列尾部插入元素,若隊列已滿則等待隊列非滿。
public void put(E e) throws InterruptedException {
  // 校驗插入元素,為空則拋出NPE
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  // 1. 嘗試獲取鎖(響應中斷)
  lock.lockInterruptibly();
  try {
    // 2. 當隊列滿時
    while (count == items.length)
      // 2.1 若隊列滿,則阻塞當前線程。等待`notFull.signal()`喚醒。
      notFull.await();
    // 3. 非滿則執行入隊操作
    enqueue(e);
  } finally {
    lock.unlock();
  }
}

// 在`putIndex`處放置當前元素,只有獲取lock鎖后才會調用
private void enqueue(E x) {
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  final Object[] items = this.items;
  // 在`putIndex`處放置元素
  items[putIndex] = x;
  // putIndex等于數組長度時,重置為0索引。
  if (++putIndex == items.length)
    putIndex = 0;
  // 數量加1
  count++;
  // 4. 喚醒一個等待線程(等待取元素的線程)
  notEmpty.signal();
}

put總體流程:

  1. 獲取lock鎖,拿到鎖后繼續執行,否則自旋競爭鎖。

  2. 判斷阻塞隊列是否滿。滿了了則調用await阻塞當前線程。同時釋放lock鎖。

  3. 如果沒滿,則調用enqueue方法將元素put進阻塞隊列。此時還有一種可能是:第2步中被阻塞的線程被喚醒且又拿到了lock鎖。

  4. 喚醒一個標記為notEmpty(消費者)的線程。

3.3 take操作
// 從頭部獲取元素,若隊列為空則等待隊列非空。
public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  // 1. 獲取鎖
  lock.lockInterruptibly();
  try {
    // 2. 當隊列為空時
    while (count == 0)
      // 2.1 當隊列為空時,阻塞當前線程。等待`notEmpty.signal()`喚醒。
      notEmpty.await();
    // 3. 非空則進行入隊操作
    return dequeue();
  } finally {
    lock.unlock();
  }
}

// 從`takeIndex`位置獲取當前元素,只有獲取到lock鎖后才會調用
private E dequeue() {
  // assert lock.getHoldCount() == 1;
  // assert items[takeIndex] != null;
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  // 從`takeIndex`位置獲取元素,然后清除該位置元素
  E x = (E) items[takeIndex];
  items[takeIndex] = null;
  // 
  if (++takeIndex == items.length)
    takeIndex = 0;
  // 隊列元素減1
  count--;
  if (itrs != null)
    itrs.elementDequeued();
  // 4. 喚醒一個標記為notFull(生產者)的線程
  notFull.signal();
  return x;
}

take的整體流程:

  1. 獲取lock鎖,拿到鎖則執行下一步流程;未拿到則自旋競爭鎖。

  2. 當前隊列是否為空,若為空則調用notEmpty.await阻塞當前線程,同時釋放鎖,等待被喚醒。

  3. 若非空,則調用dequeue進行出隊操作。此時還有一種可能:第2步中的阻塞的線程被喚醒并且又拿到了lock鎖。

  4. 喚醒一個被標記為notFull(生產者)的線程。

3.4 總結
  1. puttake操作都需要先獲得鎖,沒有獲得鎖的線程無法進行操作。

  2. 拿到鎖后,并不一定能順利執行put/take操作,還需要判斷隊列是否可用(是否滿/空),不可用則會被阻塞,并釋放鎖。

  3. 在2中被阻塞的線程會被喚醒,但喚醒之后依然需要拿到鎖之后才能繼續向下執行。否則,自旋拿鎖,拿到鎖后再while判斷隊列是否可用。

看完上述內容,你們掌握Java中怎么使用BlockingQueue實現并發的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

元朗区| 钦州市| 莱州市| 手游| 庆城县| 射洪县| 齐河县| 绥中县| 娄烦县| 大方县| 望城县| 察雅县| 亳州市| 齐河县| 扶沟县| 江油市| 玛曲县| 新巴尔虎左旗| 张家港市| 固镇县| 禹城市| 黄山市| 宁波市| 南投县| 洛川县| 万州区| 建湖县| 平陆县| 陇南市| 湖北省| 穆棱市| 怀柔区| 湄潭县| 太仓市| 山东| 江油市| 遂川县| 舟曲县| 铁力市| 西乡县| 左云县|