ConcurrentHashMap源码解析(1.8)

1、简介

上篇文章详细介绍了HashMap的源码及原理,本文趁热打铁继续分析ConcurrentHashMap的原理。html

首先在看本文以前,但愿对HashMap有一个详细的了解。否则看直接看ConcurrentHashMap的源码仍是有些费劲的。node

相信对HashMap,HashTable有必定了解,应该知道HashMap是不具有线程安全性的,在resize时会丢数据(JDK8),而HashTable虽然保证了线程安全性,可是其是经过给每一个方法加Synchronized关键字达到的同步目的。可是都知道Synchronized在竞争激烈的多线程并发环境中,在性能上的表现是很是不如人意的。那在高并发环境中HashMap如何保证线程安全而又不浪费太多性能呢?答案就是Java J.U.C并发包中的ConcurrentHashMap。数组

依然开局一张图。JDK8中的ConcurrentHashMap数据结构。安全

 

 

 呃呵,和HashMap的结构是同样的,没错在数据结构层面,ConcurrentHashMap和HashMap是彻底同样的。有了这个基础继续往下看。数据结构

2、历史版本

ConcurrentHashMap的历史版本大体分界线在JDK8。也就是能够分为JDK8和JDK8之前版本。多线程

数据结构的区别并发

在JDK8以前HashMap没有引入红黑树,一样的ConcurrentHashMap也没有引入红黑树。并且ConcurrentHashMap采用的是分段数组的底层数据结构。ide

在JDK7中的数据结构。函数

从上图咱们不难看出其在数据结构方面的差异。高并发

锁的区别

JDK7中为了提升并发性能采用了这种分段的设计。因此在JDK7中ConcurrentHashMap采用的是分段锁,也就是在每一个Segment上加ReentrantLock实现的线程安全线。关于ReetrantLock后面有时间会介绍,大体来讲ReetrantLoack是比Synchronized更细粒度的一种锁。使用得当的话其性能要比Synchronized表现要好,可是若是实现不得当容易形成死锁。

这种基于Segment和ReetrantLock的设计相对HashTable来讲大大提升了并发性能。也就是说多个线程能够并发的操做多个Segment,而HashTable是经过给每一个方法加Synchronized即将多线程串行而实现的。因此在必定程度上提升了并发性能。可是这种性能的提高表现相对JDK8来讲显得不值一提。

若是说JDK7 ConcurrentHashMap相对HashTable来讲是串行到多个线程并发的改进。而JDK8则是经过比Segment更细粒度的并发控制大大提升了其并发表现。

JDK8中ConcurrentHashMap采用的是CAS+Synchronized锁而且锁粒度是每个桶。简单来讲JDK7中锁的粒度是Segment,JDK8锁粒度细化到了桶级别。可想而知锁粒度是大大提到了。辅之以代码的优化,JDK8中的ConcurrentHashMap在性能上的表现很是优秀。

简单总结一下,从HashTable到JDK7 ConcurrentHashMap再到JDK8 ConcurrentHashMap。是从同步到并发再到高并发的进步。

3、基础知识

3.一、常量

//正在扩容,对应fwd类型的节点的hash
static final int MOVED     = -1; // hash for forwarding nodes //当前数组
transient volatile Node<K,V>[] table; //扩容时用到的,扩容后的数组。
private transient volatile Node<K,V>[] nextTable; //1,大于零,表示size * 0.75。 //2,等于-1,表示正在初始化。 //3,-(n + 1),表示正在执行扩容的线程其只表示基数,而不是真正的数量,须要计算得出的哦
private transient volatile int sizeCtl;

3.二、Unsafe类方法

 1     @SuppressWarnings("unchecked")  //transient volatile Node<K,V>[] table; tab变量确实是volatile
 2     static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {//获取table中索引 i 处的元素。
 3         return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);//若是tab是volatile变量,则该方法保证其可见性。
 4  }  5 
 6     static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,//经过CAS设置table索引为 i 处的元素。
 7                                         Node<K,V> c, Node<K,V> v) {  8         return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);  9  } 10             //transient volatile Node<K,V>[] table; tab变量确实是volatile
11     static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {//修改table 索引 i 处的元素。
12         U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);//若是tab是volatile变量,则该方法保证其可见性。
13     }

 

咱们不难看出 以上三个方法都是调用的Unsafe(U)类中的方法,Unsafe类中定义了大量对内存的操做方法,是native的,不建议开发者直接使用。

