中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

ConcurrentHashMap有什么用

發布時間:2021-12-08 09:23:53 來源:億速云 閱讀:124 作者:iii 欄目:云計算

本篇內容主要講解“ConcurrentHashMap有什么用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“ConcurrentHashMap有什么用”吧!

?  
  1. HashTable的      put,      get,      remove等方法是通過      synchronized來修飾保證其線程安全性的。
  2. HashTable是 不允許key跟value為null的。
  3. 問題是      synchronized是個關鍵字級別的重量鎖,在get數據的時候任何寫入操作都不允許。相對來說性能不好。因此目前主要用的      ConcurrentHashMap來保證線程安全性。
?  

ConcurrentHashMap主要分為JDK<=7跟JDK>=8的兩個版本,ConcurrentHashMap的空間利用率更低一般只有10%~20%,接下來分別介紹。

 

JDK7

先宏觀說下JDK7中的大致組成,ConcurrentHashMap由Segment數組結構和HashEntry數組組成。Segment是一種可重入鎖,是一種數組和鏈表的結構,一個Segment中包含一個HashEntry數組,每個HashEntry又是一個鏈表結構。正是通過Segment分段鎖,ConcurrentHashMap實現了高效率的并發。缺點是并發程度是有segment數組來決定的,并發度一旦初始化無法擴容。先繪制個ConcurrentHashMap的形象直觀圖。ConcurrentHashMap有什么用要想理解currentHashMap,可以簡單的理解為將數據「分表分庫」ConcurrentHashMap是由 Segment 數組 結構和HashEntry 數組 結構組成。

?  
  • Segment 是一種可重入鎖      ReentrantLock的子類 ,在      ConcurrentHashMap 里扮演鎖的角色,      HashEntry則用于存儲鍵值對數據。
  • ConcurrentHashMap 里包含一個      Segment 數組來實現鎖分離,      Segment的結構和      HashMap 類似,一個      Segment里包含一個      HashEntry 數組,每個      HashEntry 是一個鏈表結構的元素, 每個      Segment守護者一個      HashEntry 數組里的元素,當對      HashEntry數組的數據進行修改時,必須首先獲得它對應的      Segment 鎖。
?  
  1. 我們先看下segment類:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
     transient volatile HashEntry<K,V>[] table; //包含一個HashMap 可以理解為
}
 

可以理解為我們的每個segment都是實現了Lock功能的HashMap。如果我們同時有多個segment形成了segment數組那我們就可以實現并發咯。

我們看下   currentHashMap的構造函數,先總結幾點。      
    1. 每一個segment里面包含的table(HashEntry數組)初始化大小也一定是2的次冪
    2. 這里設置了若干個用于位計算的參數。
    3. initialCapacity:初始容量大小 ,默認16。
    4. loadFactor: 擴容因子,默認0.75,當一個Segment存儲的元素數量大于initialCapacity* loadFactor時,該Segment會進行一次擴容。
    5. concurrencyLevel:并發度,默認16。并發度可以理解為程序運行時能夠      「同時更新」ConccurentHashMap且不產生鎖競爭的最大線程數,實際上就是ConcurrentHashMap中的      分段鎖個數,即Segment[]的數組長度。如果并發度設置的過小,會帶來嚴重的鎖競爭問題;如果并發度設置的過大,原本位于同一個Segment內的訪問會擴散到不同的Segment中,CPU cache命中率會下降,從而引起程序性能下降。
    6. segment的數組大小最終一定是2的次冪

構造函數詳解:

   //initialCapacity 是我們保存所以KV數據的初始值
   //loadFactor這個就是HashMap的負載因子
   // 我們segment數組的初始化大小
      @SuppressWarnings("unchecked")
       public ConcurrentHashMap(int initialCapacity,
                                float loadFactor, int concurrencyLevel) {
           if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
               throw new IllegalArgumentException();
           if (concurrencyLevel > MAX_SEGMENTS) // 最大允許segment的個數,不能超過 1< 24
               concurrencyLevel = MAX_SEGMENTS;
           int sshift = 0; // 類似擾動函數
           int ssize = 1; 
           while (ssize < concurrencyLevel) {
               ++sshift;
               ssize <<= 1; // 確保segment一定是2次冪
           }
           this.segmentShift = 32 - sshift;  
           //有點類似與擾動函數,跟下面的參數配合使用實現 當前元素落到那個segment上面。
           this.segmentMask = ssize - 1; // 為了 取模 專用
           if (initialCapacity > MAXIMUM_CAPACITY) //不能大于 1< 30
               initialCapacity = MAXIMUM_CAPACITY;
   
           int c = initialCapacity / ssize; //總的數組大小 被 segment 分散后 需要多少個table
           if (c * ssize < initialCapacity)
               ++c; //確保向上取值
           int cap = MIN_SEGMENT_TABLE_CAPACITY; 
           // 每個table初始化大小為2
           while (cap < c) // 單獨的一個segment[i] 對應的table 容量大小。
               cap <<= 1;
           // 將table的容量初始化為2的次冪
           Segment<K,V> s0 =
               new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
               // 負載因子,閾值,每個segment的初始化大小。跟hashmap 初始值類似。
               // 并且segment的初始化是懶加載模式,剛開始只有一個s0,其余的在需要的時候才會增加。
           Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
           UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
           this.segments = ss;
       }
 
  1. hash 不管是我們的get操作還是put操作要需要通過hash來對數據進行定位。
   //  整體思想就是通過多次不同方式的位運算來努力將數據均勻的分不到目標table中,都是些擾動函數
   private int hash(Object k) {
       int h = hashSeed;
       if ((0 != h) && (k instanceof String)) {
           return sun.misc.Hashing.stringHash42((String) k);
       }
       h ^= k.hashCode();
       // single-word Wang/Jenkins hash.
       h += (h <<  15) ^ 0xffffcd7d;
       h ^= (h >>> 10);
       h += (h <<   3);
       h ^= (h >>>  6);
       h += (h <<   2) + (h << 14);
       return h ^ (h >>> 16);
   }
 
  1. get 相對來說比較簡單,無非就是通過     hash找到對應的     segment,繼續通過     hash找到對應的     table,然后就是遍歷這個鏈表看是否可以找到,并且要注意     get的時候是沒有加鎖的。
   public V get(Object key) {
       Segment<K,V> s;
       HashEntry<K,V>[] tab;
       int h = hash(key); // JDK7中標準的hash值獲取算法
       long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // hash值如何映射到對應的segment上
       if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
           //  無非就是獲得hash值對應的segment 是否存在,
           for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                    (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                e != null; e = e.next) {
               // 看下這個hash值對應的是segment(HashEntry)中的具體位置。然后遍歷查詢該鏈表
               K k;
               if ((k = e.key)  key || (e.hash  h && key.equals(k)))
                   return e.value;
           }
       }
       return null;
   }
 
  1. put 相同的思路,先找到     hash值對應的     segment位置,然后看該     segment位置是否初始化了(因為segment是懶加載模式)。選擇性初始化,最終執行put操作。
   @SuppressWarnings("unchecked")
   public V put(K key, V value) {
       Segment<K,V> s;
       if (value  null)
           throw new NullPointerException();
       int hash = hash(key);// 還是獲得最終hash值
       int j = (hash >>> segmentShift) & segmentMask; // hash值位操作對應的segment數組位置
       if ((s = (Segment<K,V>)UNSAFE.getObject          
            (segments, (j << SSHIFT) + SBASE))  null)
           s = ensureSegment(j); 
       // 初始化時候因為只有第一個segment,如果落在了其余的segment中 則需要現初始化。
       return s.put(key, hash, value, false);
       // 直接在數據中執行put操作。
   }
 

