中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

JAVA并發容器ConcurrentHashMap 1.7和1.8 源碼怎么寫

發布時間:2021-10-19 15:57:05 來源:億速云 閱讀:170 作者:柒染 欄目:大數據

這期內容當中小編將會給大家帶來有關JAVA并發容器ConcurrentHashMap 1.7和1.8 源碼怎么寫,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

HashMap是一個線程不安全的類,在并發情況下會產生很多問題,詳情可以參考HashMap 源碼解析;HashTable是線程安全的類,但是它使用的是synchronized來保證線程安全,線程競爭激烈的情況下效率非常低下。在jdk1.5的時候引入了ConcurrentHashMap,這也是一個線程安全的類,它使用了分段鎖的技術來提升并發訪問效率。

HashTable容器在競爭激烈的并發環境下表現出效率低下的原因是所有訪問HashTable的 線程都必須競爭同一把鎖,假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分數據,那么 當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效提高并 發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術。

在jdk1.7及以前ConcurrentHashMap采用Segment數組結構和HashEntry數組結構組成,之后采用的是和HashMap一樣的結構。

ConcurrentHashMap jdk1.7

結構圖

JAVA并發容器ConcurrentHashMap 1.7和1.8 源碼怎么寫

采用Segment數組結構和HashEntry數組結構組成,Segment數組的大小就是ConcurrentHashMap的并發度。Segment繼承自ReentrantLock,所以他本身就是一個鎖。Segment數組一旦初始化后就不會再進行擴容,這也是jdk1.8去掉他的原因。Segment里面又包含了一個table數組,這個數組是可以擴容的。

如圖我們在定位數據的時候需要對key的hash值進行兩次尋址操作,第一次找到在Segment數組的位置,第二次找到在table數組中的位置。

Segment 類

// 直接繼承自ReentrantLock,所以一個Segment本身就是一個鎖
static final class Segment<K,V> extends ReentrantLock implements Serializable { 
    ...
   // table數組  
   transient volatile HashEntry<K,V>[] table;

    // 一個Segment內的元素個數
    transient int count;

    // 擴容閾值
    transient int threshold;

    // 擴容因子
    final float loadFactor;

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
...

我們發現Segment直接繼承自ReentrantLock,所以一個Segment本身就是一個鎖。所以Segment數組的長度大小直接影響了ConcurrentHashMap的并發度。還有每個Segment單獨維護了擴容閾值,擴容因子,所以每個Segment的擴容操作時完全獨立互不干擾的。

HashEntry 類

static final class HashEntry<K,V> {
    // 不可變
    final int hash;
    final K key;
    // volatile保證可見性,這樣我們在get操作時就不用加鎖了
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
...
}

構造函數

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    //  參數校驗
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 并發度控制,最大是65536
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    // 等于ssize從1向左移位的 次數
    int sshift = 0;
    int ssize = 1;
    // 找出最接近concurrencyLevel的2的n次冪的數值
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 這里之所 以用32是因為ConcurrentHashMap里的hash()方法輸出的最大數是32位的
    this.segmentShift = 32 - sshift;
    // 散列運算的掩碼,等于ssize減1
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 里HashEntry數組的長度度,它等于initialCapacity除以ssize的倍數c,如果c大于1,就會取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 保證HashEntry數組大小一定是2的n次冪
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    // 初始化Segment數組,并實際只填充Segment數組的第0個元素。
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

通過代碼我們可以看出,在構造ConcurrentHashMap的時候我們就會完成以下件事情:

  1. 確認ConcurrentHashMap的并發度,也就是Segment數組長度,并保證它是2的n次冪

  2. 確認HashEntry數組的初始化長度,并保證它是2的n次冪

  3. 將Segment數組初始化好并且只填充第0個元素

put() 方法

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 先獲取key的hash值
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 2. 定位到Segment
        s = ensureSegment(j);
    // 3.調用Segment的put方法
    return s.put(key, hash, value, false);
}

主要流程是:

  1. 先獲取key的hash值

  2. 定位到Segment

  3. 調用Segment的put方法