tabAt和setTabAt最终调用的两个方法分别是 U.getObjectVolatile()和U.putObjectVolatile 顾名思义其是经过volatile保证的tab的可见性(Volatile只保证可见性不保证原子性哦)。前提是tab变量是Volatile修饰的变量。咱们经过调用栈,最红能够看到其实tab就是ConcurrentHashMap中的table。而这个变量是这么定义的。

transient volatile Node<K,V>[] table;

 

可见其确实是Volatile修饰的变量。

再看

casTabAt方法,这个就是CAS方法了。

CAS:Compare and Swap三个单词的缩写,即:比较交换的意思。CAS在Java中又称之为乐观锁即咱们总认为是没有锁的。

while(true){ CAS(); }

通常的经过上述用法达到自旋的目的。CAS通常经过自旋达到自旋锁的目的,即认为没有锁,失败重试,这种思路。更多内容请自行百度。CAS很重要哦。

4、put过程源码

 1 public V put(K key, V value) {  2     return putVal(key, value, false);  3 }  4 
 5 /** Implementation for put and putIfAbsent */
 6 final V putVal(K key, V value, boolean onlyIfAbsent) {  7     if (key == null || value == null) throw new NullPointerException();  8     int hash = spread(key.hashCode());//hash,对hashcode再散列
 9     int binCount = 0; 10     for (Node<K,V>[] tab = table;;) {//迭代桶数组,自旋
11         Node<K,V> f; int n, i, fh; 12         if (tab == null || (n = tab.length) == 0)//懒加载。若是为空,则进行初始化
13             tab = initTable();//初始化桶数组 14         //(n - 1) & hash)计算下标,取值,为空即无hash碰撞
15         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 16             if (casTabAt(tab, i, null, 17                          new Node<K,V>(hash, key, value, null)))//经过cas插入新值
18                 break;                   // no lock when adding to empty bin
19  } 20         //判断是否正在扩容。若是正在扩容,当前线程帮助进行扩容。 21         //每一个线程只能同时负责一个桶上的数据迁移,而且不影响其它桶的put和get操做。 22         //(很牛逼的思路,能这么作创建在更细粒度的锁基础上)
23         else if ((fh = f.hash) == MOVED) 24             tab = helpTransfer(tab, f); 25         else {//put5,存在hash碰撞
26             V oldVal = null; 27             //此处,f在上面已经被赋值,f为当前下标桶的首元素。对链表来讲是链表头对红黑树来讲是红黑树的头元素。
28             synchronized (f) { 29                 //再次检查当前节点是否有变化,有变化进入下一轮自旋 30                 //为何再次检查?由于不能保证,当前线程到这里,有没有其余线程对该节点进行修改
31                 if (tabAt(tab, i) == f) { 32                     if (fh >= 0) {//当前桶为链表
33                         binCount = 1; 34                         for (Node<K,V> e = f;; ++binCount) {//迭代链表节点
35  K ek; 36                             if (e.hash == hash &&//key相同,覆盖(onlyIfAbsent有什么用?)
37                                 ((ek = e.key) == key ||
38                                  (ek != null && key.equals(ek)))) { 39                                 oldVal = e.val; 40                                 if (!onlyIfAbsent) 41                                     e.val = value; 42                                 break; 43  } 44                             Node<K,V> pred = e; 45                             //找到链表尾部,插入新节点。(什么这里不用CAS?由于这在同步代码块里面)
46                             if ((e = e.next) == null) { 47                                 pred.next = new Node<K,V>(hash, key, 48                                                           value, null); 49                                 break; 50  } 51  } 52  } 53                     else if (f instanceof TreeBin) {//当前桶为红黑树
54                         Node<K,V> p; 55                         binCount = 2; 56                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, 57                                                        value)) != null) {//想红黑树插入新节点
58                             oldVal = p.val; 59                             if (!onlyIfAbsent) 60                                 p.val = value; 61  } 62  } 63  } 64  } 65             if (binCount != 0) { 66                 //树化。binCount > 8,进行树化,链表转红黑树
67                 if (binCount >= TREEIFY_THRESHOLD) 68                     //若是容量 < 64则直接进行扩容;不转红黑树。 69                     //(你想一想,假如容量为16,你就插入了9个元素,巧了,都在同一个桶里面, 70                     //若是这时进行树化,时间复杂度会增长,性能降低,不如直接进行扩容,空间换时间)
71  treeifyBin(tab, i); 72                 if (oldVal != null) 73                     return oldVal; 74                 break; 75  } 76  } 77  } 78     addCount(1L, binCount);//扩容。addCount内部会进行判断要不要扩容
79     return null; 80 }

总结以上过程