其中put操作基本思路跟HashMap幾乎一樣,只是在開始跟結束進行了加鎖的操作tryLock and unlock,然后JDK7中都是先擴容再添加數據的,并且獲得不到鎖也會進行自旋的tryLock或者lock阻塞排隊進行等待(同時獲得鎖前提前new出新數據)。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往該 segment 寫入前,需要先獲取該 segment 的獨占鎖,獲取失敗嘗試獲取自旋鎖
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // segment 內部的數組
        HashEntry<K,V>[] tab = table;
        // 利用 hash 值,求應該放置的數組下標
        int index = (tab.length - 1) & hash;
        // first 是數組該位置處的鏈表的表頭
        HashEntry<K,V> first = entryAt(tab, index);
 
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key)  key ||
                    (e.hash  hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 繼續順著鏈表走
                e = e.next;
            }
            else {
                // node 是不是 null,這個要看獲取鎖的過程。沒獲得鎖的線程幫我們創建好了節點,直接頭插法
                // 如果不為 null,那就直接將它設置為鏈表表頭;如果是 null,初始化并設置為鏈表表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
 
                int c = count + 1;
                // 如果超過了該 segment 的閾值,這個 segment 需要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 擴容
                else
                    // 沒有達到閾值,將 node 放到數組 tab 的 index 位置,
                    // 將新的結點設置成原鏈表的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;
}
 

如果加鎖失敗了調用scanAndLockForPut,完成查找或新建節點的工作。當獲取到鎖后直接將該節點加入鏈表即可,「提升」了put操作的性能,這里涉及到自旋。大致過程:

?  
  1. 在我獲取不到鎖的時候我進行tryLock,準備好new的數據,同時還有一定的次數限制,還要考慮別的已經獲得線程的節點修改該頭節點。
?  
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
 
    // 循環獲取鎖
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e  null) {
                if (node  null) // speculatively create node
              // 進到這里說明數組該位置的鏈表是空的,沒有任何元素
             // 當然,進到這里的另一個原因是 tryLock() 失敗,所以該槽存在并發,不一定是該位置
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                // 順著鏈表往下走
                e = e.next;
        }
    // 重試次數如果超過 MAX_SCAN_RETRIES(單核 1 次多核 64 次),那么不搶了,進入到阻塞隊列等待鎖
    //    lock() 是阻塞方法,直到獲取鎖后返回
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1)  0 &&
                 // 進入這里,說明有新的元素進到了鏈表,并且成為了新的表頭
                 // 這邊的策略是,重新執行 scanAndLockForPut 方法
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
 
  1. Size

    這個size方法比較有趣,他是先無鎖的統計下所有的數據量看下前后兩次是否數據一樣,如果一樣則返回數據,如果不一樣則要把全部的segment進行加鎖,統計,解鎖。并且size方法只是返回一個統計性的數字,因此size謹慎使用哦。