hash() 方法

    private int hash(Object k) {
        int h = hashSeed;

        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash42((String) k);
        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

這個方法大致思路是:先拿到key的hashCode,然后對這個值進行再散列。

Segment.put() 方法

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 1. 加鎖
    HashEntry<K,V> node = tryLock() ? null :
            // scanAndLockForPut在沒有獲取到鎖的情況下,去查詢key是否存在,如果不存在就新建一個Node
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        // 確定元素在tabl數組上的位置
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                // 如果原來位置上有值并且key相同,那么直接替換原來的value
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                // 元素總數加一
                int c = count + 1;
                // 判斷是否需要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

大致過程是:

  1. 加鎖

  2. 定位key在tabl數組上的索引位置index,獲取到頭結點

  3. 判斷是否有hash沖突

  4. 如果沒有沖突直接將新節點node添加到數組index索引位

  5. 如果有沖突,先判斷是否有相同key

  6. 有相同key直接替換對應node的value值

  7. 沒有添加新元素到鏈表尾部

  8. 解鎖

這里需要注意的是scanAndLockForPut方法,他在沒有獲取到鎖的時候不僅會通過自旋獲取鎖,還會做一些其他的查找或新增節點的工,以此來提升put性能。

Segment.scanAndLockForPut() 方法

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    //定位HashEntry數組位置,獲取第一個節點
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    //掃描次數,循環標記位
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        // 表示遍歷鏈表還沒有結束
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    //  完成新節點初始化
                    node = new HashEntry<K,V>(hash, key, value, null);
                // 完成鏈表的遍歷,還是沒有找到相同key的節點
                retries = 0;
            }
            // 有hash沖突,開始查找是否有相同的key
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        // 斷循環次數是否大于最大掃描次數
        else if (++retries > MAX_SCAN_RETRIES) {
            // 自旋獲取鎖
            lock();
            break;
        }
        // 每間隔一次循環,檢查一次first節點是否改變
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            // 首節點有變動,更新first,重新掃描
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

scanAndLockForPut方法在當前線程獲取不到segment鎖的情況下,完成查找或新建節點的工作。當獲取到鎖后直接將該節點加入鏈表即可,提升了put操作的性能。大致過程:

  1. 定位key在HashEntry數組的索引位,并獲取第一個節點

  2. 嘗試獲取鎖,如果成功直接返回,否則進入自旋

  3. 判斷是否有hash沖突,沒有就直接完成新節點的初始化

  4. 有hash沖突,開始遍歷鏈表查找是否有相同key

  5. 如果沒找到相同key,那么就完成新節點的初始化

  6. 如果找到相同key,判斷循環次數是否大于最大掃描次數

  7. 如果循環次數是否大于最大掃描次數,就直接CAS拿鎖(阻塞式)

  8. 如果循環次數不大于最大掃描次數,判斷頭結點是否有變化

  9. 進入下次循環

Segment.rehash() 擴容方法

private void rehash(HashEntry<K,V> node) {
    // 復制老數組
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // table數組擴容2倍
    int newCapacity = oldCapacity << 1;
    // 擴容閾值也增加兩倍
    threshold = (int)(newCapacity * loadFactor);
    // 創建新數組
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 計算新的掩碼
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計算新的索引位
            int idx = e.hash & sizeMask;
            // 轉移數據
            if (next == null)   //  Single node on list
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新的節點加到對應索引位
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

在這里我們可以發現每次擴容是針對一個單獨的Segment的,在擴容完成之前中不會對擴容前的數組進行修改,這樣就可以保證get()不被擴容影響。大致過程是:

  1. 新建擴容后的數組,容量是原來的兩倍

  2. 遍歷擴容前的數組

  3. 通過e.hash & sizeMask;計算key新的索引位

  4. 轉移數據

  5. 將擴容后的數組指向成員變量table

get() 方法

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    // 計算出Segment的索引位
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 以原子的方式獲取Segment
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 原子方式獲取HashEntry
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // key相同
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                // value是volatile所以可以不加鎖直接取值返回
                return e.value;
        }
    }
    return null;
}

我們可以看到get方法是沒有加鎖的,因為HashEntry的value和next屬性是volatile的,volatile直接保證了可見性,所以讀的時候可以不加鎖。Java中Unsafe類可以參考這篇博客。

size() 方法

public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    // true表示size溢出32位(大于Integer.MAX_VALUE)
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            // retries 如果retries等于2則對所有Segment加鎖
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            // 統計每個Segment元素個數
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        // 解鎖
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    // 如果size大于Integer.MAX_VALUE值則直接返貨Integer.MAX_VALUE
    return overflow ? Integer.MAX_VALUE : size;
}

size的核心思想是先進性兩次不加鎖統計,如果兩次的值一樣則直接返回,否則第三個統計的時候會將所有segment全部鎖定,再進行size統計,所以size()盡量少用。因為這是在并發情況下,size其他線程也會改變size大小,所以size()的返回值只能表示當前線程、當時的一個狀態,可以算其實是一個預估值。

isEmpty() 方法

public boolean isEmpty() {
    long sum = 0L;
    final Segment<K,V>[] segments = this.segments;
    for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
            // 只要有一個Segment的元素個數不為0則表示不為null
            if (seg.count != 0)
                return false;
            // 統計操作總數
            sum += seg.modCount;
        }
    }
    if (sum != 0L) { // recheck unless no modifications
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                if (seg.count != 0)
                    return false;
                sum -= seg.modCount;
            }
        }
        // 說明在統計過程中ConcurrentHashMap又被操作過,
        // 因為上面判斷了ConcurrentHashMap不可能會有元素,所以這里如果有操作一定是新增節點
        if (sum != 0L)
            return false;
    }
    return true;
}
  1. 先判斷Segment里面是否有元素,如果有直接返回,如果沒有則統計操作總數;

  2. 為了保證在統計過程中ConcurrentHashMap里面的元素沒有發生變化,再對所有的Segment的操作數做了統計;

  3. 最后 sum==0 表示ConcurrentHashMap里面確實沒有元素返回true,否則一定進行過新增元素返回false。