1,懒加载,未初始化则初始化table 2,hash,hashcode再散列,并计算下标 3,无碰撞,经过CAS插入 4,有碰撞   4.1、若是正在扩容,协助其它线程去扩容   4.2、若是是链表,插入链表   4.3、若是是红黑树,插入红黑树   4.4、若是链表长度超过8,树化   4.5,若是key已经存在,覆盖旧值 5,须要扩容,则扩容

相比HashMap过程多了一个协助扩容。

以上源码须要注意的是

1 for (Node<K,V>[] tab = table;;) {//迭代桶数组,自旋
2     
3 }

这是一个自旋的过程,若是CAS修改失败会进入下一轮自旋。好久之前看这段源码的时候,我老是在想,CAS失败了不就丢数据了吗?因此这个自旋,也称之为自旋锁会保证数据必定能插入成功。

说说上面锁竞争的状况,以上过程咱们不难发现对table的修改都是经过CAS操做实现的。好比下面这行代码,若是已经有线程正在操做 i 位置的元素,则意味着本轮自旋将会失败,继续自旋,当其余线程修改完成,本线程再次运行到tabAt觉得是Volatile操做,其余线程的修改对本线程当即可见(详见Volatile关键字内存语义的内容)。本线程经过tabAt发现该处已经存在元素,即发生碰撞,继续往下运行。

 

1 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 2     if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))//经过cas插入新值
3      break;                   // no lock when adding to empty bin
4 }

线程的调度须要操做系统从用户态转为内核态,这是很是重量级的操做。CAS+自旋组成的自旋锁保证了线程不会进入阻塞态。

而后继续往下看

 

