中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java concurrency集合之ArrayBlockingQueue_動力節點Java學院整理

發布時間:2020-09-26 18:20:33 來源:腳本之家 閱讀:118 作者:mrr 欄目:編程語言

ArrayBlockingQueue介紹

ArrayBlockingQueue是數組實現的線程安全的有界的阻塞隊列。

線程安全是指,ArrayBlockingQueue內部通過“互斥鎖”保護競爭資源,實現了多線程對競爭資源的互斥訪問。而有界,則是指ArrayBlockingQueue對應的數組是有界限的。 阻塞隊列,是指多線程訪問競爭資源時,當競爭資源已被某線程獲取時,其它要獲取該資源的線程需要阻塞等待;而且,ArrayBlockingQueue是按 FIFO(先進先出)原則對元素進行排序,元素都是從尾部插入到隊列,從頭部開始返回。

注意:ArrayBlockingQueue不同于ConcurrentLinkedQueue,ArrayBlockingQueue是數組實現的,并且是有界限的;而ConcurrentLinkedQueue是鏈表實現的,是無界限的。 

ArrayBlockingQueue原理和數據結構

ArrayBlockingQueue的數據結構,如下圖所示:

Java concurrency集合之ArrayBlockingQueue_動力節點Java學院整理

說明:

    1. ArrayBlockingQueue繼承于AbstractQueue,并且它實現了BlockingQueue接口。

    2. ArrayBlockingQueue內部是通過Object[]數組保存數據的,也就是說ArrayBlockingQueue本質上是通過數組實現的。ArrayBlockingQueue的大小,即數組的容量是創建ArrayBlockingQueue時指定的。   

3. ArrayBlockingQueue與ReentrantLock是組合關系,ArrayBlockingQueue中包含一個ReentrantLock對象(lock)。

ReentrantLock是可重入的互斥鎖,ArrayBlockingQueue就是根據該互斥鎖實現“多線程對競爭資源的互斥訪問”。而且,ReentrantLock分為公平鎖和非公平鎖,關于具體使用公平鎖還是非公平鎖,在創建ArrayBlockingQueue時可以指定;而且,ArrayBlockingQueue默認會使用非公平鎖。   

4. ArrayBlockingQueue與Condition是組合關系,ArrayBlockingQueue中包含兩個Condition對象(notEmpty和notFull)。而且,Condition又依賴于ArrayBlockingQueue而存在,通過Condition可以實現對ArrayBlockingQueue的更精確的訪問 -- (01)若某線程(線程A)要取數據時,數組正好為空,則該線程會執行notEmpty.await()進行等待;當其它某個線程(線程B)向數組中插入了數據之后,會調用notEmpty.signal()喚醒“notEmpty上的等待線程”。此時,線程A會被喚醒從而得以繼續運行。(02)若某線程(線程H)要插入數據時,數組已滿,則該線程會它執行notFull.await()進行等待;當其它某個線程(線程I)取出數據之后,會調用notFull.signal()喚醒“notFull上的等待線程”。此時,線程H就會被喚醒從而得以繼續運行。   

ArrayBlockingQueue函數列表

// 創建一個帶有給定的(固定)容量和默認訪問策略的 ArrayBlockingQueue。
ArrayBlockingQueue(int capacity)
// 創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue。
ArrayBlockingQueue(int capacity, boolean fair)
// 創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue,它最初包含給定 collection 的元素,并以 collection 迭代器的遍歷順序添加元素。
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
// 將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量),在成功時返回 true,如果此隊列已滿,則拋出 IllegalStateException。
boolean add(E e)
// 自動移除此隊列中的所有元素。
void clear()
// 如果此隊列包含指定的元素,則返回 true。
boolean contains(Object o)
// 移除此隊列中所有可用的元素,并將它們添加到給定 collection 中。
int drainTo(Collection<? super E> c)
// 最多從此隊列中移除給定數量的可用元素,并將這些元素添加到給定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 返回在此隊列中的元素上按適當順序進行迭代的迭代器。
Iterator<E> iterator()
// 將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量),在成功時返回 true,如果此隊列已滿,則返回 false。
boolean offer(E e)
// 將指定的元素插入此隊列的尾部,如果該隊列已滿,則在到達指定的等待時間之前等待可用的空間。
boolean offer(E e, long timeout, TimeUnit unit)
// 獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。
E peek()
// 獲取并移除此隊列的頭,如果此隊列為空,則返回 null。
E poll()
// 獲取并移除此隊列的頭部,在指定的等待時間前等待可用的元素(如果有必要)。
E poll(long timeout, TimeUnit unit)
// 將指定的元素插入此隊列的尾部,如果該隊列已滿,則等待可用的空間。
void put(E e)
// 返回在無阻塞的理想情況下(不存在內存或資源約束)此隊列能接受的其他元素數量。
int remainingCapacity()
// 從此隊列中移除指定元素的單個實例(如果存在)。
boolean remove(Object o)
// 返回此隊列中元素的數量。
int size()
// 獲取并移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)。
E take()
// 返回一個按適當順序包含此隊列中所有元素的數組。
Object[] toArray()
// 返回一個按適當順序包含此隊列中所有元素的數組;返回數組的運行時類型是指定數組的運行時類型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()