和size方法一樣這個方法也是一個若一致方法,最后的結果也是一個預估值。


ConcurrentHashMap jdk1.8

結構圖

JAVA并發容器ConcurrentHashMap 1.7和1.8 源碼怎么寫

這個結構和HashMap一樣

核心屬性

//最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
//初始容量
private static final int DEFAULT_CAPACITY = 16;
//數組最大容量
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//默認并發度,兼容1.7及之前版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//加載/擴容因子,實際使用n - (n >>> 2)
private static final float LOAD_FACTOR = 0.75f;
//鏈表轉紅黑樹的節點數閥值
static final int TREEIFY_THRESHOLD = 8;
//紅黑樹轉鏈表的節點數閥值
static final int UNTREEIFY_THRESHOLD = 6;
//當數組長度還未超過64,優先數組的擴容,否則將鏈表轉為紅黑樹
static final int MIN_TREEIFY_CAPACITY = 64;
//擴容時任務的最小轉移節點數
private static final int MIN_TRANSFER_STRIDE = 16;
//sizeCtl中記錄stamp的位數
private static int RESIZE_STAMP_BITS = 16;
//幫助擴容的最大線程數
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//size在sizeCtl中的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

// ForwardingNode標記節點的hash值(表示正在擴容)
static final int MOVED     = -1; // hash for forwarding nodes
// TreeBin節點的hash值,它是對應桶的根節點
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

//存放Node元素的數組,在第一次插入數據時初始化
transient volatile Node<K,V>[] table;
//一個過渡的table表,只有在擴容的時候才會使用
private transient volatile Node<K,V>[] nextTable;
//基礎計數器值(size = baseCount + CounterCell[i].value)
private transient volatile long baseCount;
/**
 * 控制table數組的初始化和擴容,不同的值有不同的含義:
 * -1:表示正在初始化
 * -n:表示正在擴容
 * 0:表示還未初始化,默認值
 * 大于0:表示下一次擴容的閾值
 */
private transient volatile int sizeCtl;
//節點轉移時下一個需要轉移的table索引
private transient volatile int transferIndex;
//元素變化時用于控制自旋
private transient volatile int cellsBusy;
// 保存table中的每個節點的元素個數 長度是2的冪次方,初始化是2,每次擴容為原來的2倍
// size = baseCount + CounterCell[i].value
private transient volatile CounterCell[] counterCells;

其他的參考HashMap 源碼解析

Node 類

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
...

鏈表節點,保存著key和value的值。

TreeNode類

static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;

    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }
...

紅黑樹節點,包含了樹的信息。

TreeNode類

TreeBins中使用的節點

static final class TreeBin<K,V> extends Node<K,V> {
	TreeNode<K,V> root;
	volatile TreeNode<K,V> first;
	// 鎖的持有者
	volatile Thread waiter;
	// 鎖狀態
	volatile int lockState;
	// values for lockState
	// 表示持有寫鎖
	static final int WRITER = 1; // set while holding write lock
	// 表示等待
	static final int WAITER = 2; // set when waiting for write lock
	// 表示讀鎖的增量值
	static final int READER = 4; // increment value for setting read lock
...

與HashMap有點區別的是,他不直接使用TreeNode作為數的根節點,而是使用TreeBins對其做了裝飾后成為了根節點;同時它還記錄了鎖的狀態;需要注意的是:

  1. TreeBins節點的hash值是 -2

  2. 我們對紅黑樹添加節點后,紅黑樹的根節點有可能會因為旋轉而發生變化,所以我們在添加樹節點的時候在putTreeVal()方法里面我們使用cas在加了一次鎖。

ForwardingNode 類

/**
 * A node inserted at head of bins during transfer operations.
 */
static final class ForwardingNode<K,V> extends Node<K,V> {
	final Node<K,V>[] nextTable;
	ForwardingNode(Node<K,V>[] tab) {
		super(MOVED, null, null, null);
		this.nextTable = tab;
	}