synchronized (f) { //再次检查当前节点是否有变化,有变化进入下一轮自旋 //为何再次检查?由于不能保证,当前线程运行到这里,有没有其余线程对该节点进行修改
    if (tabAt(tab, i) == f) {

先看这行代码 synchronized (f) 这个f是一个桶的头元素。也就是说在JDK8中synchronized锁仅仅只锁链表头或者红黑树的头(其实就是锁一个桶,由于要访问链表或者红黑树总要从头开始访问吧)

再看 if (tabAt(tab, i) == f) {} 其实就是双重检测(参考单例的双重检测),为何要再检查一遍呢?由于不能保证当前线程运行到这里,有没有其余线程已经对该节点进行了修改。

initTable()

 1 private final Node<K,V>[] initTable() {  2     Node<K,V>[] tab; int sc;  3     while ((tab = table) == null || tab.length == 0) {  4         // 赋值sc。并当sizeCtl == -1 即当前有线程正在执行初始化
 5         if ((sc = sizeCtl) < 0)  6             //yield()暂停当前正在执行的线程,执行其余线程  7             //(这是一个通知,可是这是不必定会让当前线程中止,要取决于线程调度器)  8             //就是我想让出资源,可是这只是一厢情愿的事情,线程调度器会考虑你的方法,可是不必定采纳。
 9  Thread.yield(); 10         //修改 sizeCtl 的值为 -1。 SIZECTL 为 sizeCtl 的内存地址。
11         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 12             try { 13                 //执行初始化过程
14                 if ((tab = table) == null || tab.length == 0) { 15                     //sc在上面已经赋值,=原来 sizeCtl的值。是非讨厌JDK源码这种赋值方式。
16                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY; 17                     @SuppressWarnings("unchecked") 18                     //建立一个sc长度的table。
19                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 20                     table = tab = nt; 21                     sc = n - (n >>> 2); 22  } 23             } finally { 24                 //初始化完成, sizeCtl从新赋值为当前数组的长度。
25                 sizeCtl = sc; 26  } 27             break; 28  } 29  } 30     return tab; 31 }

以上过程,一样是经过CAS实现的初始化控制,保证只有一个线程去执行初始化。

helpTransfer(tab, f);方法咱们后面介绍完扩容再说。

看完以上put过程,咱们能发现,JDK8经过CAS+自旋锁将锁的粒度控制在每个桶上,相对于JDK7中Segment锁,锁粒度提升了不少。而且CAS+自旋锁保证了不会出现线程的切花这种重量级的操做。

5、扩容

 1 //tab旧桶数组,nextTab新桶数组
 2 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {  3     int n = tab.length, stride;  4     //控制并发数,控制CPU的资源
 5     if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)  6         stride = MIN_TRANSFER_STRIDE; // subdivide range
 7     if (nextTab == null) {            // initiating//新数组为空,则初始化新数组
 8         try {  9             @SuppressWarnings("unchecked")  10             //扩容为原来的两倍 n << 1
 11             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];  12             nextTab = nt;  13         } catch (Throwable ex) {      // try to cope with OOME
 14             sizeCtl = Integer.MAX_VALUE;  15             return;  16  }  17         nextTable = nextTab;  18         transferIndex = n;  19  }  20     int nextn = nextTab.length;  21     //在这里面进行new Node将node.hash置为-1。表示该桶正在进行移动。  22     //(这里很重要的一点是,只锁表头,因此只须要将链表(或者红黑树)头结点.hash置为-1便可)
 23     ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);  24     //advance是控制是否继续进行移动的条件,当advance == false,表示正在移动一个桶。  25     //true表示能够继续进行下一个桶的移动
 26     boolean advance = true;  27     boolean finishing = false; // to ensure sweep before committing nextTab
 28     for (int i = 0, bound = 0;;) {//自旋
 29         Node<K,V> f; int fh;  30         while (advance) {//start
 31             int nextIndex, nextBound;  32             //当前桶是否是已经移动完了
 33             if (--i >= bound || finishing)  34                 advance = false;  35             //两个中止移动的条件。移动完了。(这个是真正中止的条件。下面那个条件会进行一次检查)
 36             else if ((nextIndex = transferIndex) <= 0) {  37                 i = -1;  38                 advance = false;  39  }  40             else if (U.compareAndSwapInt  41                      (this, TRANSFERINDEX, nextIndex,  42                       nextBound = (nextIndex > stride ?
 43                                    nextIndex - stride : 0))) {  44                 bound = nextBound;  45                 i = nextIndex - 1;  46                 advance = false;  47  }  48  }  49         if (i < 0 || i >= n || i + n >= nextn) {  50             int sc;  51             if (finishing) {//结束扩容
 52                 nextTable = null;  53                 table = nextTab;  54                 sizeCtl = (n << 1) - (n >>> 1);  55                 return;  56  }  57             if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {  58                 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)  59                     return;  60                 finishing = advance = true;  61                 i = n; // recheck before commit 再次检查一遍,防止有桶中还有数据没移动。
 62  }  63         }//end 从start到end可看可不看就是条件控制,包括结束条件的控制,移动进度的控制等。  64         //该桶没数据
 65         else if ((f = tabAt(tab, i)) == null)  66             //将oldtab中的该桶设置为fwd节点,hash=-1
 67             advance = casTabAt(tab, i, null, fwd);  68         //已经移动过的桶其hash=-1
 69         else if ((fh = f.hash) == MOVED)  70             advance = true; // already processed
 71         else {  72             synchronized (f) {//上锁
 73                 if (tabAt(tab, i) == f) {  74                     //ln新链表,不须要移动的节点从新组组织成的链表。  75                     //hn新链表,须要移动的节点从新组织成的链表
 76                     Node<K,V> ln, hn;  77                     if (fh >= 0) {//链表
 78                         int runBit = fh & n;  79                         Node<K,V> lastRun = f;  80                         //start  81                         //从start,到end之间。不看也行。实在费脑子。其实这段代码写的有点让人费解  82                         //主要是不认真看不知道做者的意图。本意是这样的。判断是否是能够从某个节点n开始  83                         //后面的节点是否是都是和节点n同样,移动的目标桶同样的。  84                         //若是是同样的,则后面的这些节点就不用移动了,只须要移动n节点便可。  85                         //(注意链表的引用,next指针就把后面的都带过去了)  86                         //想一个极端状况,若是在这里迭代后发现,全部节点,扩容后数据移动的目标桶都是同样的。  87                         //则只须要移动头结点便可。不用从新拼接链表了。
 88                         for (Node<K,V> p = f.next; p != null; p = p.next) {  89                             int b = p.hash & n;  90                             if (b != runBit) {  91                                 runBit = b;  92                                 lastRun = p;  93  }  94  }  95                         if (runBit == 0) {// runBit== 0 表示该节点不须要移动
 96                             ln = lastRun;  97                             hn = null;  98  }  99                         else { 100                             hn = lastRun; 101                             ln = null; 102                         }//end
103                         for (Node<K,V> p = f; p != lastRun; p = p.next) { 104                             int ph = p.hash; K pk = p.key; V pv = p.val; 105                             if ((ph & n) == 0) 106                                 ln = new Node<K,V>(ph, pk, pv, ln); 107                             else
108                                 hn = new Node<K,V>(ph, pk, pv, hn); 109  } 110  setTabAt(nextTab, i, ln); 111                         setTabAt(nextTab, i + n, hn); 112  setTabAt(tab, i, fwd); 113                         advance = true; 114  } 115                     else if (f instanceof TreeBin) {//红黑树
116                         TreeBin<K,V> t = (TreeBin<K,V>)f; 117                         TreeNode<K,V> lo = null, loTail = null; 118                         TreeNode<K,V> hi = null, hiTail = null; 119                         int lc = 0, hc = 0; 120                         for (Node<K,V> e = t.first; e != null; e = e.next) { 121                             int h = e.hash; 122                             TreeNode<K,V> p = new TreeNode<K,V>
123                                 (h, e.key, e.val, null, null); 124                             if ((h & n) == 0) { 125                                 if ((p.prev = loTail) == null) 126                                     lo = p; 127                                 else
128                                     loTail.next = p; 129                                 loTail = p; 130                                 ++lc; 131  } 132                             else { 133                                 if ((p.prev = hiTail) == null) 134                                     hi = p; 135                                 else
136                                     hiTail.next = p; 137                                 hiTail = p; 138                                 ++hc; 139  } 140  } 141                         ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : 142                             (hc != 0) ? new TreeBin<K,V>(lo) : t; 143                         hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : 144                             (lc != 0) ? new TreeBin<K,V>(hi) : t; 145  setTabAt(nextTab, i, ln); 146                         setTabAt(nextTab, i + n, hn); 147  setTabAt(tab, i, fwd); 148                         advance = true; 149  } 150  } 151  } 152  } 153  } 154 }