public int size() {
       // Try a few times to get accurate count. On failure due to
       // continuous async changes in table, resort to locking.
       final Segment<K,V>[] segments = this.segments;
       int size;
       boolean overflow; // true if size overflows 32 bits
       long sum;         // sum of modCounts
       long last = 0L;   // previous sum
       int retries = -1; // first iteration isn't retry
       try {
           for (;;) {
               if (retries++  RETRIES_BEFORE_LOCK) {  //  超過2次則全部加鎖
                   for (int j = 0; j < segments.length; ++j)
                       ensureSegment(j).lock(); // 直接對全部segment加鎖消耗性太大
               }
               sum = 0L;
               size = 0;
               overflow = false;
               for (int j = 0; j < segments.length; ++j) {
                   Segment<K,V> seg = segmentAt(segments, j);
                   if (seg != null) {
                       sum += seg.modCount; // 統計的是modCount,涉及到增刪該都會加1
                       int c = seg.count;
                       if (c < 0 || (size += c) < 0)
                           overflow = true;
                   }
               }
               if (sum  last) // 每一個前后的修改次數一樣 則認為一樣,但凡有一個不一樣則直接break。
                   break;
               last = sum;
           }
       } finally {
           if (retries > RETRIES_BEFORE_LOCK) {
               for (int j = 0; j < segments.length; ++j)
                   segmentAt(segments, j).unlock();
           }
       }
       return overflow ? Integer.MAX_VALUE : size;
   }
 
  1. rehash     segment 數組初始化后就不可變了,也就是說     「并發性不可變」,不過     segment里的     table可以擴容為2倍,該方法沒有考慮并發,因為執行該方法之前已經獲取了鎖。其中JDK7中的     rehash思路跟JDK8 中擴容后處理鏈表的思路一樣,個人不過感覺沒有8寫的精髓好看。
// 方法參數上的 node 是這次擴容后,需要添加到新的數組中的數據。
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 創建新數組
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩碼,如從 16 擴容到 32,那么 sizeMask 為 31,對應二進制 ‘000...00011111’
    int sizeMask = newCapacity - 1;
    // 遍歷原數組,將原數組位置 i 處的鏈表拆分到 新數組位置 i 和 i+oldCap 兩個位置
    for (int i = 0; i < oldCapacity ; i++) {
        // e 是鏈表的第一個元素
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計算應該放置在新數組中的位置,
            // 假設原數組長度為 16,e 在 oldTable[3] 處,那么 idx 只可能是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask; // 新位置
            if (next  null)   // 該位置處只有一個元素
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是鏈表表頭
                HashEntry<K,V> lastRun = e;
                // idx 是當前鏈表的頭結點 e 的新位置
                int lastIdx = idx;
                // for 循環找到一個 lastRun 結點,這個結點之后的所有元素是將要放到一起的
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 將 lastRun 及其之后的所有結點組成的這個鏈表放到 lastIdx 這個位置
                newTable[lastIdx] = lastRun;
                // 下面的操作是處理 lastRun 之前的結點,
                //這些結點可能分配在另一個鏈表中,也可能分配到上面的那個鏈表中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新來的 node 放到新數組中剛剛的 兩個鏈表之一 的 頭部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
 
  1. CAS操作 在JDK7里在     ConcurrentHashMap中通過原子操作     sun.misc.Unsafe查找元素、替換元素和設置元素。通過這樣的硬件級別獲得數據可以保證及時是多線程我也每次獲得的數據是最新的。這些原子操作起著非常關鍵的作用,你可以在所有     ConcurrentHashMap的基本功能中看到,隨機距離如下:
     final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }
    static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab  null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)i << TSHIFT) + TBASE);
    }
   static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                       HashEntry<K,V> e) {
        UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
    }
   

常見問題

  1. ConcurrentHashMap實現原理是怎么樣的或者ConcurrentHashMap如何在保證高并發下線程安全的同時實現了性能提升?
?  

ConcurrentHashMap允許多個修改操作并發進行,其關鍵在于使用了鎖分離技術。它使用了多個鎖來控制對hash表的不同部分進行的修改。內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的HashTable,只要多個修改操作發生在不同的段上,它們就可以并發進行。

?  
  1. 在高并發下的情況下如何保證取得的元素是最新的?
?  

用于存儲鍵值對數據的HashEntry,在設計上它的成員變量value跟next都是volatile類型的,這樣就保證別的線程對value值的修改,get方法可以馬上看到。

?  

ConcurrentHashMap的弱一致性體現在迭代器,clear和get方法,原因在于沒有加鎖。

    1. 比如迭代器在遍歷數據的時候是一個Segment一個Segment去遍歷的,如果在遍歷完一個Segment時正好有一個線程在剛遍歷完的Segment上插入數據,就會體現出不一致性。clear也是一樣。
    2. get方法和containsKey方法都是遍歷對應索引位上所有節點,都是不加鎖來判斷的,如果是修改性質的因為可見性的存在可以直接獲得最新值,不過如果是新添加值則無法保持一致性。
 

JDK8

JDK8相比與JDK7主要區別如下:

?  
  1. 取消了segment數組,直接用table保存數據,鎖的粒度更小,減少并發沖突的概率。采用table數組元素作為鎖,從而實現了對每一行數據進行加鎖,進一步減少并發沖突的概率,并發控制使用Synchronized和CAS來操作。
  2. 存儲數據時采用了數組+ 鏈表+紅黑樹的形式。
?  
  1. CurrentHashMap重要參數:
?  

private static final int MAXIMUM_CAPACITY = 1 << 30; // 數組的最大值 

private static final int DEFAULT_CAPACITY = 16; // 默認數組長度 

static final int TREEIFY_THRESHOLD = 8; // 鏈表轉紅黑樹的一個條件 

static final int UNTREEIFY_THRESHOLD = 6; // 紅黑樹轉鏈表的一個條件 

static final int MIN_TREEIFY_CAPACITY = 64; // 鏈表轉紅黑樹的另一個條件

static final int MOVED     = -1;  // 表示正在擴容轉移 

static final int TREEBIN   = -2; // 表示已經轉換成樹 

static final int RESERVED  = -3; // hash for transient reservations 

static final int HASH_BITS = 0x7fffffff; // 獲得hash值的輔助參數

transient volatile Node<K,V>[] table;// 默認沒初始化的數組,用來保存元素 

private transient volatile Node<K,V>[] nextTable; // 轉移的時候用的數組 

