中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

JUC并發編程LinkedBlockingQueue隊列源碼分析

發布時間:2023-05-08 16:26:01 來源:億速云 閱讀:136 作者:iii 欄目:開發技術

這篇文章主要介紹了JUC并發編程LinkedBlockingQueue隊列源碼分析的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇JUC并發編程LinkedBlockingQueue隊列源碼分析文章都會有所收獲,下面我們一起來看看吧。

LinkedBlockingQueue介紹

在JUC包下關于線程安全的隊列實現有很多,那么此篇文章講解LinkedBlockingQueue的實現原理,相信各位讀者在線程池中能看到LinkedBlockingQueue或者SynchronousQueue隊列來作為儲存任務和消費任務的通道。一個并發安全的隊列,在多線程中充當著安全的傳輸任務的責任。

既然是介紹LinkedBlockingQueue,那么從構造方法入手最合適不過。

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 初始化一個偽節點,讓head和last都指向這個偽節點
    // 為什么需要偽節點的存在?
    // 因為可以保證不會發生極端情況(假設沒有偽節點,并且只存在一個節點的情況下,生產者和消費者并發執行就可能出現極端情況)
    last = head = new Node<E>(null);
}

JUC并發編程LinkedBlockingQueue隊列源碼分析

為什么需要存在偽節點,因為可以保證不會發生極端情況(假設沒有偽節點,并且只存在一個節點的情況下,生產者和消費者并發執行就可能出現極端情況,用偽節點就能很好的解決這個極端問題)

/**
 * 因為是隊列,用鏈表實現,所以頭尾指針肯定不可少。
 * */
transient Node<E> head;
private transient Node<E> last;
/**
 * 我們可以很清楚的看到,這里使用了2套ReentrantLock和對應的condition條件等待隊列。
 * 目的也很明顯,讓生產者和消費者并行。
 * */
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

2套ReentrantLock和對應的condition條件等待隊列,很明顯目的是為了讓生產者和消費者并行,所以就需要一個偽節點處理極端并發情況。

為了,一些沒有接觸過隊列的讀者,所以這里還是介紹一下API把

API用途注意事項
offer生產者不會阻塞,如果插入失敗,或者隊列已經滿了,直接返回
poll消費者不會阻塞,如果消費失敗,或者隊列當前為空,直接返回
put生產者會阻塞,如果插入失敗或者隊列已經滿了,阻塞直到插入成功
take消費者會阻塞,如果消費失敗或者當前隊列為空,阻塞直到消費成功

put方法-生產者