5.一、扩容前准备阶段

ForwardingNode

1 static final class ForwardingNode<K,V> extends Node<K,V> { 2     final Node<K,V>[] nextTable; 3     ForwardingNode(Node<K,V>[] tab) { 4         super(MOVED, null, null, null); 5         this.nextTable = tab; 6  } 7 }

看一下这个内部类,其实呢其就是一个起到标识做用的节点,该节点看上面代码可知,该节点最主要的特色就是hash=MOVED=-1。hash=-1的节点在ConcurrentHashMap中表示该桶是被扩容过程迁移过的桶。而后当前线程判断若是该桶已经被迁移。不管put仍是get都去新的数组中操做。还有一点很重要,还能够经过ForwardingNode中 nextTable获取到新的数组。

 

1 //该桶没数据
2 else if ((f = tabAt(tab, i)) == null) 3     //将oldtab中的该桶设置为fwd节点,hash=-1
4     advance = casTabAt(tab, i, null, fwd);

看上面代码,先判断该桶还有没有数据。没数据不用迁移,等同于已经迁移完了。其余线程put会直接put到新的数组中。

 

1 //已经移动过的桶其hash=-1;
2 else if ((fh = f.hash) == MOVED) 3     advance = true; // already processed

若是该桶已经移动则跳过。

到此咱们能看出什么?主要是已经移动完的设置成fwd节点,其它线程看到该桶已经移动,则会到新的table中操做。若是未移动,还直接操做当前table,由于就算put,待会处理到该桶,同样移动到新桶,也没啥影响。若是是正在移动的接下来会看到加了Synchronized锁,保证只有一个线程能操做当前桶。简直不要太妙。

5.二、扩容过程

画重点,扩容过程

 1 synchronized (f) {//上锁
 2     if (tabAt(tab, i) == f) {  3         //ln新链表,不须要移动的节点从新组组织成的链表。  4         //hn新链表,须要移动的节点从新组织成的链表
 5         Node<K,V> ln, hn;  6         if (fh >= 0) {//链表
 7             int runBit = fh & n;  8             Node<K,V> lastRun = f;  9             //start 10             //从start,到end之间。不看也行。实在费脑子。其实这段代码写的有点让人费解 11             //主要是不认真看不知道做者的意图。本意是这样的。判断是否是能够从某个节点n开始 12             //后面的节点是否是都是和节点n同样,移动的目标桶同样的。 13             //若是是同样的,则后面的这些节点就不用移动了,只须要移动n节点便可。 14             //(注意链表的引用,next指针就把后面的都带过去了) 15             //想一个极端状况,若是在这里迭代后发现,全部节点,扩容后数据移动的目标桶都是同样的。 16             //则只须要移动头结点便可。不用从新拼接链表了。
17             for (Node<K,V> p = f.next; p != null; p = p.next) { 18                 int b = p.hash & n; 19                 if (b != runBit) { 20                     runBit = b; 21                     lastRun = p; 22  } 23  } 24             if (runBit == 0) {// runBit== 0 表示该节点不须要移动
25                 ln = lastRun; 26                 hn = null; 27  } 28             else { 29                 hn = lastRun; 30                 ln = null; 31             }//end
32             for (Node<K,V> p = f; p != lastRun; p = p.next) { 33                 int ph = p.hash; K pk = p.key; V pv = p.val; 34                 if ((ph & n) == 0) 35                     ln = new Node<K,V>(ph, pk, pv, ln); 36                 else
37                     hn = new Node<K,V>(ph, pk, pv, hn); 38  } 39  setTabAt(nextTab, i, ln); 40             setTabAt(nextTab, i + n, hn); 41  setTabAt(tab, i, fwd); 42             advance = true; 43  } 44         else if (f instanceof TreeBin) {//红黑树 45             //红黑树跳过
46  } 47  } 48 }

 

 5.2.一、并发控制

