文章简介java
想必你们对HashMap数据结构并不陌生,JDK1.7采用的是数组+链表的方式,JDK1.8采用的是数组+链表+红黑树的方式。虽然JDK1.8对于HashMap有了很大的改进,提升了存取效率,可是线程安全的问题不可忽视,因此就有了线程安全的解决方案,好比在方法上加synchronized同步锁的HashTable,或者并发包中的ConcurrentHashMap线程安全类,本文就来和你们一块儿探讨一下关于ConcurrentHashMap的源码,版本是JDK1.8,下面让咱们正式开始吧。数组
备注:你们须要对HashMap1.8源码有一些了解,在原来HashMap1.8源码中比较常见的知识点本文不会具体展开。安全
不妨先以一段你们熟悉的代码开始本文的旅程数据结构
ConcurrentHashMap<Integer,String> map=new ConcurrentHashMap<Integer, String>(); map.put(1,"Zhang");
当咱们在put元素时,点开put方法的源码会发现,这里调用了一个putVal()的方法,同时将key和value做为参数传入并发
public V put(K key, V value) { return putVal(key, value, false); }
继续点击putVal()方法,而后咱们看看这里到底实现了什么ide
//key或者value都不能为空 if (key == null || value == null) throw new NullPointerException(); //计算hash值,实际上就是获得一个int类型的数,只是须要对这个数进行处理,目的是为了肯定key,value组成的Node节点在数组下标中的位置 int hash = spread(key.hashCode());
不妨先看下spread(key.hashCode())的实现this
key.hashCode()实际上调用的是native的方法,目的是获得一个整形数,为了使得这个整形数尽量不同,因此要对高16位和低16位进行异或运算,尽量利用好每一位的值 static final int spread(int h) { //对key.hashCode的结果进行高16位和低16位的运算 return (h ^ (h >>> 16)) & HASH_BITS; }
接下来就是要初始化这个数组的大小,由于数组不初始化,表明key,value的每一个Node类也不能放到对应的位置线程
if (tab == null || (n = tab.length) == 0) //初始化数组的大小 tab = initTable();
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //只有当数组为空或者大小为0的时候才对数组进行初始化 while ((tab = table) == null || tab.length == 0) { //这里其实就是用一个sizeCtl记录是否已经有线程在进行初始化操做,若是有,则让出CPU的资源,也就是保证只有一个线程对数组进行初始化操做,从而保证线程安全。 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //使用CAS乐观锁机制比较SIZECTL和sc是否相等,只有当前值和内存中最新值相等的时候,才会将当前值赋值为-1,一旦被赋值为-1,上面有其余线程进来,就直接执行了Thread.yeild()方法了 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { //三元运算符获得数组默认大小,点击DEFAULT_CAPACITY发现是16,这点和HashMap是同样的 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //建立Node类型的数组,真正初始化的地方 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //计算扩容的标准,采用的是位移运算,由于效率更高,sc最终结果为12 sc = n - (n >>> 2); } } finally { //无论不管最终将sc赋值为sizeCtl,这时候sizeCtl结果为12 sizeCtl = sc; } break; } } return tab; }
当数组初始化完成以后,就须要将key,value建立出来的Node节点放到数组中对应的位置了,分为几种状况,下面这种是原来某个位置就没有元素值,可是为了保证线程安全,放到多个线程同时添加,也使用CAS乐观锁的机制进行添加。code
//根据(n-1)&hash的结果确认当前节点所在的位置是否有元素,效果和hash%n是同样的,只是&运算效率更高,这里hashmap也是这样作的,就不作更多赘述了 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //使用CAS乐观锁机制向对应的下标中添加对应的Node if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
//f其实是当前数组下标的Node节点,这里判断它的hash值是否为MOVED,也就是-1,若是是-1,就调用helpTransfer(tab,f)方法帮助其余线程完成扩容操做,而后再添加元素。 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);
接下来就要考虑数组具体下标位置有元素的状况,这时候就须要把Node节点向当前节点下进行顺延,造成链表或者红黑树的结构,还有一种状况就是key值相同,value值不能,这时候只须要进行一个value值的替换便可。对象
V oldVal = null; //数组初始化和在数组下标中插入Node时,为了保证线程安全使用的是CAS无锁化机制 //那元素继续往下插入时,线程安全的问题怎么保证呢?可使用synchronized关键字 //发现同步代码块中锁的对象是f,也就是当前数组下标的元素,这样不一样的数组下标之间彼此互相不影响。 synchronized (f) { //再次确认当前头结点是否为f if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //第一种状况,发现是key值相同,只须要替换掉oldValue便可 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //第二种状况,按照链表的方式进行插入 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //第三种状况,按照红黑树的方式进行插入 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //通常链表转红黑树是节点数>8的时候,但不是一旦某个数组下标的节点数大于8就转成红黑树,也能够经过调整数组的容量来解决,好比treeifyBin中进行的 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); //说明上面有须要替换掉旧值的节点 if (oldVal != null) return oldVal; break; }
当添加完一个key,value方式的Node以后,就须要检查是否整个数据结构中的节点数超过扩容标准好比12,若是超过了就须要进行数组大小的扩容,先调用addCount()方法,由于第二个参数check大于0,因此直接看里面这段代码。
if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //经过resizeStamp(n),n是数组大小,获得一个int的结果,赋值给rs保存 int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //由于sc<0不成立,因此会来到这段代码 //这里经过CAS的方式比较SIZECTL和sc的值,当二者相等时,会执行rs<<RESIZE<STAMP_SHIFT+2赋值操做,这个结果值是一个负数,表示当前正在执行扩容操做的线程数量 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //调用transfer方法进行真正的扩容操做 transfer(tab, null); s = sumCount(); } }
在concurrenthashmap中的扩容操做可能不止一个线程,因此每一个线程就须要分工合做完成扩容,也就是每一个线程须要领取本身负责的task,固然前提是得要有一个新的数组,这样才能将老数组中的Node节点搬移到新数组中。
int n = tab.length, stride; //肯定线程负责数组大小的范围 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //判断新的数组是否为null,为空则进行建立,好比数组原来的大小是16,2的N次幂,扩容也须要双倍扩容 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") //采用位移运算进行双倍扩容 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //使用transferIndex变量记录数组的大小,表示线程进行扩容的时候,是从后往前进行的 transferIndex = n; }
接下来就要进行搬移工做了,咱们须要用一些标识记录一下搬移的完成状态,同时线程将某个数组下标的节点搬移完成以后也要让别人知道,同时也能知道有线程正在进行扩容操做。
int nextn = nextTab.length; //某个下标节点完成以后的节点类型,实际上就是继承了Node节点,只不过点进去发现它的hash值为MOVED也就是-1 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab
Node<K,V> f; int fh; //i指向当前数组的下标,经过while循环遍历--i,从而知道当前线程拿到的一个区间范围 while (advance) { int nextIndex, nextBound; //一个数组下标一个数组下标的处理 if (--i >= bound || finishing) advance = false; //表示已经没有须要搬运的节点了,将advance赋值为false else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //不一样的线程搬运的内容,不断地将transferindex的值变小 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } }
if (i < 0 || i >= n || i + n >= nextn) { int sc; //finishing等于true就表示全部的线程都搬运完了,作最后的收尾工做 //好比将新数组的内容赋值到table,扩容标准由原来的12变成24 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //这里是每次有一个线程完成搬运工做,就将线程总数量-1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } }
//若是某个线程的某个数组下标搬运完成,则将该头节点赋值为fwd类型的,其实就是hash值为MOVED else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //表示已经搬运完成 else if ((fh = f.hash) == MOVED) advance = true; // already processed
接下来就是每一个线程真正在搬运代码的过程,其实这块和hashmap1.8中的resize后面的过程很相似
synchronized (f) { //再次检查当前数组下标的节点是否为f if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { //新节点的位置要么在原来的位置,要么在原来的位置+原来数组的大小,这点和hashmap中同样 //p.hash&n 也就是判断这个结果是否等于0 int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } //等于0会走这边 if (runBit == 0) { ln = lastRun; hn = null; } //不等于0会走这边 else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //将链表总体迁移到nextTable中 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //标识原桶标识位已经处理,头节点标记为fw,hash值为-1 setTabAt(tab, i, fwd); advance = true; } //这里是红黑树迁移的状况 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } }
前面说到,当链表长度超过8会转成红黑树,可是节点总数若是小于64,会用扩容的方式代替转红黑树,代码以下
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //tryPresize进行扩容 tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
点击tryPresize方法,最终也会来到下面这段代码,和前面addCount中的同样
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //注意这里的第二个参数为null,表示新的数组尚未建立,以前也是null transfer(tab, null);
在以前put的时候,中间跳过了这段话,这段话是当前线程发现有其余线程正在进行扩容操做,协助其余线程扩容完成以后再继续put元素。
else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);
/** * Helps transfer if a resize is in progress. */ final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //每当来一个线程帮助扩容,此时就会sc+1,表示多了一个线程 //其实这块也能和transfer方法中的sc-1对应上,一个线程完成以后就数量-1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //扩容的方法,注意第二个参数有传入nextTab,缘由是当前线程只是协助其余线程扩容 //既然其余线程正在扩容,说明这个新数组已经建立好了 transfer(tab, nextTab); break; } } return nextTab; } return table; }
到目前为止,咱们分析了put过程当中会遇到线程安全的点,好比数组初始化,数组头元素添加,put完成过程等。同时还分析了transfer扩容每一个线程领取的任务,搬运结果的方式,协助扩容等方面的内容。若是对你们有帮助,请帮忙转发。