您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關JAVA中怎么利用阻塞隊列實現一個并發容器,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
在并發編程中,有時候需要使用線程安全的隊列。如果要實現一個線程安全的隊列有兩 種方式:一種是使用阻塞算法,另一種是使用非阻塞算法。使用阻塞算法的隊列可以用一個鎖 (入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現。非阻塞的實現方式則可以使用循環CAS的方式來實現。
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作支持阻塞的插入和移除方法。
支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變為非空。
阻塞隊列常用于生產者和消費者的場景,生產者是向隊列里添加元素的線程,消費者是 從隊列里取元素的線程。阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。
拋出異常:當隊列滿時,如果再往隊列里插入元素,會拋出IllegalStateException("Queue full")異常。當隊列空時,從隊列里獲取元素會拋出NoSuchElementException異常。
返回特殊值:當往隊列插入元素時,會返回元素是否插入成功,成功返回true。如果是移除方法,則是從隊列里取出一個元素,如果沒有則返回null。
一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到隊列可用或者響應中斷退出。當隊列空時,如果消費者線程從隊列里take元素,隊列會阻塞消費者線程,直到隊列不為空。
超時退出:當阻塞隊列滿時,如果生產者線程往隊列里插入元素,隊列會阻塞生產者線程 一段時間,如果超過了指定的時間,生產者線程就會退出。
注意: 如果是無界阻塞隊列,隊列不可能會出現滿的情況,所以使用put或offer方法永 遠不會被阻塞,而且使用offer方法時,該方法永遠返回true。
ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列。
PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。
DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
SynchronousQueue:一個不存儲元素的阻塞隊列。
LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
ArrayBlockingQueue是一個用數組實現的有界阻塞隊列,在初始化時需要指定隊列的長度。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認情況下不保證線程公平的訪問隊列,但是在初始化的隊列的時候指定阻塞隊列的公平性,如:ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
。它使用ReentrantLock
來實現隊列的線程安全。
/** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。出隊和入隊使用兩把鎖來實現。
/** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); /** * Head of linked list. * Invariant: head.item == null */ transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last; /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
PriorityBlockingQueue是一個支持優先級的無界阻塞隊列。默認情況下元素采取自然順序升序排列。也可以自定義類實現compareTo()方法來指定元素排序規則,或者初始化 PriorityBlockingQueue時,指定構造參數Comparator來對元素進行排序。需要注意的是不能保證 同優先級元素的順序。底層使用數組實現,默認初始容量是11,最大值是Integer.MAX_VALUE - 8
。容量不夠時會進行擴容
// 入隊 private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; } // 擴容 private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } if (newArray == null) // back off if another thread is allocating Thread.yield(); lock.lock(); if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊 列中的元素必須實現Delayed接口和Comparable<Delayed>接口,在創建元素時可以指定多久才能從隊列中獲取當前元素。 只有在延遲期滿時才能從隊列中提取元素。
緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢 DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從 DelayQueue中獲取到任務就開始執行,比如TimerQueue就是使用DelayQueue實現的。
DelayQueue隊列的元素必須實現Delayed接口。我們可以參考ScheduledThreadPoolExecutor 里ScheduledFutureTask類的實現,一共有三步。 第一步:在對象創建的時候,初始化基本數據。使用time記錄當前對象延遲到什么時候可 以使用,使用sequenceNumber來標識元素在隊列中的先后順序。代碼如下:
private static final AtomicLong sequencer = new AtomicLong(0); ScheduledFutureTask(Runnable r, V result, long ns, long period) { ScheduledFutureTask(Runnable r, V result, long ns, long period){ super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } }
第二步:實現getDelay方法,該方法返回當前元素還需要延時多長時間,單位是納秒,代碼 如下:
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
注意當time小于當前時間時,getDelay會返回負數,這時才可以出隊。
第三步:實現compareTo方法來指定元素的順序。例如,讓延時時間最長的放在隊列的末 尾。實現代碼如下:
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledThreadPoolExecutor.ScheduledFutureTask) { ScheduledThreadPoolExecutor.ScheduledFutureTask<?> x = (ScheduledThreadPoolExecutor.ScheduledFutureTask<?>)other; // 過期時間小的排前面,大的排后面,如果一樣就使用sequenceNumber 來排序。 long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } // 快要過期的排在前面 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
延時阻塞隊列的實現很簡單,當消費者從隊列里獲取元素時,如果元素沒有達到延時時 間,就阻塞當前線程。
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else // 隊列為NULL,阻塞線程直到超時 nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); if (nanos <= 0) return null; first = null; // don't retain ref while waiting // 等待時間小于第一個元素的過期時間 if (nanos < delay || leader != null) // 阻塞線程直到超時 nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待時間大于第一個元素的過期時間,阻塞線程直到第一個元素過期 long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) // 喚醒其他阻塞線程 available.signal(); lock.unlock(); } }
SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作, 否則不能繼續添加元素。 它支持公平訪問隊列。默認情況下線程采用非公平性策略訪問隊列。
SynchronousQueue可以看成是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費 者線程。隊列本身并不存儲任何元素,非常適合傳遞性場景。SynchronousQueue的吞吐量高于 LinkedBlockingQueue和ArrayBlockingQueue。
LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對于其他阻 塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。
如果當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法 時),transfer方法可以把生產者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等 待接收元素,transfer方法會將元素存放在隊列的tail節點,并等到該元素被消費者消費了才返 回。
LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的是可以 從隊列的兩端插入和移出元素。雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊 時,也就減少了一半的競爭。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst、 addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First單詞結尾的方法,表示插入、 獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入、獲取或移除雙 端隊列的最后一個元素。另外,插入方法add等同于addLast,移除方法remove等效于 removeFirst。但是take方法卻等同于takeFirst,不知道是不是JDK的bug,使用時還是用帶有First 和Last后綴的方法更清楚。
在初始化LinkedBlockingDeque時可以設置容量防止其過度膨脹。另外,雙向阻塞隊列可以運用在“工作竊取”模式中。
看完上述內容,你們對JAVA中怎么利用阻塞隊列實現一個并發容器有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。