首先扩容过程是在synchronized同步代码块中的。而且只锁了一个表头。可看到没有锁新数组nextTab的桶。想一想,oldTab(tab变量)和nextTab都是多个线程共享的变量,为何只有只锁了oldTab正在操做的桶?若是有多个线程向nextTab同时迁移数据怎么办?会不会存在线程安全性问题?

TIPS: 统一术语 tab = oldTab = table(旧数组) newTab = nextTab(扩容后新数组) oldIndex即在oldTab中的索引位 newIndex即在newTab中的位置

上一篇文章中介绍HashMap的时候详细介绍了HashMap扩容中,oldTab旧桶迁移向newTab只有两个目标桶。再简单回顾一遍。

上面这张图形象的展现了旧桶在扩容后的两个去向:1,索引位原地不动,2,索引位为oldCap+oldIndex。(关于为何是这两个去向,在HashMap扩容中已经详细介绍了)

若是你还没懂个人疑问,请参考下面这个图。

前提,ConcurrentHashMap是并发扩容,能够有多个线程同时扩容,其次若是如上图红线那样,oldTab中有多个桶中的数据迁移到newTab中的同一个桶中,若是出现这种状况就意味着存在线程安全性问题。

从上图5-1中,两个数据迁移的方向可知,扩容前,oldIndex不一样就表示不在一个桶,扩容后的两个去向若是oldIndex不同,也必定不在同一个桶。因此不会出现5-2图中红线的那种状况,也就说明在扩容过程当中不须要锁newTab。佩服+2

 

5.2.二、数据迁移

//ln新链表,不须要移动的节点从新组组织成的链表。 //hn新链表,须要移动的节点从新组织成的链表
Node<K,V> ln, hn;
int runBit = fh & n;

看两个变量,上面说过扩容后,旧桶中的数据只有两个迁移的方向。ln正是数据迁移后索引位依然是oldIndex的数据的链表,hn是迁移后须要迁移到oldCap + oldIndex索引位的链表。

关注一下runBit变量,若是 runBit == 0 成立则说明迁移后桶的索引位依然是oldIndex。详见HashMap扩容分析。

重点关注一下start到end之间的代码

关于这段代码,首选咱们假设一种极端状况,若是当前正在移动的桶中的数据在rehash以后,数据迁移的目标桶除了第一个节点的目标桶是oldIndex以外,后面的数据的目标桶都是oldIndex + oldCap。咱们还须要处理后面的节点吗?不须要,由于只须要将第二个节点移动到newTab的oldIndex + oldCap位置便可。第二个元素也就是lastRun变量。相对于HashMap彻底的将数据组织成两个链表,这也算得上是一个性能上的优化吧。

接着往下看

代码段1:

1 for (Node<K,V> p = f.next; p != null; p = p.next) { 2     int b = p.hash & n; 3     if (b != runBit) {//相同跳过 4         runBit = b; 5         lastRun = p; 6  } 7 }

以上代码经过对链表的一次扫描决定了lastRun。

代码段2:

1 if (runBit == 0) {// runBit== 0 表示该节点不须要移动
2     ln = lastRun; 3     hn = null; 4 } 5 else { 6     hn = lastRun; 7     ln = null; 8 }//end

 

根据lastRun指向的节点的runBit决定后续节点在扩容后是oldIndex + oldCap仍是oldIndex。

代码段3:

1 for (Node<K,V> p = f; p != lastRun; p = p.next) { 2     int ph = p.hash; K pk = p.key; V pv = p.val; 3     if ((ph & n) == 0) 4         ln = new Node<K,V>(ph, pk, pv, ln); 5     else
6         hn = new Node<K,V>(ph, pk, pv, hn); 7 }

 

上述代码会从新组织两个新链表。注意这个迭代到lastRun位置结束,由于以上过程已经肯定了lastRun的归属。

看一下 ln = new Node<K,V>(ph, pk, pv, ln); 从新组织链表的代码,也就是ln会成为新new出来的node的下一个节点。

这样有什么问题?问题就是节点在旧桶中的相对顺序在新桶中将相反。也就是next的指针翻转一下。能够看一下node的构造函数就明了了。

 

演示扩容过程

假设当前扩容前oldCap即oldTab的长度为2,扩容后newCap即newTab的长度为4。以下图看扩容过程,橘色的表明迁移后索引位依然是oldIndex,绿色表明扩容后索引位为oldIndex + oldCap。

 

 

 上述代码段1迭代找到了lastRun即指向node(11),代码段2将lastRun赋值给hn。代码段3执行过程以下

1,将node(1)拼接到ln 2,将node(3)拼接到hn,此时注意,hn已经lastRun指向的节点node(11),此时hn=3—>11—>15—>19—>null
3,处理node(5)拼接到ln 4,处理...

 

对比JDK7 HashMap,JDK8 HashMap,JDK8 ConcurrentHashMap在扩容后对节点相对顺序的保证方面,JDK7 HashMap是彻底倒序。JDK8 HashMap不改变相对顺序。JDK8 ConcurrentHashMap 保证部分节点的相对顺序,其他的倒序。

题外话,从代码风格和死路上,猜想一下ConcurrentHashMap应该是来自JDK7的HashMap。

 

1 setTabAt(nextTab, i, ln); 2 setTabAt(nextTab, i + n, hn); 3 setTabAt(tab, i, fwd); 4 advance = true;

ln和hn两个链表各回各家各找各妈。

 

回过头来再看put方法中的帮助扩容

1 else if ((fh = f.hash) == MOVED) 2     tab = helpTransfer(tab, f);

 

在put方法中有这样一行判断,当f.hash = MOVED即当前HashMap正在扩容中,则当前线程会去尝试帮助扩容。

 1 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {  2     Node<K,V>[] nextTab; int sc;  3     if (tab != null && (f instanceof ForwardingNode) &&//条件判断
 4         (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {//从fwd节点中取出新table
 5         int rs = resizeStamp(tab.length);  6         while (nextTab == nextTable && table == tab &&
 7                (sc = sizeCtl) < 0) {  8             if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
 9                 sc == rs + MAX_RESIZERS || transferIndex <= 0) 10                 break; 11             if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//修改sizeCtl = sizeCtl + 1,表示多了一个线程参与扩容
12  transfer(tab, nextTab); 13                 break; 14  } 15  } 16         return nextTab; 17  } 18     return table; 19 }

 

在helpTransfer方法中会首先作一系列判断,经过fwd节点获取到nextTab即新的数组。经过CAS 实现sizeCtl++操做,表示多了一个线程进行扩容,由于在扩容方法中对扩容线程数量有控制。

 

最后的最后,扩容的时机

说一下触发扩容的操做,总的来讲就是put操做,可是有两个时机很重要,其一就是addCount方法中,每次put一个元素,在addCount方法中都会判断需不须要进行扩容。另外就是treeifyBin方法中,若是桶中数据超过了8个而且数组长度<64则不会进行树化,而是进行扩容。关于这个在HashMap源码介绍中也有介绍。你想一想,假如容量为16,你就插入了9个元素,巧了,都在同一个桶里面,若是这时进行树化,树化自己就是一个耗时的过程。时间复杂度会增长,性能降低,不如直接进行扩容,空间换时间。

终于扩容过程写完了。很经典,想读懂也很费劲。

6、get过程源码

 1 public V get(Object key) {  2     Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;  3     int h = spread(key.hashCode());//hash
 4     if ((tab = table) != null && (n = tab.length) > 0 &&
 5         (e = tabAt(tab, (n - 1) & h)) != null) {//取桶
 6         if ((eh = e.hash) == h) {//key相同直接返回
 7             if ((ek = e.key) == key || (ek != null && key.equals(ek)))  8                 return e.val;  9  } 10         else if (eh < 0)//hash < 0 表示正在扩容 11             //在这里须要很是注意的一点,扩容后的桶会放入fwd节点 12             //该节点hash = MOVED,fwd.nextTable为扩容后新的数组。
13             return (p = e.find(h, key)) != null ? p.val : null; 14         while ((e = e.next) != null) {//迭代链表
15             if (e.hash == h &&
16                 ((ek = e.key) == key || (ek != null && key.equals(ek)))) 17                 return e.val; 18  } 19  } 20     return null; 21 }

 

get源码只关注下面这行

return (p = e.find(h, key)) != null ? p.val : null;

 

当该桶已经被移动,则经过e.find方法去nextTab新数组查找。首先在5章节resize扩容方法中,已经扩容的桶会被塞进去一个ForwardingNode节点 setTabAt(tab, i, fwd); 继续看resize方法中ForwardingNode的初始化会发现是这样初始化的 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); ,看它的构造方法

static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } }

 