static final int NCPU = Runtime.getRuntime().availableProcessors();// 獲取可用的CPU個數 

private transient volatile Node<K,V>[] nextTable; // 連接表,用于哈希表擴容,擴容完成后會被重置為 null 

private transient volatile long baseCount;保存著整個哈希表中存儲的所有的結點的個數總和,有點類似于 HashMap 的 size 屬性。private transient volatile int sizeCtl

負數:表示進行初始化或者擴容,-1:表示正在初始化,-N:表示有 N-1 個線程正在進行擴容 正數:0 表示還沒有被初始化,> 0的數:初始化或者是下一次進行擴容的閾值,有點類似HashMap中的threshold,不過功能「更強大」

?  
  1. 若干重要類
  • 構成每個元素的基本類     Node
      static class Node<K,V> implements Map.Entry<K,V> {
              final int hash;    // key的hash值
              final K key;       // key
              volatile V val;    // value
              volatile Node<K,V> next; 
               //表示鏈表中的下一個節點
      }
 
  • TreeNode繼承于Node,用來存儲紅黑樹節點
      static final class TreeNode<K,V> extends Node<K,V> {
              TreeNode<K,V> parent;  
              // 紅黑樹的父親節點
              TreeNode<K,V> left;
              // 左節點
              TreeNode<K,V> right;
             // 右節點
              TreeNode<K,V> prev;    
             // 前節點
              boolean red;
             // 是否為紅點
      }
 
  • ForwardingNode 在 Node 的子類     ForwardingNode 的構造方法中,可以看到此變量的hash =     「-1」 ,類中還存儲     nextTable的引用。該初始化方法只在     transfer方法被調用,如果一個類被設置成此種情況并且hash = -1 則說明該節點不需要resize了。
static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            //注意這里
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
 //.....
}
 
  • TreeBin TreeBin從字面含義中可以理解為存儲樹形結構的容器,而樹形結構就是指TreeNode,所以TreeBin就是封裝TreeNode的容器,它提供轉換黑紅樹的一些條件和鎖的控制.
static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; // increment value for setting read lock
}
   

構造函數

整體的構造情況基本跟HashMap類似,并且為了跟原來的JDK7中的兼容性還可以傳入并發度。不過JDK8中并發度已經有table的具體長度來控制了。

?  
  1. ConcurrentHashMap():創建一個帶有默認初始容量 (16)、加載因子 (0.75) 和 concurrencyLevel (16) 的新的空映射
  2. ConcurrentHashMap(int):創建一個帶有指定初始容量      tableSizeFor、默認加載因子 (0.75) 和 concurrencyLevel (16) 的新的空映射
  3. ConcurrentHashMap(Map<? extends K, ? extends V> m):構造一個與給定映射具有相同映射關系的新映射
  4. ConcurrentHashMap(int initialCapacity, float loadFactor):創建一個帶有指定初始容量、加載因子和默認 concurrencyLevel (1) 的新的空映射
  5. ConcurrentHashMap(int, float, int):創建一個帶有指定初始容量、加載因子和并發級別的新的空映射。
?  
 

put

假設table已經初始化完成,put操作采用 CAS + synchronized 實現并發插入或更新操作,具體實現如下。

?  
  1. 做一些邊界處理,然后獲得hash值。
  2. 沒初始化就初始化,初始化后看下對應的桶是否為空,為空就原子性的嘗試插入。
  3. 如果當前節點正在擴容還要去幫忙擴容,騷操作。
  4. 用      syn來加鎖當前節點,然后操作幾乎跟就跟hashmap一樣了。
?  

// Node 節點的 hash值在HashMap中存儲的就是hash值,在currenthashmap中可能有多種情況哦!
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key  null || value  null) throw new NullPointerException(); //邊界處理
    int hash = spread(key.hashCode());// 最終hash值計算
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { //循環表
        Node<K,V> f; int n, i, fh;
        if (tab  null || (n = tab.length)  0)
            tab = initTable(); // 初始化表 如果為空,懶漢式
        else if ((f = tabAt(tab, i = (n - 1) & hash))  null) {
        // 如果對應桶位置為空
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) 
                         // CAS 原子性的嘗試插入
                break;
        } 
        else if ((fh = f.hash)  MOVED) 
        // 如果當前節點正在擴容。還要幫著去擴容。
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) { //  桶存在數據 加鎖操作進行處理
                if (tabAt(tab, i)  f) {
                    if (fh >= 0) { // 如果存儲的是鏈表 存儲的是節點的hash值
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 遍歷鏈表去查找,如果找到key一樣則選擇性
                            if (e.hash  hash &&
                                ((ek = e.key)  key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next)  null) {// 找到尾部插入
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {// 如果桶節點類型為TreeBin
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) { 
                             // 嘗試紅黑樹插入,同時也要防止節點本來就有,選擇性覆蓋
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) { // 如果鏈表數量
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i); //  鏈表轉紅黑樹哦!
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount); // 統計大小 并且檢查是否要擴容。
    return null;
}
 

涉及到重要函數initTabletabAtcasTabAthelpTransferputTreeValtreeifyBinaddCount函數。

 

initTable

「只允許一個線程」對表進行初始化,如果不巧有其他線程進來了,那么會讓其他線程交出 CPU 等待下次系統調度Thread.yield。這樣,保證了表同時只會被一個線程初始化,對于table的大小,會根據sizeCtl的值進行設置,如果沒有設置szieCtl的值,那么默認生成的table大小為16,否則,會根據sizeCtl的大小設置table大小。

// 容器初始化 操作
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table)  null || tab.length  0) {
        if ((sc = sizeCtl) < 0) // 如果正在初始化-1,-N 正在擴容。
            Thread.yield(); // 進行線程讓步等待
     // 讓掉當前線程 CPU 的時間片,使正在運行中的線程重新變成就緒狀態,并重新競爭 CPU 的調度權。
     // 它可能會獲取到,也有可能被其他線程獲取到。
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 
          //  比較sizeCtl的值與sc是否相等,相等則用 -1 替換,這表明我這個線程在進行初始化了!
            try {
                if ((tab = table)  null || tab.length  0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默認為16
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2); // sc = 0.75n
                }
            } finally {
                sizeCtl = sc; //設置sizeCtl 類似threshold
            }
            break;
        }
    }
    return tab;
}
   