	Node<K,V> find(int h, Object k) {
		// loop to avoid arbitrarily deep recursion on forwarding nodes
		outer: for (Node<K,V>[] tab = nextTable;;) {
			Node<K,V> e; int n;
			// 1. 判斷新的數組是否是null,
			// 2. 如果不為NULL給那就找到對應索引位上的頭結點
			// 3. 判斷頭節點是否為NULL
			if (k == null || tab == null || (n = tab.length) == 0 ||
				(e = tabAt(tab, (n - 1) & h)) == null)
				return null;
			// 自旋找節點
			for (;;) {
				int eh; K ek;
				if ((eh = e.hash) == h &&
					((ek = e.key) == k || (ek != null && k.equals(ek))))
					return e;
				if (eh < 0) {
					// 如果又變成了ForwardingNode標記節點,那說明有發生了擴容,需要跳出循環從新查找
					if (e instanceof ForwardingNode) {
						tab = ((ForwardingNode<K,V>)e).nextTable;
						continue outer;
					}
					else
						return e.find(h, k);
				}
				if ((e = e.next) == null)
					return null;
			}
		}
	}
}

ForwardingNode 節點是一個擴容標記節點,只要在數組上發現對應索引位上是ForwardingNode 節點時,表示正在擴容。當get方法調用時,如果遇到ForwardingNode 節點,那么它將會到擴容后的數據上查找數據,否則還是在擴容前的數組上查找數據。這個要注意兩點:

  1. 這個節點的hash值是 -1

  2. 這個節點的find方法是在對擴容后的數組進行查找

構造函數

public ConcurrentHashMap18() {
}

與HashMap一樣,構造函數啥都沒干,初始化操作是在第一次put完成的。

put() 方法

    public V put(K key, V value) {
        return putVal(key, value, false);
    }

啥都么有

spread() 方法

    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

計算key.hashCode()并將更高位的散列擴展(XOR)降低。采用位運算主要是是加快計算速度。

putVal() 方法

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
	if (key == null || value == null) throw new NullPointerException();
	// 計算hash值
	int hash = spread(key.hashCode());
	int binCount = 0;
	for (Node<K,V>[] tab = table;;) {
		Node<K,V> f; int n, i, fh;
		// 判斷是否需要初始化
		if (tab == null || (n = tab.length) == 0)
			tab = initTable();
		// 找出key對應的索引位上的第一個節點
		else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
			// 如果該索引位為null,則直接將數據放到該索引位
			if (casTabAt(tab, i, null,
						 new Node<K,V>(hash, key, value, null)))
				break;                   // no lock when adding to empty bin
		}
		// 正在擴容
		else if ((fh = f.hash) == MOVED)
			tab = helpTransfer(tab, f);
		else {
			V oldVal = null;
			// 加內置鎖鎖定一個數組的索引位,并添加節點
			synchronized (f) {
				if (tabAt(tab, i) == f) {
					// 表示鏈表節點
					if (fh >= 0) {
						binCount = 1;
						for (Node<K,V> e = f;; ++binCount) {
							K ek;
							// key相同直接替換value值
							if (e.hash == hash &&
								((ek = e.key) == key ||
								 (ek != null && key.equals(ek)))) {
								oldVal = e.val;
								if (!onlyIfAbsent)
									e.val = value;
								break;
							}
							// 將新節點添加到鏈表尾部
							Node<K,V> pred = e;
							if ((e = e.next) == null) {
								pred.next = new Node<K,V>(hash, key,
														  value, null);
								break;
							}
						}
					}
					// 表示樹節點
					else if (f instanceof TreeBin) {
						Node<K,V> p;
						binCount = 2;
						// 添加數節點
						if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
													   value)) != null) {
							oldVal = p.val;
							if (!onlyIfAbsent)
								p.val = value;
						}
					}
				}
			}
			if (binCount != 0) {
				// 嘗試將鏈表轉換成紅黑樹
				if (binCount >= TREEIFY_THRESHOLD)
					treeifyBin(tab, i);
				if (oldVal != null)
					return oldVal;
				break;
			}
		}
	}
	addCount(1L, binCount);
	return null;
}

主要流程:

  1. 計算key的hash值

  2. 判斷是否需要初始化,如果是則調用initTable() 方法完成初始化

  3. 判斷是否有hash沖突,如沒有直接設置新節點到對飲索引位,如果有獲取頭結點

  4. 根據頭結點的hash值判斷是否正在擴容,如果是則幫助擴容

  5. 如果沒有擴容則對頭結點加鎖,添加新節點

  6. fh >= 0根據頭結點hash值判斷是否是鏈表節點,如果是新增鏈表節點,否則新增樹節點

  7. 新增樹節點putTreeVal()需要注意,紅黑樹的根節點有可能會因為旋轉而發生變化,所以我們在添加節點的時候還需要對根節點使用cas在加了一次鎖。

  8. 判斷是否需要嘗試由鏈表轉換成樹結構

  9. addCount(1L, binCount);新增count數,并判斷是否需要擴容或者幫助擴容

sizeCtl值含義:

  • -1:表示正在初始化

  • -n:表示正在擴容

  • 0:表示還未初始化,默認值

  • 大于0:表示下一次擴容的閾值

initTable() 初始化方法

