中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

ConcurrentLinkedQueue 1.8 源碼淺析

發布時間:2020-08-09 06:01:53 來源:網絡 閱讀:1014 作者:wx5c78c8b1dbb1b 欄目:編程語言

[TOC]

ConcurrentLinkedQueue 1.8 源碼淺析

一,簡介

ConcurrentlinkedQueue 還是一個基于鏈表的,×××的,線程安全的單端隊列,它采用先進先出(FIFO)的規則對節點進行排序,當我們加入一個元素時,它會插入隊列的尾部,當我們獲取元素時,會從隊列的首部獲取元素。它沒有使用鎖來保證線程安全,使用的是“wait-free”算法來保證整個隊列的線程安全。

二,基本成員簡介
Node 節點對象
        // 存儲的數據
        volatile E item;
        // 下一個節點引用
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        // 構造一個node節點
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        // 修改節點的item
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        // 懶修改節點的next
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        // cas修改節點的next節點
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
head 頭節點
 private transient volatile Node<E> head;
tail 尾節點
private transient volatile Node<E> tail;
三,常用方法解析
無參構造方法
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

構造節點是其實就是構造了一個node的item為null的節點,然后head和tail指向這個節點,如下圖所示:

ConcurrentLinkedQueue 1.8 源碼淺析

add 方法
public boolean add(E e) {
        return offer(e);
    }

我們可以看出其實調用的是offer方法,具體參考offer方法的講解。

offer 方法