unsafe

ConcurrentHashMap中使用了unSafe方法,通過直接操作內存的方式來保證并發處理的安全性,使用的是硬件的安全機制。

 // 用來返回節點數組的指定位置的節點的原子操作
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

// cas原子操作,在指定位置設定值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

// 原子操作,在指定位置設定值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

// 比較table數組下標為i的結點是否為c,若為c,則用v交換操作。否則,不進行交換操作。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
 

可以看到獲得table[i]數據是通過Unsafe對象通過反射獲取的,取數據直接table[index]不可以么,為什么要這么復雜?在java內存模型中,我們已經知道每個線程都有一個工作內存,里面存儲著table的「副本」,雖然table是volatile修飾的,但不能保證線程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接獲取指定內存的數據,「保證了每次拿到數據都是最新的」

 

helpTransfer

// 可能有多個線程在同時幫忙運行helpTransfer
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // table不是空  且 node節點是轉移類型,并且轉移類型的nextTable 不是空 說明還在擴容ing
        int rs = resizeStamp(tab.length); 
        // 根據 length 得到一個前16位的標識符,數組容量大小。
        // 確定新table指向沒有變,老table數據也沒變,并且此時 sizeCtl小于0 還在擴容ing
        while (nextTab  nextTable && table  tab && (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc  rs + 1 || sc  rs + MAX_RESIZERS || transferIndex <= 0)
            // 1. sizeCtl 無符號右移16位獲得高16位如果不等 rs 標識符變了
            // 2. 如果擴容結束了 這里可以看 trePresize 函數第一次擴容操作:
            // 默認第一個線程設置 sc = rs 左移 16 位 + 2,當第一個線程結束擴容了,
            // 就會將 sc 減一。這個時候,sc 就等于 rs + 1。
            // 3. 如果達到了最大幫助線程個數 65535個
            // 4. 如果轉移下標調整ing 擴容已經結束了
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
            // 如果以上都不是, 將 sizeCtl + 1,增加一個線程來擴容
                transfer(tab, nextTab); // 進行轉移
                break;// 結束循環
            }
        }
        return nextTab;
    }
    return table;
}
 
  • Integer.numberOfLeadingZeros(n)
?  

該方法的作用是「返回無符號整型i的最高非零位前面的0的個數」,包括符號位在內;如果i為負數,這個方法將會返回0,符號位為1. 比如說,10的二進制表示為 0000 0000 0000 0000 0000 0000 0000 1010 java的整型長度為32位。那么這個方法返回的就是28

?  
  • resizeStamp 主要用來獲得標識符,可以簡單理解是對當前系統容量大小的一種監控。
static final int resizeStamp(int n) {
   return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); 
   //RESIZE_STAMP_BITS = 16
}
   

addCount

主要就2件事:一是更新 baseCount,二是判斷是否需要擴容。

private final void addCount(long x, int check) {
 CounterCell[] as; long b, s;
 // 首先如果沒有并發 此時countCells is null, 此時嘗試CAS設置數據值。
 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
     // 如果 counterCells不為空以為此時有并發的設置 或者 CAS設置 baseCount 失敗了
     CounterCell a; long v; int m;
     boolean uncontended = true;
     if (as  null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m])  null ||
         !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
         // 1. 如果沒出現并發 此時計數盒子為 null
         // 2. 隨機取出一個數組位置發現為空
         // 3. 出現并發后修改這個cellvalue 失敗了
         // 執行funAddCount
         fullAddCount(x, uncontended);// 死循環操作
         return;
     }
     if (check <= 1)
         return;
     s = sumCount(); // 吧counterCells數組中的每一個數據進行累加給baseCount。
 }
 // 如果需要擴容
 if (check >= 0) {
  Node<K,V>[] tab, nt; int n, sc;
  while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
   int rs = resizeStamp(n);// 獲得高位標識符
   if (sc < 0) { // 是否需要幫忙去擴容
    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc  rs + 1 ||
     sc  rs + MAX_RESIZERS || (nt = nextTable)  null || transferIndex <= 0)
     break;
    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
     transfer(tab, nt);
   } // 第一次擴容
   else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
    transfer(tab, null);
   s = sumCount();
  }
 }
}
 
  1. baseCount添加     ConcurrentHashMap提供了     baseCount、     counterCells 兩個輔助變量和一個     CounterCell輔助內部類。sumCount() 就是迭代     counterCells來統計 sum 的過程。put 操作時,肯定會影響     size(),在     put() 方法最后會調用     addCount()方法。整體的思維方法跟LongAdder類似,用的思維就是借鑒的     ConcurrentHashMap。每一個     Cell都用Contended修飾來避免偽共享。
?  
  1. JDK1.7 和 JDK1.8 對 size 的計算是不一樣的。1.7 中是先不加鎖計算三次,如果三次結果不一樣在加鎖。
  2. JDK1.8 size 是通過對 baseCount 和 counterCell 進行 CAS 計算,最終通過 baseCount 和 遍歷 CounterCell 數組得出 size。
  3. JDK 8 推薦使用mappingCount 方法,因為這個方法的返回值是 long 類型,不會因為 size 方法是 int 類型限制最大值。
