其实ConcurrentHashMap我本身已经看过不少遍了,可是今天在面试阿里的时候本身在描述ConcurrentHashMap发现本身根本讲不清楚什么是ConcurrentHashMap,以及里面是怎么实现的,搞的我忽然发现本身什么都不懂,因此我想要再次的来分析一下这个源码,彻底理解ConcurrentHashMap,而不是觉得本身懂了,实际上本身不懂。html
首先咱们看一下put方法,put方法会调用到putVal方法上面。java
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); //若是put进去的是个链表,这个参数表示链表的大小 int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) //初始化链表 tab = initTable(); //若是这个槽位没有数据 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //使用CAS将这个新的node设置到hash桶里面去 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //帮助迁移 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { //获取锁 V oldVal = null; synchronized (f) { //双重检查锁 if (tabAt(tab, i) == f) { //若是hash值大于等于0,那么表明这个节点里的数据是链表 if (fh >= 0) { binCount = 1; //每次遍历完后binCount加1,表示链表长度 for (Node<K,V> e = f;; ++binCount) { K ek; //若是hash值和key值都相同,那么覆盖,break结束循环 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //下一个节点为null,说明遍历到尾节点了,那么直接在尾节点设值一个新的值 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //若是是红黑树 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) //若是链表个数大于8,那么就调用这个方法 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
解释一下上面的源码作了什么:node
接下来咱们先看initTable 方法,再看treeifyBin和helpTransfer面试
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //一开始的时候sizeCtl为0 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //将sizeCtl用CAS设置成-1 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { //由于sc一开始为0,因此n取DEFAULT_CAPACITY为16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //将table赋值为大小为16的Node数组 table = tab = nt; //将sc的设置为总容量的75%,若是 n 为 16 的话,那么这里 sc = 12 sc = n - (n >>> 2); } } finally { //最后将sizeCtl设置为sc的值 sizeCtl = sc; } break; } } return tab; }
这个方法里面初始化了一个很重要的变量sizeCtl,初始值为总容量的75%,table初始化为一个容量为16的数组数组
下面咱们在看看treeifyBin方法安全
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { //若是数据的长度小于64,那么调用tryPresize进行扩容 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); //若是这个槽位里面的元素是链表 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { //给链表头加上锁 synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; //遍历链表,而后初始化红黑树对象 for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } //给tab槽位为index的元素设置新的对象 setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
treeifyBin这个方法里面并非只是将链表转化为红黑树,而是当tab的长度大于64的时候才会将链表转成红黑树,不然的话,会调用tryPresize方法。多线程
而后咱们进入到tryPresize方法里面看看,tryPresize传入的参数是当前tab数组长度的两倍。ide
private final void tryPresize(int size) { //本来传进来的size已是两倍了,这里会再往上取最近的 2 的 n 次方 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; // 这个 if 分支和以前说的初始化数组的代码基本上是同样的,在这里,咱们能够不用管这块代码 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { int rs = resizeStamp(n); //一开始进来的时候sc是大于0的 if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //将SIZECTL设置为一个很大的复数 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }
这个方法里面,会对tab数据进行校验,若是没有初始化的话会从新进行初始化大小,若是是第一次进来的话会将SIZECTL设置成一个很大的复数,而后调用transfer方法,传如当前的tab数据和null。源码分析
接着咱们来看transfer方法,这个方法比较长,主要的扩容和转移节点都在这个方法里面实现,咱们将这个长方法分红代码块,一步步分析:this
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { //若是当前tab数组长度为16 int n = tab.length, stride; //那么(n >>> 3) / NCPU = 0 小于MIN_TRANSFER_STRIDE if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) //将stride设置为 16 stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //若是n是16,那么nextTab就是一个容量为32的空数组 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //将transferIndex赋值为16 transferIndex = n; } ... }
这个代码块主要是作nextTable、transferIndex 、stride的赋值操做。
... //初始化nextn为32 int nextn = nextTab.length; //新建一个ForwardingNode对象,里面放入长度为32的nextTab数组 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; //初始化bound为0 for (int i = 0, bound = 0;;) { ... }
下面的代码会所有包裹在这个for循环里面,因此咱们来分析一下这个for循环里面的代码
for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; //将nextIndex设置为transferIndex,一开始16 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //一开始的时候nextIndex是和stride相同,那么nextBound为0,TRANSFERINDEX也为0 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { //这里bound也直接为0 bound = nextBound; //i = 15 i = nextIndex - 1; advance = false; } } ... }
这个方法是为了设置transferIndex这个属性,transferIndex一开始是原tab数组的长度,每次会向前移动stride大小的值,若是transferIndex减到了0或小于0,那么就设置I等于-1,i在下面的代码会说到。
for (int i = 0, bound = 0;;) { ... //在上面一段代码块中,若是transferIndex已经小于等于0了,就会把i设置为-1 if (i < 0 || i >= n || i + n >= nextn) { int sc; //表示迁移已经完成 if (finishing) { //将nextTable置空,表示不须要迁移了 nextTable = null; //将table设置为新的数组 table = nextTab; //sizeCtl设置为n的 1.5倍 sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT, // 也就是说,全部的迁移任务都作完了,也就会进入到上面的 if(finishing){} 分支了 finishing = advance = true; i = n; // recheck before commit } } ... }
这个方法是用来表示已经迁移完毕了,能够退出。
for (int i = 0, bound = 0;;) { ... //若是该槽位没有元素,那么直接把tab的i槽位设置为fwd else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //说明这个槽位已经有其余线程迁移过了 else if ((fh = f.hash) == MOVED) advance = true; // already processed //走到这里,说明tab的这个槽位里面有数据,那么咱们须要得到槽位的头节点的监视器锁 else { synchronized (f) { if (tabAt(tab, i) == f) { ... } } } ... }
在这个代码块中,i会从最后一个元素一个个往前移动,而后根据i这个index来判断tab里面槽位的状况。
下面的代码咱们来分析监视器锁里面的内容:
synchronized (f) { if (tabAt(tab, i) == f) { //fh是当前节点的hash值 if (fh >= 0) { int runBit = fh & n; //lastRun设置为头节点 Node<K,V> lastRun = f; // 须要将链表一分为二, // 找到原链表中的 lastRun,而后 lastRun 及其以后的节点是一块儿进行迁移的 // lastRun 以前的节点须要进行克隆,而后分到两个链表中 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //其中的一个链表放在新数组的位置 i setTabAt(nextTab, i, ln); //另外一个链表放在新数组的位置 i+n setTabAt(nextTab, i + n, hn); //将原数组该位置处设置为 fwd,表明该位置已经处理完毕 //其余线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了 setTabAt(tab, i, fwd); //advance 设置为 true,表明该位置已经迁移完毕 advance = true; } //下面红黑树的迁移和上面差很少 else if (f instanceof TreeBin) { .... } } }
这个方法主要是将头节点里面的链表拆分红两个链表,而后设置到新的数组中去,再给老的数组设置为fwd,表示这个节点已经迁移过了。
到这里transfer方法已经分析完毕了。 这里我再举个例子,让你们根据透彻的明白多线程之间是怎么进行迁移工做的。
咱们假设stride仍是默认的16,第一次进来nextTab为null,可是tab的长度为32。 一开始的赋值: 1. n会设置成32,而且n只会赋值一次,表明被迁移的数组长度 2. nextTab会被设置成一个大小为64的数组,并塞入到新的ForwardingNode对象中去。 3. transferIndex会被赋值为32 进入循环: 初始化i为0,bound为0; 第一次循环: 1. 因为advance初始化为true,因此会进入到while循环中,循环出来后,transferIndex会被设置成16,bound被设置成16,i设置成31。这里你可能会问 2. 将原来tab[i]的元素迁移到新的数组中去,并将tab[i]设置为fwd,将advance设置成为true 第二次循环: 1. --i,变为30,--i >= bound成立,并将advance设置成false 2. 将原来tab[i]的元素迁移到新的数组中去,并将tab[i]设置为fwd,将advance设置成为true 。。。 第十六次循环: 1. --i,变为15,将transferIndex设置为0,bound也设置为0,i设置为15 2. 将原来tab[i]的元素迁移到新的数组中去,并将tab[i]设置为fwd,将advance设置成为true 第三十二次循环: 1. 这个时候--i等于-1,而且(nextIndex = transferIndex) <= 0成立,那么会将i设置为-1,advance设置为false 2. 会把SIZECTL用CAS设置为原来的值加1,而后设置finishing为true 第三十三次循环: 1. 因为finishing为true,那么nextTable设置为null,table设置为新的数组值,sizeCtl设置为旧tab的长度的1.5倍
原文出处:https://www.cnblogs.com/luozhiyun/p/11406557.html