下面從ArrayBlockingQueue的創建,添加,取出,遍歷這幾個方面對ArrayBlockingQueue進行分析。

1. 創建

下面以ArrayBlockingQueue(int capacity, boolean fair)來進行說明。

public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull = lock.newCondition();
}

說明:

(01) items是保存“阻塞隊列”數據的數組。它的定義如下:

final Object[] items;

(02) fair是“可重入的獨占鎖(ReentrantLock)”的類型。fair為true,表示是公平鎖;fair為false,表示是非公平鎖。

notEmpty和notFull是鎖的兩個Condition條件。它們的定義如下:

final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

Lock的作用是提供獨占鎖機制,來保護競爭資源;而Condition是為了更加精細的對鎖進行控制,它依賴于Lock,通過某個條件對多線程進行控制。

notEmpty表示“鎖的非空條件”。當某線程想從隊列中取數據時,而此時又沒有數據,則該線程通過notEmpty.await()進行等待;當其它線程向隊列中插入了元素之后,就調用notEmpty.signal()喚醒“之前通過notEmpty.await()進入等待狀態的線程”。

同理,notFull表示“鎖的滿條件”。當某線程想向隊列中插入元素,而此時隊列已滿時,該線程等待;當其它線程從隊列中取出元素之后,就喚醒該等待的線程。 

2. 添加

下面以offer(E e)為例,對ArrayBlockingQueue的添加方法進行說明。

public boolean offer(E e) {
  // 創建插入的元素是否為null,是的話拋出NullPointerException異常
  checkNotNull(e);
  // 獲取“該阻塞隊列的獨占鎖”
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    // 如果隊列已滿,則返回false。
    if (count == items.length)
      return false;
    else {
    // 如果隊列未滿,則插入e,并返回true。
      insert(e);
      return true;
    }
  } finally {
    // 釋放鎖
    lock.unlock();
  }
}

說明:offer(E e)的作用是將e插入阻塞隊列的尾部。如果隊列已滿,則返回false,表示插入失敗;否則,插入元素,并返回true。

(01) count表示”隊列中的元素個數“。除此之外,隊列中還有另外兩個遍歷takeIndex和putIndex。takeIndex表示下一個被取出元素的索引,putIndex表示下一個被添加元素的索引。它們的定義如下:

// 隊列中的元素個數
int takeIndex;
// 下一個被取出元素的索引
int putIndex;
// 下一個被添加元素的索引
int count;

(02) insert()的源碼如下:

private void insert(E x) {
  // 將x添加到”隊列“中
  items[putIndex] = x;
  // 設置”下一個被取出元素的索引“
  putIndex = inc(putIndex);
  // 將”隊列中的元素個數”+1
  ++count;
  // 喚醒notEmpty上的等待線程
  notEmpty.signal();
}

insert()在插入元素之后,會喚醒notEmpty上面的等待線程。

inc()的源碼如下:

final int inc(int i) {
  return (++i == items.length) ? 0 : i;
}

若i+1的值等于“隊列的長度”,即添加元素之后,隊列滿;則設置“下一個被添加元素的索引”為0。 

3. 取出

下面以take()為例,對ArrayBlockingQueue的取出方法進行說明。

public E take() throws InterruptedException {
  // 獲取“隊列的獨占鎖”
  final ReentrantLock lock = this.lock;
  // 獲取“鎖”,若當前線程是中斷狀態,則拋出InterruptedException異常
  lock.lockInterruptibly();
  try {
    // 若“隊列為空”,則一直等待。
    while (count == 0)
      notEmpty.await();
    // 取出元素
    return extract();
  } finally {
    // 釋放“鎖”
    lock.unlock();
  }
}

說明:take()的作用是取出并返回隊列的頭。若隊列為空,則一直等待。

extract()的源碼如下:

private E extract() {
  final Object[] items = this.items;
  // 強制將元素轉換為“泛型E”
  E x = this.<E>cast(items[takeIndex]);
  // 將第takeIndex元素設為null,即刪除。同時,幫助GC回收。
  items[takeIndex] = null;
  // 設置“下一個被取出元素的索引”
  takeIndex = inc(takeIndex);
  // 將“隊列中元素數量”-1
  --count;
  // 喚醒notFull上的等待線程。
  notFull.signal();
  return x;
}

說明:extract()在刪除元素之后,會喚醒notFull上的等待線程。 

4. 遍歷

下面對ArrayBlockingQueue的遍歷方法進行說明。

public Iterator<E> iterator() {
  return new Itr();
}