?  
  1. 關于擴容     ConcurrentHashMap有什么用在     addCount第一次擴容時候會有騷操作     sc=rs << RESIZE_STAMP_SHIFT) + 2)其中     rs = resizeStamp(n)。這里需要核心說一點,

ConcurrentHashMap有什么用如果不是第一次擴容則直接將低16位的數字 +1 即可。

 

putTreeVal

這個操作幾乎跟HashMap的操作完全一樣,核心思想就是一定要決定向左還是向右然后最終嘗試放置新數據,然后balance。不同點就是有鎖的考慮。

 

treeifyBin

這里的基本思路跟HashMap幾乎一樣,不同點就是先變成TreeNode,然后是「單向鏈表」串聯。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        //如果整個table的數量小于64,就擴容至原來的一倍,不轉紅黑樹了
        //因為這個閾值擴容可以減少hash沖突,不必要去轉紅黑樹
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) { //鎖定當前桶
                if (tabAt(tab, index)  b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        //遍歷這個鏈表然后將每個節點封裝成TreeNode,最終單鏈表串聯起來,
                        // 最終 調用setTabAt 放置紅黑樹
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl)  null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    //通過TreeBin對象對TreeNode轉換成紅黑樹
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
   

TreeBin

主要功能就是鏈表變化為紅黑樹,這個紅黑樹用TreeBin來包裝。并且要注意 轉成紅黑樹以后以前鏈表的結構信息還是有的,最終信息如下:

  1. TreeBin.first = 鏈表中第一個節點。
  2. TreeBin.root = 紅黑樹中的root節點。
TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);   
            //創建空節點 hash = -2 
            this.first = b;
            TreeNode<K,V> r = null; // root 節點
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (r  null) {
                    x.parent = null;
                    x.red = false;
                    r = x; // root 節點設置為x 
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = r;;) {
                   // x代表的是轉換為樹之前的順序遍歷到鏈表的位置的節點,r代表的是根節點
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        else if ((kc  null &&
                                  (kc = comparableClassFor(k))  null) ||
                                 (dir = compareComparables(kc, k, pk))  0)
                            dir = tieBreakOrder(k, pk);    
                            // 當key不可以比較,或者相等的時候采取的一種排序措施
                            TreeNode<K,V> xp = p;
                        // 放一定是放在葉子節點上,如果還沒找到葉子節點則進行循環往下找。
                        // 找到了目前葉子節點才會進入 再放置數據
                        if ((p = (dir <= 0) ? p.left : p.right)  null) {
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            r = balanceInsertion(r, x); 
                     // 每次插入一個元素的時候都調用 balanceInsertion 來保持紅黑樹的平衡
                            break;
                        }
                    }
                }
            }
            this.root = r;
            assert checkInvariants(root);
        }
   

tryPresize

當數組長度小于64的時候,擴張數組長度一倍,調用此函數。擴容后容量大小的核對,可能涉及到初始化容器大小。并且擴容的時候又跟2的次冪聯系上了!,其中初始化時候傳入map會調用putAll方法直接put一個map的話,在「putAll」方法中沒有調用initTable方法去初始化table,而是直接調用了tryPresize方法,所以這里需要做一個是不是需要初始化table的判斷。

PS:默認第一個線程設置 sc = rs 左移 16 位 + 2,當第一個線程結束擴容了,就會將 sc 減一。這個時候,sc 就等于 rs + 1,這個時候說明擴容完畢了。

     /**
     * 擴容表為指可以容納指定個數的大小(總是2的N次方)
     * 假設原來的數組長度為16,則在調用tryPresize的時候,size參數的值為16<<1(32),此時sizeCtl的值為12
     * 計算出來c的值為64, 則要擴容到 sizeCtl ≥ c
     *  第一次擴容之后 數組長:32 sizeCtl:24
     *  第三次擴容之后 數組長:128  sizeCtl:96 退出
     */
    private final void tryPresize(int size) {
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1); // 合理范圍
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
                if (tab  null || (n = tab.length)  0) {
                // 初始化傳入map,今天putAll會直接調用這個。
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    
                //初始化tab的時候,把 sizeCtl 設為 -1
                    try {
                        if (table  tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2); // sc=sizeCtl = 0.75n
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
             // 初始化時候如果  數組容量<=sizeCtl 或 容量已經最大化了則退出
            else if (c <= sc || n >= MAXIMUM_CAPACITY) {
                    break;//退出擴張
            }
            else if (tab  table) {
                int rs = resizeStamp(n);

                if (sc < 0) { // sc = siztCtl 如果正在擴容Table的話,則幫助擴容
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc  rs + 1 ||
                        sc  rs + MAX_RESIZERS || (nt = nextTable)  null ||
                        transferIndex <= 0)
                        break; // 各種條件判斷是否需要加入擴容工作。
                     // 幫助轉移數據的線程數 + 1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                 // 沒有在初始化或擴容,則開始擴容
                 // 此處切記第一次擴容 直接 +2 
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                      (rs << RESIZE_STAMP_SHIFT) + 2)) {
                        transfer(tab, null);
                }
            }
        }
    }
   

transfer

這里代碼量比較大主要分文三部分,并且感覺思路很精髓,尤其「是其他線程幫著去擴容的騷操作」

  1. 主要是 單個線程能處理的最少桶結點個數的計算和一些屬性的初始化操作。
  2. 每個線程進來會先領取自己的任務區間     [bound,i],然后開始 --i 來遍歷自己的任務區間,對每個桶進行處理。如果遇到桶的頭結點是空的,那么使用     ForwardingNode標識舊table中該桶已經被處理完成了。如果遇到已經處理完成的桶,直接跳過進行下一個桶的處理。如果是正常的桶,對桶首節點加鎖,正常的遷移即可(跟HashMap第三部分一樣思路),遷移結束后依然會將原表的該位置標識位已經處理。