private final Node<K,V>[] initTable() {
	Node<K,V>[] tab; int sc;
	while ((tab = table) == null || tab.length == 0) {
		// 正在初始化
		if ((sc = sizeCtl) < 0)
			// 讓出CPU執行權,然后自旋
			Thread.yield(); // lost initialization race; just spin
		// CAS替換標志位(相當于獲取鎖)
		else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
			try {
				// 二次判斷
				if ((tab = table) == null || tab.length == 0) {
					int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
					@SuppressWarnings("unchecked")
					Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
					table = tab = nt;
					// 相當于sc=n*3/4
					sc = n - (n >>> 2); 
				}
			} finally {
				// 擴容閾值
				sizeCtl = sc;
			}
			break;
		}
	}
	return tab;
}

主要過程:

  1. 根據sizeCtl判斷是否正在初始化

  2. 如果其他線程正在初始化就讓出CPU執行權,進入下一次CPU執行權的競爭Thread.yield();

  3. 如果沒有進行初始化的線程則,CAS替換sizeCtl標志位(相當于獲取鎖)

  4. 獲取到鎖后再次判斷是否初始化

  5. 如果沒有則初始化Node數組,并設置sizeCtl值為下一次擴容閾值

helpTransfer()幫助擴容

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
	Node<K,V>[] nextTab; int sc;
	// ForwardingNode標記節點,表示正在擴容
	if (tab != null && (f instanceof ForwardingNode) &&
		(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
		int rs = resizeStamp(tab.length);
		while (nextTab == nextTable && table == tab &&
			   (sc = sizeCtl) < 0) {
			if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
				sc == rs + MAX_RESIZERS || transferIndex <= 0)
				break;
			if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
				transfer(tab, nextTab);
				break;
			}
		}
		return nextTab;
	}
	return table;
}

判斷是否正在擴容,如果是就幫助擴容。

