AnthonyZero's Bolg

ConcurrentHashMap实现原理

简介

在并发编程中使用HashMap时可能导致程序死循环,因为HashMap并不是线程安全的。而使用线程安全的HashTable效率又非常低下,该类基本上所有的方法都采用synchronized进行线程安全的控制,可想而知,在高并发的情况下,每次只有一个线程能够获取对象监视器锁,这样的并发性能的确不令人满意。基于以上两个原因,便有了ConcurrentHashMap的登场机会。

HashMap线程不安全的原因

HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形链表,Entry的next节点永远不为空,就会产生死循环获取Entry.
// resize的核心内容
// 将原来的记录重新计算在新桶的位置,然后迁移过去
void transfer(Entry[] newTable, boolean rehash) {  
    int newCapacity = newTable.length;  
    //oldTable里循环元素放到NewTable中
    for (Entry<K,V> e : table) {  

        while(null != e) {  
            Entry<K,V> next = e.next;           
            if (rehash) {  
                e.hash = null == e.key ? 0 : hash(e.key);  
            }  
            int i = indexFor(e.hash, newCapacity);   
            e.next = newTable[i];  
            newTable[i] = e;  
            e = next;  
        } 
    }  
}
  1. HashMap在插入元素过多的时候需要进行Resize(先插入再扩容),Resize的条件是HashMap.Size >= Capacity * LoadFactor。
  2. HashMap的Resize包含扩容和ReHash两个步骤,ReHash在并发的情况下可能会形成链表环。

JDK1.7 实现

Alt text Alt text
如图所示,是由 Segment 数组、HashEntry 数组组成,和 HashMap 一样,仍然是数组加链表组成

ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock。不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment。

CurrentHashMap包含一个Segment数组,每个Segment包含一个HashEntry数组并且守护它,当修改HashEntry数组数据时,需要先获取它对应的Segment锁;而HashEntry数组采用开链法处理冲突,所以它的每个HashEntry元素又是链表结构的元素。

Segment定位

ConcurrentHashMap使用分段锁segment来保护数据,也就是说,在插入和读取元素,需要先通过hash算法定位segment。ConcurrentHashMap使用了变种hash算法对元素的hashCode再散列。

再散列的目的是为了减少冲突,让元素可以近似均匀的分布在不同的Segment上,从而提升存储效率.

Get操作

ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁。(除非读到的值是空才会加锁重读)

只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值

Put操作

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 计算 key 的 hash 值
    int hash = hash(key);
    // 2. 根据 hash 值找到 Segment 数组中的位置 j
    //    hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位,
    //    然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的最后 4 位
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 3. 插入新值
    return s.put(key, hash, value, false);
}

虽然 HashEntry 中的 value 是用 volatile 关键词修饰的,但是并不能保证并发的原子性,所以 put 操作时仍然需要加锁处理。

首先也是通过 Key 的 Hash 定位到具体的 Segment,在 put 之前会进行一次扩容校验。这里比 HashMap 要好的一点是:HashMap 是插入元素之后再看是否需要扩容,有可能扩容之后后续就没有插入就浪费了本次扩容(扩容非常消耗性能)。

而 ConcurrentHashMap 不一样,它是在将数据插入之前检查是否需要扩容,之后再做插入操作

ConcurrentHashMap不会对整个容器扩容(segment数组不会扩容),扩容是 segment 数组某个位置内部的数组 HashEntry[] 进行扩容,扩容后,容量为原来的 2 倍.

Size操作

每个 Segment 都有一个 volatile 修饰的全局变量 count ,求整个 ConcurrentHashMap 的 size 时很明显就是将所有的 count 累加即可。但是 volatile 修饰的变量却不能保证多线程的原子性,所有直接累加很容易出现并发问题。

但如果每次调用 size 方法将其余的修改操作加锁效率也很低。所以做法是先尝试两次将 count 累加,如果容器的 count 发生了变化再加锁来统计 size。

至于 ConcurrentHashMap 是如何知道在统计时大小发生了变化呢,每个 Segment 都有一个 modCount 变量,每当进行一次 put remove 等操作,modCount 将会 +1。只要 modCount 发生了变化就认为容器的大小也在发生变化。

JDK1.8 实现

JDK 1.7 使用分段锁机制来实现并发更新操作,核心类为 Segment,它继承自重入锁 ReentrantLock,并发度与 Segment 数量相等。

JDK 1.8 使用了 CAS 操作来支持更高的并发度,在 CAS 操作失败时使用内置锁 synchronized。并且 JDK 1.8 的实现也在链表过长时会转换为红黑树。
Alt text