該函數中的finish= true 則說明整張表的遷移操作已經「全部」完成了,我們只需要重置 table的引用并將 nextTable 賦為空即可。否則,CAS 式的將 sizeCtl減一,表示當前線程已經完成了任務,退出擴容操作。如果退出成功,那么需要進一步判斷當前線程是否就是最后一個在執行擴容的。

f ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
   return;
 

第一次擴容時在addCount中有寫到(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示當前只有一個線程正在工作,「相對應的」,如果 (sc - 2) resizeStamp(n) << RESIZE_STAMP_SHIFT,說明當前線程就是最后一個還在擴容的線程,那么會將 finishing 標識為 true,并在下一次循環中退出擴容方法。

  1. 幾乎跟     HashMap大致思路類似的遍歷鏈表/紅黑樹然后擴容操作。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)    //MIN_TRANSFER_STRIDE 用來控制不要占用太多CPU
        stride = MIN_TRANSFER_STRIDE; // subdivide range    //MIN_TRANSFER_STRIDE=16 每個CPU處理最小長度個數

    if (nextTab  null) { // 新表格為空則直接新建二倍,別的輔助線程來幫忙擴容則不會進入此if條件
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n; // transferIndex 指向最后一個桶,方便從后向前遍歷
    }
    int nextn = nextTab.length; // 新表長度
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 創建一個fwd節點,這個是用來控制并發的,當一個節點為空或已經被轉移之后,就設置為fwd節點
    boolean advance = true;    //是否繼續向前查找的標志位
    boolean finishing = false; // to ensure sweep(清掃) before committing nextTab,在完成之前重新在掃描一遍數組,看看有沒完成的沒
     // 第一部分
    // i 指向當前桶, bound 指向當前線程需要處理的桶結點的區間下限【bound,i】 這樣來跟線程劃分任務。
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
       // 這個 while 循環的目的就是通過 --i 遍歷當前線程所分配到的桶結點
       // 一個桶一個桶的處理
        while (advance) {//  每一次成功處理操作都會將advance設置為true,然里來處理區間的上一個數據
            int nextIndex, nextBound;
            if (--i >= bound || finishing) { //通過此處進行任務區間的遍歷
                advance = false;
            }
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;// 任務分配完了
                advance = false;
            }
            // 更新 transferIndex
           // 為當前線程分配任務,處理的桶結點區間為(nextBound,nextIndex)
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
               // nextIndex本來等于末尾數字,
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 當前線程所有任務完成 
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {  // 已經完成轉移 則直接賦值操作
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);    //設置sizeCtl為擴容后的0.75
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // sizeCtl-1 表示當前線程任務完成。
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) { 
                // 判斷當前線程完成的線程是不是最后一個在擴容的,思路精髓
                        return;
                }
                finishing = advance = true;// 如果是則相應的設置參數
                i = n; 
            }
        }
        else if ((f = tabAt(tab, i))  null) // 數組中把null的元素設置為ForwardingNode節點(hash值為MOVED[-1])
            advance = casTabAt(tab, i, null, fwd); // 如果老節點數據是空的則直接進行CAS設置為fwd
        else if ((fh = f.hash)  MOVED) //已經是個fwd了,因為是多線程操作 可能別人已經給你弄好了,
            advance = true; // already processed
        else {
            synchronized (f) { //加鎖操作
                if (tabAt(tab, i)  f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) { //該節點的hash值大于等于0,說明是一個Node節點
                    // 關于鏈表的操作整體跟HashMap類似不過 感覺好像更擾一些。
                        int runBit = fh & n; // fh= f.hash first hash的意思,看第一個點 放老位置還是新位置
                        Node<K,V> lastRun = f;

                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;    //n的值為擴張前的數組的長度
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;//最后導致發生變化的節點
                            }
                        }
                        if (runBit  0) { //看最后一個變化點是新還是舊 舊
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun; //看最后一個變化點是新還是舊 舊
                            ln = null;
                        }
                        /*
                         * 構造兩個鏈表,順序大部分和原來是反的,不過順序也有差異
                         * 分別放到原來的位置和新增加的長度的相同位置(i/n+i)
                         */
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n)  0)
                                    /*
                                     * 假設runBit的值為0,
                                     * 則第一次進入這個設置的時候相當于把舊的序列的最后一次發生hash變化的節點(該節點后面可能還有hash計算后同為0的節點)設置到舊的table的第一個hash計算后為0的節點下一個節點
                                     * 并且把自己返回,然后在下次進來的時候把它自己設置為后面節點的下一個節點
                                     */
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                    /*
                                     * 假設runBit的值不為0,
                                     * 則第一次進入這個設置的時候相當于把舊的序列的最后一次發生hash變化的節點(該節點后面可能還有hash計算后同不為0的節點)設置到舊的table的第一個hash計算后不為0的節點下一個節點
                                     * 并且把自己返回,然后在下次進來的時候把它自己設置為后面節點的下一個節點
                                     */
                                hn = new Node<K,V>(ph, pk, pv, hn);    
                        }
                        setTabAt(nextTab, i, ln);    
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) { // 該節點hash值是個負數否則的話是一個樹節點
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null; // 舊 頭尾
                        TreeNode<K,V> hi = null, hiTail = null; //新頭圍
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n)  0) {
                                if ((p.prev = loTail)  null)
                                    lo = p;
                                else
                                    loTail.next = p; //舊頭尾設置
                                loTail = p;
                                ++lc;
                            }
                            else { // 新頭圍設置
                                if ((p.prev = hiTail)  null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                         //ln  如果老位置數字<=6 則要對老位置鏈表進行紅黑樹降級到鏈表,否則就看是否還需要對老位置數據進行新建紅黑樹
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd); //老表中i位置節點設置下
                        advance = true;
                    }
                }
            }
        }
    }
}
   