public void put(E e) throws InterruptedException {
    // 不能插入null
    if (e == null) throw new NullPointerException();
    int c = -1;
    // 創建插入的節點。
    Node<E> node = new Node<E>(e);
    // 拿到生產者的鎖對象
    final ReentrantLock putLock = this.putLock;
    // 拿到全局計數器,注意這里用的是AtomicInteger,所以自增的原子性已經保證。
    final AtomicInteger count = this.count;
    // 上的是可響應中斷鎖。
    putLock.lockInterruptibly();
    try {
        // 如果當前隊列已經滿了,此時我們就要去阻塞,等待隊列被消費,我們要被喚醒,醒來生產節點。
        while (count.get() == capacity) {
            // 進入條件等待隊列阻塞。
            // 注意,只要阻塞,是會釋放鎖的,其他生產者線程可以搶到鎖。
            notFull.await();
        }
        // 插入到隊列尾部
        enqueue(node);
        // 因為插入了節點,所以全局計數需要+1
        // 但是這里請注意細節,getAndIncrement方法返回的是舊值。
        c = count.getAndIncrement();
        // 這里是一個很sao的點
        // 注意,這里只要當前隊列沒滿,喚醒的是生產者的條件等待隊列。
        // 為什么要這么做?
        // 很簡單,首先需要考慮,生產者和消費者是并發執行了。 
        // 其次,只要隊列沒滿就能一直生產,那么隊列一旦滿了后,后來的線程就都去條件隊列阻塞,所以線程生產完一個節點就有必要去喚醒等待的同胞(不管有沒有同胞在阻塞,這是義務)
        if (c + 1 < capacity)
            // 喚醒條件等待隊列中頭部節點。
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 這里也是一個很sao的點
    // 再次強調,getAndIncrement方法是返回的舊值
    // 所以當前生產者如果生產的是第一個節點,那么c ==0
    // 而隊列中沒有節點,消費者是要阻塞的
    // 也即,這里給隊列生產了一個節點,要喚醒消費者去消費節點。
    if (c == 0)
        signalNotEmpty();
}
// 插入到隊列尾部
// 因為ReentrantLock保證了整體的原子性,所以這里細節部分不需要保證原子性了。
private void enqueue(Node<E> node) {
    // 插入到尾部
    last = last.next = node;
}

第一次看到這個代碼難免會發生震撼,為什么在生產者代碼里面喚醒生產者?不是正常寫的生產者消費者模型,不都是生產者生產一個喚醒消費者消費嗎?怎么這里不一樣??????

因為這里生產者和消費者并行處理,當隊列滿了以后,后來的生產者線程都會去阻塞,所以生產者線程生產完一個節點就有必要去喚醒等待的同胞(不管有沒有同胞在阻塞,這是義務)

大致流程如下:

  • 創建Node節點

  • 上生產者鎖

  • 如果隊列已經滿了,就去生產者條件隊列阻塞

  • 如果沒滿,或者喚醒后,就插入到last指針的后面

  • 全局節點計數器+1

  • 如果當前隊列還有空間,就喚醒在阻塞的同胞。

  • 釋放鎖

  • 如果在生產之前隊列為空,本次生產后就需要喚醒在阻塞的消費者線程,讓他們醒來消費我剛生產的節點

take方法-消費者

public E take() throws InterruptedException {
    E x;
    int c = -1;
    // 全局計數器
    final AtomicInteger count = this.count;
    // 消費者的鎖對象
    final ReentrantLock takeLock = this.takeLock;
    // 可響應中斷鎖。
    takeLock.lockInterruptibly();
    try {
        // 如果當前隊列中沒有節點,此時消費者需要去阻塞,因為不阻塞他只會浪費CPU性能,又消費不到節點。
        while (count.get() == 0) {
            // 去消費者的條件隊列阻塞。
            notEmpty.await();
        }
        // 醒來后,去消費節點。
        x = dequeue();
        // 給全局計數器-1,但是這里也要注意,返回的是舊值
        c = count.getAndDecrement();
        // 如果隊列中還有節點就喚醒其他消費者去消費節點。
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 這里也是一個sao點
    // 請注意,這里的c是舊值,因為getAndDecrement返回的是舊值
    // 所以,如果當前消費線程消費節點之前隊列是滿的,當消費完畢后,我有必要去喚醒因為隊列滿了而阻塞等待的生產者,因為當前已經空出一個空間了。
    if (c == capacity)
        // 喚醒生產者
        signalNotFull();
    return x;
}
// 消費者消費節點
// 所以需要HelpGC 
// 不過這里要注意,head都是指向偽節點。
private E dequeue() {
    // 拿到頭節點,
    Node<E> h = head;
    // 拿到頭節點的next節點,next節點作為下一個head節點。
    // 因為head節點是指向偽節點,所以head.next節點就是當前要消費的節點。
    Node<E> first = h.next;
    // 將當前的頭結點的next指向自己。
    h.next = h; // help GC
    // 設置新的頭結點,也即把當前消費的節點做為下次的偽節點
    // head節點指向的都是偽節點
    head = first;
    // 拿到當前消費者想要的數據
    E x = first.item;
    first.item = null;
    return x;
}

這里跟put生產者基本思想一致,只不過這里是消費者,因為是生產者消費者并行,所以這里也是喚醒同胞,因為當隊列為空所有的消費者都會阻塞,所以每次消費者線程消費完節點后 ,有義務喚醒同胞。

大致流程如下:

  • 拿到全局計數器

  • 上消費者鎖

  • 如果當前隊列為空,當前消費者線程就要去阻塞

  • 如果不為空,或者被喚醒以后消費節點,把消費的節點作為下一次的偽節點,也即作為head節點

  • 全局計數器-1

  • 喚醒同胞

  • 釋放鎖

  • 如果在消費之前隊列已經滿了,那么可能會有生產者線程在阻塞,所以我有義務去喚醒他們

關于“JUC并發編程LinkedBlockingQueue隊列源碼分析”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“JUC并發編程LinkedBlockingQueue隊列源碼分析”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

汕尾市| 平遥县| 清涧县| 定远县| 德钦县| 松江区| 绩溪县| 安福县| 阳谷县| 额尔古纳市| 嫩江县| 平陆县| 高陵县| 喜德县| 安徽省| 奎屯市| 泸州市| 隆林| 泸水县| 白玉县| 望奎县| 绵竹市| 台中县| 宜阳县| 福海县| 墨玉县| 宁晋县| 双牌县| 定安县| 吐鲁番市| 巨野县| 原阳县| 宁海县| 洛扎县| 资源县| 汝南县| 鄂托克前旗| 正蓝旗| 乐至县| 河西区| 申扎县|