源碼解析:

    public boolean offer(E e) {
        // 入隊元素不能為null
        checkNotNull(e);
        // 創建新的節點
        final Node<E> newNode = new Node<E>(e);
        // 死循環,設置節點
        // p獲取尾節點
        for (Node<E> t = tail, p = t;;) {
            // q是p的next節點
            Node<E> q = p.next;
            // 獲取尾節點的next節點
            // 尾節點沒有下一個節點
            if (q == null) {
                // p is last node
                // 這一步說明p是尾節點,新的節點設置為尾節點的next節點
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    // 設置尾節點,當之前的尾節點和現在插入的節點之間有一個節點時
                    // 并不是每一次都cas設置尾節點(優化手段,是怎么想到這種優化的??)
                    if (p != t) // hop two nodes at a time
                        // cas設置尾節點,可能會失敗
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            // 多線程操作時候,由于poll時候會把舊的head變為自引用,然后將head的next設置為新的head
            // 所以這里需要重新找新的head
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // 尋找尾節點
                // Check for tail updates after two hops.
                // p!=t
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

分析插入過程,我們插入使用3個線程來調用offer 方法,ThreadA,ThreadB同時運行,ThreadC最后插入,分析下offer方法的流程。

第一步,隊列屬于初始化狀態,ThreadA,ThreadB同時調用offer方法;創建節點,死循環設置節點,獲取尾節點的next節點,此時q== null,兩個線程都同時可能看見,然后cas設置尾節點的next節點(隊列狀態如圖A所示),我們假設是ThreadA線程cas設置成功了,然后p==t此時的尾節點其實沒有發生變化;此時我們來看ThreadB由于A成功了,所以ThreadB cas失敗了,重新循環,此時q != null了,p == q顯然不等于,再看下一個else判斷p!=t,此時顯然p == t,所以才是p = q,然后再次循環,此時的q==null,我們假設沒有線程來和ThreadB競爭,所以cas設置成功,然后p!=t嗎,顯然滿足所以設置尾節點,此時的設置尾節點的節點和之前的尾節點之間剛剛好有一個節點(如圖B所示)。

ConcurrentLinkedQueue 1.8 源碼淺析

ConcurrentLinkedQueue 1.8 源碼淺析

第二步,ThreadC插入,此時的尾節點是ThreadB插入的節點假設是B,獲取B的next節點,q == null,然后cas設置節點,完成,p==t,所以不用更新尾節點(如圖C所示)。

ConcurrentLinkedQueue 1.8 源碼淺析

peek 方法

注意:不會刪除元素,要和poll方法區別

    public E peek() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    // 更新頭結點
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
poll 方法
    public E poll() {
        restartFromHead:
        // 循環
        for (;;) {
            // 獲取頭結點
            for (Node<E> h = head, p = h, q;;) {
                // 獲取節點的內容
                E item = p.item;
                // item不為null ,使用cas設置item為空
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    // 更新頭結點,和尾節點一樣,不是每次都更新
                    // 頭結點item為null是,下個節點就必須更新頭結點
                    // 頭結點item不為null時,規則和更新尾節點一樣
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                // 那么獲取p節點的下一個節點,如果p節點的下一節點為null,則表明隊列已經空了
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                // p == q,說明別的線程調用了updateHead,
                // 自己的next 指向了自己,重新循環,獲取最新的頭結點
                else if (p == q)
                    continue restartFromHead;
                // 如果下一個元素不為空,則將頭節點的下一個節點設置成頭節點
                else
                    p = q;
            }
        }
    }

    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

分析我們按照offer時候元素來執行poll方法,ThreadD和ThreadE同時執行來分析下隊列的變化(主要分析p==q的產生)。

初始狀態(如圖C所示)

ConcurrentLinkedQueue 1.8 源碼淺析

第一步,ThreadD和ThreadE執行poll操作,item等于null,所以執行執行下面的操作(q = p.next) == null不等于,p == q不等于,所以p = q,其實就是上圖的ThreadA插入的節點,此時的item已經不為null了,所以執行cas設置item為null的操作,假設ThreadD執行成功了,那么此時p!=h就滿足了,所以此時要更新頭結點調用updateHead,這個方法會更新頭結點,并且把原來的頭節點的next設置為自己)(如圖D所示);接下我們分析ThreadE,cas失敗了需要重新執行,此時的item已經不為null,所以執行執行下面的操作(q = p.next) == null不等于,p == q這使其實已經是等于了,因為ThreadD改變了了以前頭結點的next節點為自己,所以需要重新遍歷,獲取最新的頭結點,此時的頭結點其實就是ThreadA插入的節點,然后item為null,接著執行下面的判斷,最終p就是p.next節點也就是ThreadB節點,然后cas設置item為null,由于p=p.next,所以p發生了變化,所以需要設置ThreadB為頭結點(如圖E所示)。

ConcurrentLinkedQueue 1.8 源碼淺析

ConcurrentLinkedQueue 1.8 源碼淺析
看到上面的執行流程可能就有人有疑問了,這不是每次都更新頭結點嗎,沒有優化啊,只看poll方法確實是這樣,那什么時候會產生不是每次都更新頭節點了,那就是當頭節點的item不為null的時候,但是如果按初始化的狀況來看,頭結點的item一直是null,但是當我看了peek方法之后才發現,peek可以改變這個情況,可以設置item不為null的頭結點,其實我們可以在poll方法前調用下peek方法,其實就啟動了優化策略,不是每次更新頭結點,不知道作者是不是這么設計的,反正就是牛皮

size 方法
public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

我們可以發現size沒有加鎖,就是遍歷了整個隊列,但是遍歷的同時可能在發生poll或者offer,所以size不是特別的精確,用的時候要注意。

四,總結

ConcurrentLinkedQueue是×××的隊列,所以使用時一定要注意內存溢出的問題,還有在執行size方法時一定要注意這個是不準確的值;在學poll和offer方法時,一定要理解更新head和tail節點的時機,這種優化手段值得我們去學習,我覺得這就是學習源碼的作用,就是學習作者的源碼思想。

參考《Java 并發編程的藝術》

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

关岭| 田东县| 南宁市| 濮阳市| 高邮市| 博乐市| 怀远县| 蓝田县| 兴隆县| 章丘市| 合山市| 陆川县| 渝北区| 北碚区| 阆中市| 许昌市| 贡嘎县| 贵阳市| 清涧县| 大港区| 任丘市| 阿克陶县| 松原市| 惠州市| 兴安县| 安溪县| 松溪县| 道真| 溆浦县| 淳安县| 鲁甸县| 陈巴尔虎旗| 龙山县| 南涧| 温州市| 娄烦县| 龙门县| 普洱| 和龙市| 洛隆县| 宜兰市|