关键属性

  1. volatile Node<K,V>[] table:装载Node的数组,作为ConcurrentHashMap的数据容器,采用懒加载的方式,直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为2的幂次方。
  2. volatile Node<K,V>[] nextTable: 扩容时使用,平时为null,只有在扩容的时候才为非null
  3. sizeCtl: 默认为0,用来控制table的初始化和扩容操作.它的数值有以下含义
    • -1 :代表table正在初始化,其他线程应该交出CPU时间片,退出
    • -N: 表示正有N-1个线程执行扩容操作
    • 大于0: 如果table已经初始化,代表table容量,默认为table大小的0.75,如果还未初始化,代表需要初始化的大小
  4. sun.misc.Unsafe U:用了CAS算法保证了线程安全性,这是一种乐观策略
  5. Node 保存键值对的节点
static class Node<K,V> implements Map.Entry<K,V> {
     final int hash;
     final K key;
     volatile V val;
     volatile Node<K,V> next;
         ......
}

Table初始化

初始化table的工作将发生在进行put操作时,如果发现table还没有被初始化,那么就会调用方法initTable来进行table的初始化

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

在initTable方法中我们可以看到,当线程发现sizeCtl小于0的时候,他就会让出CPU时间,而稍后再进行尝试,当发现sizeCtl不再小于0的时候,就会通过调用方法compareAndSwapInt来讲sizeCtl共享变量变为-1,以告诉其他试图获得sizeCtl变量的线程,目前正在由本线程在用该变量完成特定工作.

sizeCtl是一个用于同步多个线程的共享变量,如果它的当前值为负数,则说明table正在被某个线程初始化或者扩容,所以,如果某个线程想要初始化table或者对table扩容,需要去竞争sizeCtl这个共享变量

Get操作

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 1. 重hash
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //2. table[i]桶节点的key与查找的key相同,则直接返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 3. 当前节点hash小于0说明为树节点,在红黑树中查找即可
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            //4. 从链表中查找,查找到则返回该节点的value,否则就返回null即可
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
  • 根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
  • 如果是红黑树那就按照树的方式获取值。
  • 都不满足那就按照链表的方式遍历获取值

Put操作

调用put方法时实际具体实现是putVal方法,源码如下:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    //1. 计算key的hash值
    int hash = spread(key.hashCode());
    // 用于记录相应链表的长度
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();

        // 找到该 hash 值对应的数组下标,得到第一个节点 f
        //3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //用一次 CAS 操作将这个新值放入其中即可,这个 put 操作结束
            //如果 CAS 失败,那就是有并发操作,进到下一个循环就好了
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //4. 当前正在扩容
        else if ((fh = f.hash) == MOVED)
            // 帮助数据迁移
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //5. 当前为链表,在链表中插入新的键值对
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 到了链表的最末端,将这个新值放到链表的最后面
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 6.当前为红黑树,将新的键值对插入到红黑树中
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 7.插入完键值对后再根据实际大小看是否需要转换成红黑树
            if (binCount != 0) {
                 // 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容 
    addCount(1L, binCount);
    return null;
}

整体流程:

  1. 首先对于每一个放入的值,首先利用spread方法对key的hashcode进行一次hash计算,由此来确定这个值在table中的位置
  2. 如果当前table数组还未初始化,先将table数组进行初始化操作;
  3. 如果这个位置是null的,那么使用CAS操作直接放入;
  4. 如果这个位置存在结点,说明发生了hash碰撞,首先判断这个节点的类型。如果该节点fh==MOVED(代表forwardingNode,数组正在进行扩容)的话,说明正在进行扩容;
  5. 如果是链表节点(fh>0),则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到key相同的节点,则只需要覆盖该结点的value值即可。否则依次向后遍历,直到链表尾插入这个结点;
  6. 如果这个节点的类型是TreeBin的话,直接调用红黑树的插入方法进行插入新的节点;
  7. 插入完节点之后再次检查链表长度,如果长度大于8,就把这个链表转换成红黑树;
  8. 对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容。

总结

  • 从JDK7到8 不采用segment而采用node,锁住node来实现减小锁粒度
  • 底层结构是散列表(数组+链表)+红黑树,这一点和HashMap是一样的。
  • ConcurrentHashMap的key和Value都不能为null
  • ConcurrentHashMap作为一个高并发的容器,它是通过synchronized+CAS算法来进行实现线程安全的
  • get方法是非阻塞,无锁的。重写Node类,通过volatile修饰next来实现每次获取都是最新设置的值