get

這個就很簡單了,獲得hash值,然后判斷存在與否,遍歷鏈表即可,注意get沒有任何鎖操作!

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // 計算key的hash值
        int h = spread(key.hashCode()); 
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) { // 表不為空并且表的長度大于0并且key所在的桶不為空
            if ((eh = e.hash)  h) { // 表中的元素的hash值與key的hash值相等
                if ((ek = e.key)  key || (ek != null && key.equals(ek))) // 鍵相等
                    // 返回值
                    return e.val;
            }
            else if (eh < 0) // 是個TreeBin hash = -2 
                // 在紅黑樹中查找,因為紅黑樹中也保存這一個鏈表順序
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) { // 對于結點hash值大于0的情況鏈表
                if (e.hash  h &&
                    ((ek = e.key)  key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
   

clear

關于清空也相對簡單 ,無非就是遍歷桶數組,然后通過CAS來置空。

public void clear() {
    long delta = 0L;
    int i = 0;
    Node<K,V>[] tab = table;
    while (tab != null && i < tab.length) {
        int fh;
        Node<K,V> f = tabAt(tab, i);
        if (f  null)
            ++i; //這個桶是空的直接跳過
        else if ((fh = f.hash)  MOVED) { // 這個桶的數據還在擴容中,要去擴容同時等待。
            tab = helpTransfer(tab, f);
            i = 0; // restart
        }
        else {
            synchronized (f) { // 真正的刪除
                if (tabAt(tab, i)  f) {
                    Node<K,V> p = (fh >= 0 ? f :(f instanceof TreeBin) ?((TreeBin<K,V>)f).first : null);
                        //循環到鏈表/者紅黑樹的尾部
                        while (p != null) {
                            --delta; // 記錄刪除了多少個
                            p = p.next;
                        } 
                        //利用CAS無鎖置null  
                        setTabAt(tab, i++, null);
                    }
                }
            }
        }
        if (delta != 0L)
            addCount(delta, -1); //調整count
    }
   

end

ConcurrentHashMap是如果來做到「并發安全」,又是如何做到「高效」的并發的呢?

  1. 首先是讀操作,讀源碼發現get方法中根本沒有使用同步機制,也沒有使用unsafe方法,所以讀操作是支持并發操作的。

  2. 寫操作

  • . 數據擴容函數是     transfer,該方法的只有     addCount,     helpTransfer和     tryPresize這三個方法來調用。
  1. addCount是在當對數組進行操作,使得數組中存儲的元素個數發生了變化的時候會調用的方法。
  2. helpTransfer是在當一個線程要對table中元素進行操作的時候,如果檢測到節點的·hash·= MOVED 的時候,就會調用       helpTransfer方法,在       helpTransfer中再調用       transfer方法來幫助完成數組的擴容
?    
  1. tryPresize是在 treeIfybinputAll方法中調用, treeIfybin主要是在 put添加元素完之后,判斷該數組節點相關元素是不是已經超過8個的時候,如果超過則會調用這個方法來擴容數組或者把鏈表轉為樹。注意 putAll在初始化傳入一個大map的時候會調用。·
?

總結擴容情況發生:

?  
  1. 在往map中添加元素的時候,在某一個節點的數目已經超過了8個,同時數組的長度又小于64的時候,才會觸發數組的擴容。
  2. 當數組中元素達到了sizeCtl的數量的時候,則會調用transfer方法來進行擴容
?  

3. 擴容時候是否可以進行讀寫。

?  

對于讀操作,因為是沒有加鎖的所以可以的. 對于寫操作,JDK8中已經將鎖的范圍細膩到table[i]l了,當在進行數組擴容的時候,如果當前節點還沒有被處理(也就是說還沒有設置為fwd節點),那就可以進行設置操作。如果該節點已經被處理了,則當前線程也會加入到擴容的操作中去。

?  
  1. 多個線程又是如何同步處理的 在     ConcurrentHashMap中,同步處理主要是通過     Synchronized和     unsafe的硬件級別原子性 這兩種方式來完成的。
?  
  1. 在取得sizeCtl跟某個位置的Node的時候,使用的都是      unsafe的方法,來達到并發安全的目的
  2. 當需要在某個位置設置節點的時候,則會通過      Synchronized的同步機制來鎖定該位置的節點。
  3. 在數組擴容的時候,則通過處理的      步長和      fwd節點來達到并發安全的目的,通過設置hash值為MOVED=-1。
  4. 當把某個位置的節點復制到擴張后的table的時候,也通過      Synchronized的同步機制來保證線程安全
?    

到此,相信大家對“ConcurrentHashMap有什么用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

榆社县| 东方市| 洞头县| 永吉县| 潞西市| 南投市| 资源县| 扶风县| 边坝县| 察隅县| 富民县| 连州市| 青阳县| 遵化市| 彭山县| 本溪| 大冶市| 健康| 阿荣旗| 赣州市| 进贤县| 仁化县| 中山市| 西峡县| 金堂县| 光山县| 舞钢市| 垦利县| 奉新县| 沭阳县| 曲阳县| 平罗县| 广东省| 遂昌县| 津南区| 绥江县| 宁阳县| 手机| 宁南县| 吕梁市| 德令哈市|