中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

JAVA中怎么利用阻塞隊列實現一個并發容器

發布時間:2021-07-01 14:56:25 來源:億速云 閱讀:143 作者:Leah 欄目:大數據

今天就跟大家聊聊有關JAVA中怎么利用阻塞隊列實現一個并發容器,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

在并發編程中,有時候需要使用線程安全的隊列。如果要實現一個線程安全的隊列有兩 種方式:一種是使用阻塞算法,另一種是使用非阻塞算法。使用阻塞算法的隊列可以用一個鎖 (入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現。非阻塞的實現方式則可以使用循環CAS的方式來實現。

阻塞隊列

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作支持阻塞的插入和移除方法。

  1. 支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。

  2. 支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變為非空。

應用場景

阻塞隊列常用于生產者和消費者的場景,生產者是向隊列里添加元素的線程,消費者是 從隊列里取元素的線程。阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。

插入和移除操作的4中處理方式

JAVA中怎么利用阻塞隊列實現一個并發容器

  • 拋出異常:當隊列滿時,如果再往隊列里插入元素,會拋出IllegalStateException("Queue full")異常。當隊列空時,從隊列里獲取元素會拋出NoSuchElementException異常。

  • 返回特殊值:當往隊列插入元素時,會返回元素是否插入成功,成功返回true。如果是移除方法,則是從隊列里取出一個元素,如果沒有則返回null。

  • 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到隊列可用或者響應中斷退出。當隊列空時,如果消費者線程從隊列里take元素,隊列會阻塞消費者線程,直到隊列不為空。

  • 超時退出:當阻塞隊列滿時,如果生產者線程往隊列里插入元素,隊列會阻塞生產者線程 一段時間,如果超過了指定的時間,生產者線程就會退出。

注意: 如果是無界阻塞隊列,隊列不可能會出現滿的情況,所以使用put或offer方法永 遠不會被阻塞,而且使用offer方法時,該方法永遠返回true。

Java里的阻塞隊列

  • ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。

  • LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列。

  • PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。

  • DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。

  • SynchronousQueue:一個不存儲元素的阻塞隊列。

  • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。

  • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

ArrayBlockingQueue

ArrayBlockingQueue是一個用數組實現的有界阻塞隊列,在初始化時需要指定隊列的長度。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認情況下不保證線程公平的訪問隊列,但是在初始化的隊列的時候指定阻塞隊列的公平性,如:ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);。它使用ReentrantLock來實現隊列的線程安全。

核心屬性

    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

核心方法

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

LinkedBlockingQueue

LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。出隊和入隊使用兩把鎖來實現。

核心屬性

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

PriorityBlockingQueue

PriorityBlockingQueue是一個支持優先級的無界阻塞隊列。默認情況下元素采取自然順序升序排列。也可以自定義類實現compareTo()方法來指定元素排序規則,或者初始化 PriorityBlockingQueue時,指定構造參數Comparator來對元素進行排序。需要注意的是不能保證 同優先級元素的順序。底層使用數組實現,默認初始容量是11,最大值是Integer.MAX_VALUE - 8。容量不夠時會進行擴容

核心方法

    // 入隊
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
    // 擴容
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

DelayQueue

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊 列中的元素必須實現Delayed接口和Comparable<Delayed>接口,在創建元素時可以指定多久才能從隊列中獲取當前元素。 只有在延遲期滿時才能從隊列中提取元素。

應用場景

  • 緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢 DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。

  • 定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從 DelayQueue中獲取到任務就開始執行,比如TimerQueue就是使用DelayQueue實現的。

如何實現Delayed接口

DelayQueue隊列的元素必須實現Delayed接口。我們可以參考ScheduledThreadPoolExecutor 里ScheduledFutureTask類的實現,一共有三步。 第一步:在對象創建的時候,初始化基本數據。使用time記錄當前對象延遲到什么時候可 以使用,使用sequenceNumber來標識元素在隊列中的先后順序。代碼如下:

private static final AtomicLong sequencer = new AtomicLong(0);

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    ScheduledFutureTask(Runnable r, V result, long ns, long period){
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
}

第二步:實現getDelay方法,該方法返回當前元素還需要延時多長時間,單位是納秒,代碼 如下:

public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}

注意當time小于當前時間時,getDelay會返回負數,這時才可以出隊。

第三步:實現compareTo方法來指定元素的順序。例如,讓延時時間最長的放在隊列的末 尾。實現代碼如下:

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledThreadPoolExecutor.ScheduledFutureTask) {
        ScheduledThreadPoolExecutor.ScheduledFutureTask<?> x = (ScheduledThreadPoolExecutor.ScheduledFutureTask<?>)other;
        // 過期時間小的排前面,大的排后面,如果一樣就使用sequenceNumber 來排序。
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    // 快要過期的排在前面
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

如何實現延時阻塞隊列

延時阻塞隊列的實現很簡單,當消費者從隊列里獲取元素時,如果元素沒有達到延時時 間,就阻塞當前線程。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    // 隊列為NULL,阻塞線程直到超時
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                // 等待時間小于第一個元素的過期時間
                if (nanos < delay || leader != null)
                    // 阻塞線程直到超時
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待時間大于第一個元素的過期時間,阻塞線程直到第一個元素過期
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            // 喚醒其他阻塞線程
            available.signal();
        lock.unlock();
    }
}

SynchronousQueue

SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作, 否則不能繼續添加元素。 它支持公平訪問隊列。默認情況下線程采用非公平性策略訪問隊列。

SynchronousQueue可以看成是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費 者線程。隊列本身并不存儲任何元素,非常適合傳遞性場景。SynchronousQueue的吞吐量高于 LinkedBlockingQueue和ArrayBlockingQueue。

LinkedTransferQueue

LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對于其他阻 塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。

transfer方法

如果當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法 時),transfer方法可以把生產者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等 待接收元素,transfer方法會將元素存放在隊列的tail節點,并等到該元素被消費者消費了才返 回。

LinkedBlockingDeque

LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的是可以 從隊列的兩端插入和移出元素。雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊 時,也就減少了一半的競爭。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst、 addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First單詞結尾的方法,表示插入、 獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入、獲取或移除雙 端隊列的最后一個元素。另外,插入方法add等同于addLast,移除方法remove等效于 removeFirst。但是take方法卻等同于takeFirst,不知道是不是JDK的bug,使用時還是用帶有First 和Last后綴的方法更清楚。

在初始化LinkedBlockingDeque時可以設置容量防止其過度膨脹。另外,雙向阻塞隊列可以運用在“工作竊取”模式中。

看完上述內容,你們對JAVA中怎么利用阻塞隊列實現一個并發容器有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

民权县| 道孚县| 光山县| 阿瓦提县| 额尔古纳市| 襄汾县| 江北区| 朝阳县| 水城县| 彰化县| 浦江县| 西平县| 梁平县| 镇巴县| 东山县| 屏边| 迁安市| 大洼县| 来安县| 中山市| 兴义市| 平邑县| 滨州市| 平湖市| 河南省| 深泽县| 达州市| 阳信县| 平罗县| 恩平市| 台安县| 德江县| 全椒县| 砚山县| 东城区| 恩施市| 麦盖提县| 聊城市| 亚东县| 大庆市| 武定县|