transfer() 擴容方法

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {\
	// n原來數組長度
	int n = tab.length, stride;
	if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
		stride = MIN_TRANSFER_STRIDE; // subdivide range
	// 判斷是發起擴容的線程還是幫助擴容的線程,如果是發起擴容的需要初始化新數組
	if (nextTab == null) {            // initiating
		try {
			@SuppressWarnings("unchecked")
			Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
			nextTab = nt;
		} catch (Throwable ex) {      // try to cope with OOME
			sizeCtl = Integer.MAX_VALUE;
			return;
		}
		nextTable = nextTab;
		transferIndex = n;
	}
	int nextn = nextTab.length;
	// 擴容期間的數據節點(用于標志位,hash值是-1)
	ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
	// 當advance == true時,表明該節點已經處理過了
	boolean advance = true;
	// 在擴容完成之前保證get不被影響
	boolean finishing = false; // to ensure sweep before committing nextTab
	// 1. 從右往左找到第一個有數據的索引位節點(有hash沖突的桶)
	// 2. 如果找到的節點是NULL節點(沒有hash沖突的節點),那么將該索引位的NULL替換成ForwardingNode標記節點,這個節點的hash是-1
	// 3. 如果找到不為NULL的節點(有hash沖突的桶),則對這個節點進行加鎖
	// 4. 開始進進移動節點數據
	for (int i = 0, bound = 0;;) {
		//f:當前處理i位置的node(頭結點或者根節點);
		Node<K,V> f; int fh;
		// 通過while循環獲取本次需要移動的節點索引i
		while (advance) {
			// nextIndex:下一個要處理的節點索引; nextBound:下一個需要處理的節點的索引邊界
			int nextIndex, nextBound;
			// i是老數組索引位,通過--i來講索引位往前一個索引位移動,直到0索引位
			if (--i >= bound || finishing)
				advance = false;
			// 節點已全部轉移
			else if ((nextIndex = transferIndex) <= 0) {
				i = -1;
				advance = false;
			}
			// transferIndex(初值為最后一個節點的索引),表示從transferIndex開始后面所有的節點都已分配,
			// 每次線程領取擴容任務后,需要更新transferIndex的值(transferIndex-stride)。
			// CAS修改transferIndex,并更新索引邊界
			else if (U.compareAndSwapInt
					 (this, TRANSFERINDEX, nextIndex,
					  nextBound = (nextIndex > stride ?
								   nextIndex - stride : 0))) {
				bound = nextBound;
				// 老數組最后一個索引位置
				i = nextIndex - 1;
				advance = false;
			}
		}
		if (i < 0 || i >= n || i + n >= nextn) {
			int sc;
			// 已經完成所有節點復制了
			if (finishing) {
				nextTable = null;
				table = nextTab;
				// sizeCtl閾值為原來的1.5倍
				sizeCtl = (n << 1) - (n >>> 1);
				// 結束自旋
				return;
			}
			// CAS 更新擴容閾值,在這里面sizectl值減一,說明新加入一個線程參與到擴容操作
			if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
				if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
					return;
				finishing = advance = true;
				i = n; // recheck before commit
			}
		}
		// 將以前老數組上為NULL的節點(還沒有元素的桶或者說成沒有hash沖突的數據節點),用ForwardingNode標記節點補齊
		// 主要作用是:其他線程在put元素,發現找到的索引位是fwd節點則表示正在擴容,那么該線程會來幫助擴通,而不是在那里等待
		else if ((f = tabAt(tab, i)) == null)
			advance = casTabAt(tab, i, null, fwd);
		// 表示處理過該節點了
		else if ((fh = f.hash) == MOVED)
			advance = true; // already processed
		else {
			// 對應索引位加鎖
			synchronized (f) {
				// 再次校驗一下老數組對應索引位節點是否是我們找到的節點f
				if (tabAt(tab, i) == f) {
					// 低索引位頭節點(i位), 高位索引位頭節點(i+tab.length)
					Node<K,V> ln, hn;
					// fh >=0 表示鏈表節點,TreeBin節點的hash值-2
					if (fh >= 0) {
						// fh & n算法可以算出新的節點該分配到那個索引位(runBit要么為0放低位ln,要么為n放高位hn),
						// runBit表示鏈表中最后一個元素的hash值&n的值
						int runBit = fh & n;
						// lastRun表示鏈表中最后一個元素
						Node<K,V> lastRun = f;
						// 找到鏈表中最后一個節點,并賦值給lastRun
						for (Node<K,V> p = f.next; p != null; p = p.next) {
							int b = p.hash & n;
							if (b != runBit) {
								runBit = b;
								lastRun = p;
							}
						}
						// 判斷原來的最后一個節點應該添加到高位還是低位
						if (runBit == 0) {
							ln = lastRun;
							hn = null;
						}
						else {
							hn = lastRun;
							ln = null;
						}
						// f表示頭結點,如果p不是尾節點,則轉移節點
						// 如果以前節點順序是 1 2 3 4 轉移后就是 3 2 1 4 
						for (Node<K,V> p = f; p != lastRun; p = p.next) {
							int ph = p.hash; K pk = p.key; V pv = p.val;
							if ((ph & n) == 0)
								// 轉移節點時都是新建節點,以免破壞原來數組結構影響get方法
								ln = new Node<K,V>(ph, pk, pv, ln);
							else
								hn = new Node<K,V>(ph, pk, pv, hn);
						}
						// 設置新數組低索引位頭節點(i位)
						setTabAt(nextTab, i, ln);
						// 設置新數組高位索引位頭節點(i+tab.length)
						setTabAt(nextTab, i + n, hn);
						// 設置老數組i位為標記節點,表示已經處理過了
						setTabAt(tab, i, fwd);
						advance = true;
					}
					else if (f instanceof TreeBin) {
						TreeBin<K,V> t = (TreeBin<K,V>)f;
						TreeNode<K,V> lo = null, loTail = null;
						TreeNode<K,V> hi = null, hiTail = null;
						int lc = 0, hc = 0;
						for (Node<K,V> e = t.first; e != null; e = e.next) {
							int h = e.hash;
							TreeNode<K,V> p = new TreeNode<K,V>
								(h, e.key, e.val, null, null);
							if ((h & n) == 0) {
								if ((p.prev = loTail) == null)
									lo = p;
								else
									loTail.next = p;
								loTail = p;
								++lc;
							}
							else {
								if ((p.prev = hiTail) == null)
									hi = p;
								else
									hiTail.next = p;
								hiTail = p;
								++hc;
							}
						}
						ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
							(hc != 0) ? new TreeBin<K,V>(lo) : t;
						hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
							(lc != 0) ? new TreeBin<K,V>(hi) : t;
						// 設置新數組低索引位頭節點(i位)
						setTabAt(nextTab, i, ln);
						// 設置新數組高位索引位頭節點(i+tab.length)
						setTabAt(nextTab, i + n, hn);
						// 設置老數組i位為標記節點,表示已經處理過了
						setTabAt(tab, i, fwd);
						advance = true;
					}
				}
			}
		}
	}
}

主要過程:

  1. tab為擴容前的數組

  2. 判斷是否是第一個發起擴容的線程,如果是需要初始化擴容后的數組nextTable

  3. fwd = new ForwardingNode<K,V>(nextTab)初始化擴容標記節點

  4. 進入擴容循環

  5. 在擴容前的數組tab上從右往左(從高索引位到低索引位)遍歷所有頭結點,索引位為i

  6. 如果找到的頭結點是NULL(沒有hash沖突)則tab[i]=fwd

  7. 找到的頭結點不為NULL則(有hash沖突)則鎖定頭結點synchronized (f)

  8. 再次校驗頭結點是否發生改變,如果改變直接結束

  9. 初始化高索引位和第索引位的頭結點

  10. 移動節點到相應索引位

  11. 設置擴容后的數組低索引位頭節點(i位)

  12. 設置擴容后的數組高位索引位頭節點(i+tab.length)

  13. 設置擴容前的數組i位為標記節點(tab[i]=fwd),表示已經處理過了

  14. 進入第3步直到完成