Itr是實現了Iterator接口的類,它的源碼如下:

private class Itr implements Iterator<E> {
  // 隊列中剩余元素的個數
  private int remaining; // Number of elements yet to be returned
  // 下一次調用next()返回的元素的索引
  private int nextIndex; // Index of element to be returned by next
  // 下一次調用next()返回的元素
  private E nextItem;  // Element to be returned by next call to next
  // 上一次調用next()返回的元素
  private E lastItem;  // Element returned by last call to next
  // 上一次調用next()返回的元素的索引
  private int lastRet;  // Index of last element returned, or -1 if none
  Itr() {
    // 獲取“阻塞隊列”的鎖
    final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    try {
      lastRet = -1;
      if ((remaining = count) > 0)
        nextItem = itemAt(nextIndex = takeIndex);
    } finally {
      // 釋放“鎖”
      lock.unlock();
    }
  }
  public boolean hasNext() {
    return remaining > 0;
  }
  public E next() {
    // 獲取“阻塞隊列”的鎖
    final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    try {
      // 若“剩余元素<=0”,則拋出異常。
      if (remaining <= 0)
        throw new NoSuchElementException();
      lastRet = nextIndex;
      // 獲取第nextIndex位置的元素
      E x = itemAt(nextIndex); // check for fresher value
      if (x == null) {
        x = nextItem;     // we are forced to report old value
        lastItem = null;   // but ensure remove fails
      }
      else
        lastItem = x;
      while (--remaining > 0 && // skip over nulls
          (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
        ;
      return x;
    } finally {
      lock.unlock();
    }
  }
  public void remove() {
    final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    try {
      int i = lastRet;
      if (i == -1)
        throw new IllegalStateException();
      lastRet = -1;
      E x = lastItem;
      lastItem = null;
      // only remove if item still at index
      if (x != null && x == items[i]) {
        boolean removingHead = (i == takeIndex);
        removeAt(i);
        if (!removingHead)
          nextIndex = dec(nextIndex);
      }
    } finally {
      lock.unlock();
    }
  }
}

ArrayBlockingQueue示例

import java.util.*;
import java.util.concurrent.*;
/*
 *  ArrayBlockingQueue是“線程安全”的隊列,而LinkedList是非線程安全的。
 *
 *  下面是“多個線程同時操作并且遍歷queue”的示例
 *  (01) 當queue是ArrayBlockingQueue對象時,程序能正常運行。
 *  (02) 當queue是LinkedList對象時,程序會產生ConcurrentModificationException異常。
 *
 * 
 */
public class ArrayBlockingQueueDemo1{
  // TODO: queue是LinkedList對象時,程序會出錯。
  //private static Queue<String> queue = new LinkedList<String>();
  private static Queue<String> queue = new ArrayBlockingQueue<String>(20);
  public static void main(String[] args) {
    // 同時啟動兩個線程對queue進行操作!
    new MyThread("ta").start();
    new MyThread("tb").start();
  }
  private static void printAll() {
    String value;
    Iterator iter = queue.iterator();
    while(iter.hasNext()) {
      value = (String)iter.next();
      System.out.print(value+", ");
    }
    System.out.println();
  }
  private static class MyThread extends Thread {
    MyThread(String name) {
      super(name);
    }
    @Override
    public void run() {
        int i = 0;
      while (i++ < 6) {
        // “線程名” + "-" + "序號"
        String val = Thread.currentThread().getName()+i;
        queue.add(val);
        // 通過“Iterator”遍歷queue。
        printAll();
      }
    }
  }
}

(某一次)運行結果:

ta1, ta1, 
tb1, ta1, 
tb1, ta1, ta2, 
tb1, ta1, ta2, tb1, tb2, 
ta2, ta1, tb2, tb1, ta3, 
ta2, ta1, tb2, tb1, ta3, ta2, tb3, 
tb2, ta1, ta3, tb1, tb3, ta2, ta4, 
tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, 
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, 
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, 
tb3, ta1, ta4, tb1, tb4, ta2, ta5, tb2, tb5, ta3, ta6, 
tb3, ta4, tb4, ta5, tb5, ta6, tb6, 

結果說明:如果將源碼中的queue改成LinkedList對象時,程序會產生ConcurrentModificationException異常。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

承德县| 黔江区| 康平县| 拜城县| 兴和县| 宁晋县| 南投县| 鄯善县| 钟祥市| 大丰市| 昌吉市| 安宁市| 当阳市| 墨江| 武义县| 花莲县| 延寿县| 黎平县| 河东区| 萝北县| 平湖市| 阿拉善左旗| 双桥区| 信宜市| 错那县| 永登县| 赣州市| 高唐县| 当涂县| 孟连| 泾川县| 珠海市| 教育| 阿瓦提县| 锦屏县| 定远县| 宜兰市| 萨迦县| 湖北省| 南溪县| 白山市|