不难发下其初始化方法接收一个nextTab也就是扩容后的新数组,并将该数组赋值给其内部变量nextTable。也就是说当get发现桶已经扩容后,咱们能够从fwd节点中找到新的数组。并重新的数组中找到新的目标桶并进行元素查找。

看了以上代码,回到 e.find(h, key)) ,须要明确的是e就是ForwardingNode节点。看看find方法

 1 Node<K,V> find(int h, Object k) {  2     // loop to avoid arbitrarily deep recursion on forwarding nodes
 3     outer: for (Node<K,V>[] tab = nextTable;;) {//迭代nextTable
 4         Node<K,V> e; int n;  5         if (k == null || tab == null || (n = tab.length) == 0 ||
 6             (e = tabAt(tab, (n - 1) & h)) == null)  7             return null;  8         for (;;) {  9             int eh; K ek; 10             if ((eh = e.hash) == h &&
11                 ((ek = e.key) == k || (ek != null && k.equals(ek)))) 12                 return e; 13             if (eh < 0) { 14                 if (e instanceof ForwardingNode) { 15                     tab = ((ForwardingNode<K,V>)e).nextTable; 16                     continue outer; 17  } 18                 else
19                     return e.find(h, k); 20  } 21             if ((e = e.next) == null) 22                 return null; 23  } 24  } 25 }

OK,很明确了,确实是从nextTable中查找的。

得出一个结论,ConcurrentHashMap扩容不影响get操做。也就是在扩容过程当中能够并发读。

7、ConcurrentHashMap并发控制

详细看了ConcurrentHashMap put resize get过程的源码,本章从总体上看一下ConcurrentHashMap的并发控制。

下面结合图片咱们看一下ConcurrentHashMap的并发过程。

 

 

 

 

 如上图,线程1进行put操做,这时发现size > sizeCtl。开始进行扩容

 

 

 

 

此时线程1已经完成oldTab中索引[2,16)中的扩容。正在进行索引为1的桶的扩容。接下来线程2执行get。

 

 

 

 

线程2根据get逻辑和key的hash,可能访问的三种状况如上图所示

状况一:访问蓝色号桶,即未扩容的桶。该桶还未进行扩容,因此在桶中找到对应元素,返回。

状况二:访问绿色桶,即正在扩容的桶。该桶正在扩容,在扩容过程当中,线程1持有Synchronized锁,线程2只能自旋等待。

状况三:访问橘色桶,该桶已扩容的桶。该桶已扩容,oldTab中是fwd节点,hash=-1,因此执行fwd节点的find逻辑,fwd节点持有newTab(nextTable),因此线程2去newTab中查找对应元素,返回。

 

 如上图4,当线程1进行扩容时,线程3进来执行put,一样存在三种可能的状况

状况一:访问蓝色桶,即未扩容的桶。正常执行put逻辑。

状况二:访问绿色桶,即正扩容的桶。由于线层1持有Synchronized锁,线程3将一直自旋,等待扩容结束。

状况三:访问橘色桶,即已扩容的桶。由于已扩容的桶,在oldTab中是fwd节点,hash = -1 = MOVED,因此线程3执行帮助扩容的逻辑。等待扩容完成,线程3继续完成put逻辑。

 

OK,以上就是ConcurrentHashMap关于get put resize的并发控制,从以上过程可见,存在锁竞争的状况颇有限,即便存在锁竞争,也是进行自旋,而不会阻塞线程。可见ConcurrentHashMap能作到高效的并发读。

在put过程当中,由于若是存在线程正在已经扩容,则帮助进行扩容(协助扩容这块,有一个步长的概念,同时进行扩容的线程和table的长度有关)。若是当前桶正在进行扩容,则被Synchronized锁拒之门外,自旋等待扩容结束。若是访问的是未扩容的桶,则执行正常的put逻辑。可见整个过程当中,因为锁的粒度很小,put作到了高效的并发写,也作到了高效的扩容。

总之一句话ConcurrentHashMap的高并发是经过 CAS乐观锁 + 自旋锁 + 细粒度 保证的。

 

  若有错误的地方还请留言指正。
  原创不易,转载请注明原文地址:https://www.cnblogs.com/hello-shf/p/12183263.html

原文出处:https://www.cnblogs.com/hello-shf/p/12183263.html

相关文章
相关标签/搜索