注意:

  • 第5點有tab[i]=fwd有兩層含義:1,表示對應索引位已經處理過了;2,當其他線程拿到該頭結點的時候能知曉正在擴容,這時在put的時候幫助擴容,在get的時候去擴容后的數組上找相應的key

  • int runBit = fh & n;算法可以算出新的節點該分配到那個索引位(runBit要么為0放低位ln,要么為n放高位hn)

  • 如果是鏈表節點,以前節點順序是 1 2 3 4 擴容后會變成 3 2 1 4

擴容的大致過程圖解:

  1. 發起擴容,擴容前數組tab

JAVA并發容器ConcurrentHashMap 1.7和1.8 源碼怎么寫

  1. 在擴容前的數組tab上從右往左(從高索引位到低索引位)遍歷所有頭結點,索引位為i,如果找到的頭結點是NULL則直接賦值成````fwd``` 標記節點。

JAVA并發容器ConcurrentHashMap 1.7和1.8 源碼怎么寫 3. 擴容前的數組上找到不為NULL的節點,則還是移動節點到擴容后的額數組

JAVA并發容器ConcurrentHashMap 1.7和1.8 源碼怎么寫

addCount() 方法

private final void addCount(long x, int check) {
	// CounterCell[] as;使用計數器數組因該是為了提升并發量,減小沖突概率
	CounterCell[] as; long b, s;
	// 計數器表不為NULL(counterCells當修改baseCount有沖突時,需要將size增量放到這個計數器數組里面)
	if ((as = counterCells) != null ||
		// 使用CAS更新baseCount的值(+1)如果失敗說明存在競爭
		!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
		CounterCell a; long v; int m;
		// CounterCell是否存在競爭的標記位
		boolean uncontended = true;
		// CounterCell[] as為NULL表示as沒有競爭
		if (as == null || (m = as.length - 1) < 0 ||
			// 隨機一個數組位置來驗證是否為NULL,如果a是null表示沒有競爭,隨機也是為了減小沖突概率
			(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
			// CAS替換a的value,如果失敗表示存在競爭
			!(uncontended =
			  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
			// 將size增量值存到as上
			fullAddCount(x, uncontended);
			return;
		}
		if (check <= 1)
			return;
		// 統計size
		s = sumCount();
	}
	// 檢查是否需要擴容
	if (check >= 0) {
		Node<K,V>[] tab, nt; int n, sc;
		// size大于閾值sizeCtl,tab數組長度小于最大值1<<30
		while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
			   (n = tab.length) < MAXIMUM_CAPACITY) {
			int rs = resizeStamp(n);
			// 表示正在擴容
			if (sc < 0) {
				if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
					sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
					transferIndex <= 0)
					break;
				if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
					// 幫助擴容
					transfer(tab, nt);
			}
			// sc = (rs << RESIZE_STAMP_SHIFT) + 2,移位后是負數
			else if (U.compareAndSwapInt(this, SIZECTL, sc,
										 (rs << RESIZE_STAMP_SHIFT) + 2))
				// 發起擴容,此時nextTable=null
				transfer(tab, null);
			s = sumCount();
		}
	}
}

put()方法執行最后會對當前Map的size+1,ConcurrentHashMap中size由baseCountCounterCell[] as組成,size=baseCount+as[i].value。addCount的大致過程如下:

  1. CAS替換baseCount值,如果失敗說明對size的增量(size++)存在競爭

  2. 如果存在競爭,我們會使用到CounterCell[] as數組

  3. as[ThreadLocalRandom.getProbe() & m]隨機取一個索引位,使用CAS完成size++

  4. 如果as[i]也存在競爭會調用fullAddCount(x, uncontended);方法完成size++

  5. size++完成后通過size=baseCount+as[i].value公式計算出元素總數

  6. 判斷是否需要擴容

  7. 如果需要擴容,在判斷一下是幫助擴容還是發起擴容

注意:

  • CounterCell[] as:這個的只要目的是分散對baseCount的單一競爭,提示size++的并發率,這里和table數組一樣使用了鎖分離技術,as的長度也是2的n次方,初始長度是2

  • 在第三步中使用隨機數也是為了提升并發效率,ThreadLocalRandom類是JDK7在JUC包下新增的隨機數生成器,它解決了Random類在多線程下,多個線程競爭內部唯一的原子性種子變量,而導致大量線程自旋重試的不足

  • fullAddCount(x, uncontended);方法里面完成了as的初始化和擴容

  • 元素總數的計算公式是size=baseCount+as[i].value

sumCount() 方法

final long sumCount() {
	CounterCell[] as = counterCells; CounterCell a;
	long sum = baseCount;
	if (as != null) {
		for (int i = 0; i < as.length; ++i) {
			if ((a = as[i]) != null)
				sum += a.value;
		}
	}
	return sum;
}

元素總數的計算公式是size=baseCount+as[i].value

get() 方法

public V get(Object key) {
	Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
	int h = spread(key.hashCode());
	// table 不為NULL并且對飲索引位不為NULL
	if ((tab = table) != null && (n = tab.length) > 0 &&
		(e = tabAt(tab, (n - 1) & h)) != null) {
		if ((eh = e.hash) == h) {
			// 頭節點key相同
			if ((ek = e.key) == key || (ek != null && key.equals(ek)))
				return e.val;
		}
		// 樹節點或者ForwardingNode標記節點
		else if (eh < 0)
			return (p = e.find(h, key)) != null ? p.val : null;
		// 鏈表節點
		while ((e = e.next) != null) {
			// key相同
			if (e.hash == h &&
				((ek = e.key) == key || (ek != null && key.equals(ek))))
				return e.val;
		}
	}
	return null;
}

主要流程:

  1. 判斷table和key對應索引位是否為NULL

  2. 判斷頭節點是否是要找的節點

  3. eh < 0表示是樹節點或ForwardingNode標記節點,直接通過find方法找對應的key

  4. 否則是鏈表節點,挨個鏈表節點找相應的key

  5. 返回結果

注意:

  • get 方法沒有加鎖,原因是節點的value是volatile的,已經保證了可見性,只要value有更新,那么我們一定能讀到最新數據。

  • e.find(h, key)這里:如果對應索引位頭結點是ForwardingNode節點,那么會直接去擴通后的數組找對應的key,可以參見上面ForwardingNode.find()方法

size()方法

public int size() {
	long n = sumCount();
	return ((n < 0L) ? 0 :
			(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
			(int)n);
}

弱一致性

get方法和containsKey方法都是遍歷對應索引位上所有節點,來判斷是否存在key相同的節點以及獲得該節點的value。但由于遍歷過程中其他線程可能對鏈表結構做了調整,因此get和containsKey返回的可能是過時的數據,這一點是ConcurrentHashMap在弱一致性上的體現。

JDK1.8與JDK1.7的不同點

  • 去掉了Segment 數組:這樣做鎖的粒度更小,減少了并發沖突的概率;查找數據時不用計算兩次hash;

  • 存儲數據是采用了鏈表+紅黑樹的形式:當一個桶內數據量很大的時候,紅黑樹的查詢效率遠高于鏈表。

  • 1.8直接使用了內置鎖synchronized:簡化了加鎖操作

  • 1.8的初始化是在第一次put時完成的,1.7的時候再構造的時候完成的

  • 在put過程中當發現正在擴容,1.8的線程會幫助擴容,1.7的只是會檢查key是否存在或者完成新節點的初始化工作

  • 1.8的hash值計算更簡單了

  • 1.8擴容過程中會修改擴容前的數組,1.7擴容過程中不會修改原來數組

  • 1.8在get()時如果判斷到當前索引位正在擴容,那么直接在擴容后的數組中去找對應的key

  • 1.7的size計算使用的三次計算的方式,1.8使用了鎖分離技術

測試代碼

package com.xiaolyuh;

import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author yuhao.wang3
 * @since 2019/7/26 17:58
 */
public class HashMapTest {
    public static void main(String[] args) {
        AtomicInteger integer = new AtomicInteger();
        ExecutorService cachedThreadPool = Executors.newFixedThreadPool(50);
        Map<User, Integer> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 10000; i++) {
            cachedThreadPool.execute(() -> {
                User user = new User(1);
                map.put(user, 1);
            });
        }
    }

    static class User {
        int age;

        public User(int age) {
            this.age = age;
        }

        @Override
        public int hashCode() {
            return age;
        }

        @Override
        public String toString() {
            return "" + age;
        }
    }
}

layering-cache

為監控而生的多級緩存框架 layering-cache這是我開源的一個多級緩存框架的實現

上述就是小編為大家分享的JAVA并發容器ConcurrentHashMap 1.7和1.8 源碼怎么寫了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

平利县| 舟山市| 古交市| 古蔺县| 灵川县| 齐河县| 离岛区| 三穗县| 天门市| 磐安县| 无极县| 封丘县| 通榆县| 且末县| 黄冈市| 汾阳市| 封开县| 汉阴县| 四子王旗| 梓潼县| 广西| 教育| 宝应县| 思南县| 边坝县| 治多县| 泽州县| 塔河县| 吉林市| 秀山| 德阳市| 巴塘县| 寿光市| 克什克腾旗| 财经| 建水县| 松阳县| 大荔县| 垫江县| 阳城县| 海阳市|