简介:html
本文主要介绍Java8中的并发容器ConcurrentHashMap的工做原理,和其它文章不一样的是,本文重点分析了不一样线程的各种并发操做如get,put,remove之间是如何同步的,以及这些操做和扩容操做之间同步可能出现的各类状况。因为源代码的分析确定会有所纰漏,但愿你们积极指出错误。java
欢迎探讨,若有错误敬请指正 node
如需转载,请注明出处 http://www.cnblogs.com/nullzx/数组
图片来源(http://www.importnew.com/28263.html)安全
咱们将数组称之为表,将数组中每一个链表或红黑树称之为桶,将数组中的每一个结点称之为槽,也就是说“槽”存储了链表的头结点或者红黑树的根结点。源代码中用内部类Node表示链表中的每一个结点。多线程
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; //…… //省略其它代码 //…… Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }
每一个结点都有一个hash属性它表示了Node对象的哈希值,这个哈希值其实是key.hashCode()通过spread函数进一步散列后的值(后面的内容有spread函数源代码)。特别须要注意的是val和next属性都是用volatile修饰的。并发
而有关红黑树的内容不在本文的讨论范围以内,有兴趣的同窗能够参考个人另外三篇有关红黑树的技术博客。ide
构造函数
public ConcurrentHashMap(int initialCapacity, float loadFactor)
2.1 表的长度
实际上表的长度必须为2的整数次幂。该类内部会用大于等于initialCapacity的最小2的整数次幂做为长度。假设你构造ConcurrentHashMap对象时传递的initialCapacity的值是21,那么实际上表的长度是32。通常教科书上设计哈希表时,会将表的长度设置为较大的质数,而这里将表的长度设置成2的整数次幂,我认为有如下两点缘由:
1)在教科书中咱们是经过
(Node对象的hash属性值)%表长度
来定位槽的位置。这样作的前提是咱们假设求余运算是很快就能够完成的,但实际上CPU可能须要不少条指令才能实现求余操做。若是槽的长度正好的2的整数次幂,那么咱们就能够经过下面的方式计算槽的位置 ,这和上面的计算方式等价,但位与运算明显要快于求余运算。
(Node对象的hash属性值)&(表长度-1)
2)在多线程扩容的时,这样的长度设置能够避免在扩容时对新表加锁,从而加快ConcurrentHashMap的扩容速度。关于扩容的细节问题,后面会进行讲述。
2.2 负载因子的含义
默认负载因子为0.75。咱们假设表的长度为100(固然,实际上不多是这个值,这里只是为了方便分析)。那么咱们最多存储75个结点就要扩容(注意并非占用75个槽之后才会扩容)。因此负载因子是对查询效率和存储空间平衡关系的表示。
static final int HASH_BITS = 0x7fffffff; static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
经过key肯定槽的位置时,若是咱们直接使用
key.hashCode() &(表长度-1)
若是经过上述方法,仍然还存在较多的key冲突,那么就会致使同一个槽中汇集了较多结点,Java8中就会将这个长的链表转化为一颗以key表示大小的红黑树,以减小查询时间。默认状况下链表长度大于8就会被转化成红黑树。
这里咱们先不考虑并发问题,先说说基本的扩容操做,当put操做完成后,都要统计当前ConcurrentHashMap中结点的个数(显然结点个数不是一个准确值,只能是一个估计值)。若是结点个数大于设定的阀值(表的长度*负载因子),就要进行扩容操做,以提升查询效率。
前面咱们说过表的长度是2的整数次幂,扩容时咱们让表的长度翻倍,因此扩容后的新表长度也必然是2的整数次幂。咱们这里假设旧表的长度是8(实际上代码中表的最小长度也是16,这样假设是为了画图方便),图中的数字表示结点的hash值。
从图中咱们能够看出,扩容后表的长度变成了16。咱们如今要对比观察扩容先后每一个结点的位置,显然能够获得一个有意思的结论:每一个结点在扩容后要么留在了新表原来的位置上,要么去了新表 “原位置+8”的位置上,而8就是旧表的长度。好比扩容前3号槽有[3,11,19]结点,扩容后[3,19]结点依然留在了原3号位置,而节点[11]去了“原位置3 + 8 = 11”的位置。计算新表中槽的位置有很巧妙的方法,有兴趣的同窗能够参照transfer函数的源代码。
扩容长度翻倍并,且扩容后长度仍然是2的整数次幂的特性在多线程扩容有很大的优点。原表中不一样桶上的结点,在新表上必定不会分配到相同位置的槽上。咱们可让不一样线程负责原表不一样位置的桶中全部结点的迁移,这样两个线程的迁移操做是不会相互干扰的。
好比咱们可让一个线程负责原表中3号桶中全部结点的迁移,另外一个线程负责原表中4号桶全部结点的迁移。原表中3号位置上的结点只能迁移到新表3号位置或11号位置上,绝对不会映射到其它位置上。而4号位置上的结点只能迁移到新表4号位置或12号位置上,因此在迁移结点的过程当中,两个线程就没必要在新表的对应槽上加锁了。
5.1 get方法
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //说明这个桶迁移已完成 或者 槽中是红黑树的根 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
经过源代码发现,整个get操做都没有加锁,也没有用 CAS操做,那么get方法是怎么保证线程安全的呢?如今先不回答这个问题,不过咱们应该注意get方法中头结点hash值小于0的状况(即eh < 0)的状况,结合后面的扩容操做进行解释。
5.2 put方法
public V put(K key, V value) { return putVal(key, value, false); }
put方法实际上调用了putVal方法
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); //每次循环都会从新计算槽的位置,由于可能恰好完成扩容操做 //扩容完成后会使用新表,槽的位置可能会发生改变 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //槽为空,先尝试用CAS方式进行添加结点 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //当前线程先帮助迁移,迁移完成后在新表中进行put else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //加锁操做,防止其它线程对此桶同时进行put,remove,transfer操做 synchronized (f) { //头结点发生改变,就说明当前链表(或红黑树)的头节点已不是f了 //可能被前面的线程remove掉了或者迁移到新表上了 //若是被remove掉了,须要从新对链表新的头节点加锁 if (tabAt(tab, i) == f) { //ForwordingNode的hash值为-1 //链表结点的hash值 >= 0 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //若是遍历到了最后一个结点, //那么就说明须要插入新的结点,就把新结点插入在链表尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //红黑树的根结点的hash值为-2 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //内部判断是否须要扩容 addCount(1L, binCount); return null; }
put方法作了如下几点事情:
1)若是没有初始化就先调用initTable()方法来进行初始化过程2)若是没有hash冲突就尝试CAS方式插入
3)若是还在进行扩容操做就先帮助其它线程进一块儿行扩容
4)若是存在hash冲突,就加锁来保证put操做的线程安全。
有意思的是,ConcurrentHashMap中并无使用ReentrantLock,而是直接使用了synchronized关键字对槽加锁。我的猜想,这样作的缘由是避免建立过多的锁对象。若是桶的长度是1024(别问我为啥是这个值,我只是考虑到了它是2的整数次幂,若是你联想到了其它不宜公开讨论的内容,请告诉我地址),那么咱们就须要在每一个桶的位置上分配一把锁,也就要1024把锁,考虑到每次扩容后都还要从新建立全部的锁对象,这显然是不划算的。
添加结点操做完成后会调用addCount方法,在addCount方法中会去判断是否须要扩容操做。若是容量超过阀值了,就由这个线程发起扩容操做。若是已经处于扩容状态(sizeCtl < -1),根据剩余迁移的数据和已参加到扩容中的线程数来判断是否须要当前线程来帮助扩容。
5.3 remove方法
public V remove(Object key) { return replaceNode(key, null, null); }
实际上调用了replaceNode方法
final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 || /*每次循环都会从新计算槽的位置,由于在扩容完成后会使用新表 槽的位置可能会发生改变*/ (f = tabAt(tab, i = (n - 1) & hash)) == null) break; //若是有线程正在扩容,先帮助它一块儿扩容,而后在新表中进行put操做 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; //加锁操做,防止其它线程对此桶同时进行put,remove,transfer操做 synchronized (f) { //头结点发生改变,就说明当前链表(或红黑树)的头节点已不是f了 //可能被前面的线程remove掉了或者迁移到新表上了 //若是被remove掉了,须要从新对链表新的头节点加锁 if (tabAt(tab, i) == f) { if (fh >= 0) { validated = true; for (Node<K,V> e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; if ((e = e.next) == null) break; } } else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null) { if (value == null) addCount(-1L, -1); return oldVal; } break; } } } return null; }
5.4 ForwardingNode类
static final class ForwardingNode<K,V> extends Node<K,V> { //新表的引用 final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } //进行get操做的线程若发现槽中的节点为ForwordingNode类型 //说明该桶中全部结点已迁移完成,会调用ForwordingNode的find方法在新表中进行查找 Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes //重新表中查询 outer: for (Node<K,V>[] tab = nextTable;;) { //n表示新表的长度 Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || //从新在新表中定位 (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; //继续递归查询?这里没看懂 if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } //下一个 if ((e = e.next) == null) return null; } } } }
ForwardingNode类继承了Node类,因此ForwardingNode对象也是Node类型对象,因此它也能够放到表中。
ForwardingNode在扩容中使用。每个ForwardingNode对象都包含扩容后的表的引用(新表保存在nextTable属性中)。 ForwardingNode对象的key,value,next属性值所有为null,它的hash值为-1(注意小于0哦,能够去看看get方法中对应的部分了)。
ForwardingNode对象中也定义了find的方法,它是从扩容后的新表中查询结点,而不是以自身为头结点进行查找。
5.5 扩容方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //头结点加锁,防止其它线程此时对该桶进行put和remove操做 synchronized (f) { //和put及remove操做判断头结点是否改变的原理相似 if (tabAt(tab, i) == f) { // fh >= 0 表示链表 Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } //按新表中槽的位置分为两部分 //注意新表中的节点都是新建的,而不是修改原的结点的next指针 //这样作是为了同其它线程的get方法并发时能get正确的结果 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //将头结点设置为fwd setTabAt(tab, i, fwd); advance = true; } //表示红黑树 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //将头结点设置为fwd setTabAt(tab, i, fwd); advance = true; } } } } } }
整个扩容操做分为两个部分
1)构建一个nextTable,它的容量是原来的两倍,这个操做是单线程完成的。
2)是将原来table中的结点迁移到nextTable中,这里容许多线程进行操做。
在每一个位置扩容时,会对头结点加锁,避免其它线程在该位置进行put及remove操做,这个位置扩容结束时会将头结点设置成ForwardingNode,而后释放锁。ForwardingNode结点中包含新表的引用,ForwardingNode结点的hash属性的值为-1,next属性的值为null。原表中引用为null的槽一样被设置成ForwardingNode结点。
多线程迁移的过程不是一个线程处理一个槽,而是一个线程处理多个连续的多个槽。在ConcurrentHashMap类中还定义下面属性值,开始扩容时这个值表示了旧表的长度,也就是说搬运工做是从旧表的末尾开始的。
private transient volatile int transferIndex;
transfer函数中定义一个局部变量stride,它表示了每一个线程的一次迁移处理的桶的个数,当一个线程处理完成后 transferIndex就自减一个stride,那么下一个线程就应该从transerIndex – stride处开始,往前处理stride个桶,以此类推完成协做。为何要设计成从旧表的后部开始往头部的方向搬运呢?我的猜测是搬运结束的时候条件是统一的,只是写代码的技巧吧。固然怎么肯定整个旧表上的内容所有都迁移了,还须要读更多的源代码,这里就不做分析了。
上图表示了扩容操做过程当中旧表和新表之间的一种可能的状态,在图中fw表示ForwordingNode类型结点,数字表示Node类型结点(上图中的扩容过程和前面论述过的“4.扩容操做”章节中的的扩容过程不是同一个过程,对应的数据会有所差别)。如今咱们就经过如下几种状况解释上图所表达的意思。
首先,多个线程在同一个位置上的get操做时显然不须要同步,因此这种状况不须要讨论,咱们来讨论剩下几种状况。
6.1初化的同步问题
表的建立并非在构造函数中进行的,而是在put方法中进行的,也就是说这其实是个懒汉模式。可是若是多个线程同时建立表,显然是非线程安全的。因此只能有一个线程来进行建立表,其它线程会等待建立完成后完成其它操做。ConcurrentHashMap类中设定一个volatile变量sizeCtl
private transient volatile int sizeCtl;
而后经过CAS方法去修改它,若是有其它线程发现sieCtl为-1
U.compareAndSwapInt(this, SIZECTL, sc, -1)
就表示已经有线程正在建立表了,那么当前线程就会放弃CPU使用权(调用Thread.yield()方法),等待分初始化完成后继续进行put操做。不然当前线程尝试将siezeCtl修改成-1,若成功,就由当前线程来建立表。
6.2 put方法和remove方法之间的同步问题
在表的同一个桶上,一个线程调用put方法和另外一个线程调用put方法是互斥的;在表的同一个桶上,一个线程调用remove方法和另外一个线程调用remove方法也是互斥的;在表的同一个桶上,一个线程调用remove方法和另外一个线程调用put方法也是互斥的。这些互斥操做在代码中都是经过锁来保证的,每一个线程执行这些操做时都会先锁住槽。
6.3 put(或remove)方法和get方法的同步问题
实际上这两类操做是不须要同步,先到先得。这主要因为Node定义中value和next都定义成了volatile类型。一个线程可否get到另外一个线程刚刚put(或remove)的值,这主要由两个线程当前访问的结点所处的位置决定的。
6.4 get方法和扩容操做的同步问题
能够分红两种状况讨论
1)该位置的头结点是Node类型对象,直接get,即便这个桶正在进行迁移,在get方法未完成前,迁移操做已完成,即槽被设置成了ForwordingNode对象,也不要紧,并不影响get的结果。由于get线程仍然持有旧链表的引用,能够从当前结点位置访问到全部的后续结点。这是由于新表中的节点是经过复制旧表中的结点获得的,因此新表的结点的next值不会影响旧表中对应结点的next值。当get方法结束后,旧链表就出于不可达的状态,会被垃圾回收线程回收。
2)该位置的头结点是ForwordingNode类型对象(头结点的hash值 == -1),头结点是ForwordingNode类型的对象,调用该对象的find方法,在新表中查找。
因此不管哪一种状况,都能get到正确的值。
6.5 put(或remove)方法和扩容操做的同步问题
一样能够分为两种状况讨论:
1)该位置的头结点是Node类型对象,那就看谁先获取锁,若是put操做先获取锁,则先将Node对象放入到旧表中,而后调用addCount方法,判断是否须要帮助扩容。
2)该位置的头结点是ForwordingNode类型对象,那就会先帮助扩容,而后在新表中进行put操做。
[1] Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析