ConcurrentHashMap与 HashMap 同样,都是键值对数据存储结构,不过 HashMap 是非线程安全,而ConcurrentHashMap是线程安全的,提及线程安全又不得不与HashTable对比一下,HashTable相比ConcurrentHashMap效率低,接下来看看ConcurrentHashMap究竟是如何保证线程安全且比HashTable优秀(JDK1.8)算法
仍是从put()开始数组
public V put(K key, V value) { return putVal(key, value, false); }
put()会再调一个putValue(),由于要传入 onlyIfAbsent 参数赞成更新元素,与HashMap不同的是 key的散列算法不是在put()完成,而是在putValue()完成安全
/** * key:储存到HashMap的key * value:储存到HashMap的key对应的value * onlyIfAbsent:若是包含了该key,则不更新对应的值,众所周知,put()除了新增以外,还有更新的功能,是由于put()调用putVal()时候传的都是false */ final V putVal(K key, V value, boolean onlyIfAbsent) { //ConcurrentHashMap的键值都不容许为null if (key == null || value == null) throw new NullPointerException(); //计算key的hash值 int hash = spread(key.hashCode()); // 记录当前储存的位置是否须要转行成红黑树结构 int binCount = 0; // 循环ConcurrentHashMap的Node数组,找出对于存储的位置 for (ConcurrentHashMap.Node<K,V>[] tab = table;;) { // 定义了几个变量: // f:存放在数组相同下标位置的第一个元素 // n:当前ConcurrentHashMap的Node数组的长度 // i:当前存储元素须要存放的数组下标 // fh : 存放在数组相同下标位置的第一个元素的hash值 ConcurrentHashMap.Node<K,V> f; int n, i, fh; // 判断ConcurrentHashMap的Node数组长度,若是等于null或者长度等于0的状况,则进行扩容操做 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 判断当前元素须要存储的数组位置是否为null,若是是null的状况下则尝试存放数据 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 这里使用了CAS操做(乐观锁),尝试存放元素,若是元素存放成功则直接结束当前put(),不然继续找位置 if (casTabAt(tab, i, null, new ConcurrentHashMap.Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 这里判断当前ConcurrentHashMap是否正在进行扩容操做 // 若是是在进行扩容操做,则当前线程也帮忙扩容,而且存放元素 else if ((fh = f.hash) == MOVED) // 帮忙进行扩容操做,helpTransfer() 这个函数是帮助正在扩容的线程 // 一块儿进行扩容操做而且存放元素 tab = helpTransfer(tab, f); else { // 定义了一个指向旧值的变量,当本次put()操做是更新操做时,则使用该变量指向旧值 // 而且更新完成以后返回旧值 V oldVal = null; // 上面也说过 f 是每一个相同元素下标位置的首个元素, // 这里对 f 进行了加锁,这样能锁住全部即将存在 i 位置元素的put()操做 // 而且不影响其余下标位置的元素插入,相比HashTable的方法锁大大提升了性能 synchronized (f) { // 若是 f 没又改变,则继续操做,不然重复第一个循环 if (tabAt(tab, i) == f) { // 判断当前节点是否链表节点 if (fh >= 0) { // 记录当前储存的位置是否须要转行成红黑树结构 binCount = 1; // 遍历链表找出存放的位置 for (ConcurrentHashMap.Node<K,V> e = f;; ++binCount) { K ek; // 若是遍历到的节点 key一致,则进行更新操做 而且oldVal变量存储旧值,用于后续返回 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 若是一致没有相同的key,则在链表最后添加新节点 ConcurrentHashMap.Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new ConcurrentHashMap.Node<K,V>(hash, key, value, null); break; } } } // 若是当前数组下标节点等于 TreeBin 的时候,则进行按红黑树规则进行添加或修改操做 else if (f instanceof ConcurrentHashMap.TreeBin) { ConcurrentHashMap.Node<K,V> p; binCount = 2; if ((p = ((ConcurrentHashMap.TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 判断当前链表的长度是否须要转换成红黑树结构 // static final int TREEIFY_THRESHOLD = 8; // 当链表长度大于等于 8 的时候,则进行转换成红黑树操做 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 记录当前ConcurrentHashMap长度而且判断是否须要进行扩容操做 addCount(1L, binCount); return null; }
put()操做使用了synchronized 与CAS乐观锁,当存储在数组相同位置的元素为null时候,会使用CAS操做进行尝试存储,若是存储成功则直接走最后的记录长度和判断是否须要扩容流程。数组相同位置的元素不为null,则先使用 synchronized 锁住该对象,这样就锁住了该数组下标的整个数据结构(链表或者红黑树的管理者TreeBin)再进行与HashMap大体相同的链表或者红黑树存储规则数据结构
下面随便画了一个 此处 synchronized 锁的图多线程
接下来看看初始化容量函数 initTable()并发
private final ConcurrentHashMap.Node<K,V>[] initTable() { ConcurrentHashMap.Node<K,V>[] tab; int sc; // 判断当前ConcurrentHashMap数组长度是否须要扩容 while ((tab = table) == null || tab.length == 0) { // 判断是否有其余线程在进行初始化容量操做 // 若是 sizeCtl < 0 则说明有其余线程在进行初始化容量操做 if ((sc = sizeCtl) < 0) // 若是有其余线程在进行初始化容量,则下降当前线程的优先级 Thread.yield(); // lost initialization race; just spin // 若是没有其余线程进行初始化容量则使用CAS操做尝试获取到初始化容量的资格 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 再判断一次是否须要初始化容量 // 当首个线程先取得初始化容量资格时候,很大可能已经完成了 // 其余减低了优先级的线程即便再获取到容量初始化资格都会被该判断阻挡 if ((tab = table) == null || tab.length == 0) { // 获取默认初始化容量 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 初始化ConcurrentHashMap 数组 @SuppressWarnings("unchecked") ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } // 返回初始化的数组 return tab; }
initTable()主要也仍是使用到了CAS操做,使只有一个线程常常初始化,而且table使用了volatile修饰线程共享资源,初始化一旦完成,其余线程立马知道了该table已经初始化,在高并发多线程状况下,有些被下降了优先级的线程后面也会经过CAS获取到初始化资格,可是下面还有一个判断table的长度,就无需再进行扩容操做了,因此ConcurrentHashMap的容量初始化是由双重校验+CAS+volatile保证了其线程安全。HashMap里面扩容与初始化容量都是同一个函数,而ConcurrentHashMap则是分开了2个函数,初始化容量一个,扩容一个(transfer())dom
transient volatile Node<K,V>[] table;
下面看看扩容函数 transfer()与协助扩容函数 helpTransfer()ide
/** * tab:当前的table * nextTab:扩容后的table */ private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) { int n = tab.length, stride; // 获取CPU每一个核的处理量,默认16 // private static final int MIN_TRANSFER_STRIDE = 16; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 判断扩容后的 table 是否为null // 若是为null则在此处进行建立新 table if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } // 记录新的table nextTable = nextTab; // 记录旧的table长度 transferIndex = n; } // 记录新 table的长度 int nextn = nextTab.length; // 建立一个 ForwardingNode 而且说明当前的table正在进行扩容操做 ConcurrentHashMap.ForwardingNode<K,V> fwd = new ConcurrentHashMap.ForwardingNode<K,V>(nextTab); // 记录当前 f 当前数组下标位置的某个元素数据是否须要计算新table下的新下标 boolean advance = true; // 记录是否完成数据扫描 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { ConcurrentHashMap.Node<K,V> f; int fh; // 遍历旧table中的节点,计算新table中的下标 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // 当该旧table所有节点转移到新的table上时,结束扩容 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { // talbe 指向新table nextTable = null; table = nextTab; //扩容阈值设置为原来容量的1.5倍 sizeCtl = (n << 1) - (n >>> 1); return; } // 使用CAS操做更新这个扩容阈值 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } // 判断存放节点的数组下标位置是否为null // 若是为null则尝试存放 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 若是不为null则判断改位置是否已经转移完成 // 若是是则再也不转移当前下标位置的元素 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 锁住数组的下标的第一个元素 synchronized (f) { // 判断 f 的值有没有被修改 // 若是 f 没又改变,则继续操做,不然重复第一个循环 if (tabAt(tab, i) == f) { ConcurrentHashMap.Node<K,V> ln, hn; // 判断是否链表结构 // 若是是,则根据链表结构的存储规则存储元素 if (fh >= 0) { int runBit = fh & n; ConcurrentHashMap.Node<K,V> lastRun = f; for (ConcurrentHashMap.Node<K,V> p = f.next; p != null; p = p.next) { // 把节点存放新table的原位或者移动 n 位存放 // 其实就是计算该节点是存放在当前下标位置仍是 i+n的下标位置 int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (ConcurrentHashMap.Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, ln); else hn = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, hn); } // 经过上面的计算把ln链表放在本来的下标位置 // 把hn链表放在 本来下标+n的位置 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); // 在旧table存放 ForwardingNode 表明该数组下标位置的数据已经完成转移 setTabAt(tab, i, fwd); advance = true; } // 判断是否红黑树结构 // 若是是,则根据红黑树结构的存储规则存储元素 else if (f instanceof ConcurrentHashMap.TreeBin) { ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f; ConcurrentHashMap.TreeNode<K,V> lo = null, loTail = null; ConcurrentHashMap.TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (ConcurrentHashMap.Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V> (h, e.key, e.val, null, null); // 与上面链表的操做大体相同 // 计算当前元素存放的位置 if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 若是经过计算分开重组后的红黑树长度小于等于6则转换成链表 // static final int UNTREEIFY_THRESHOLD = 6; ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t; // 把计算后存放在原来位置的 ln继续存放在原来的位置 // hn存放在 i+n 的位置 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); // 在旧table存放 ForwardingNode 表明该数组下标位置的数据已经完成转移 setTabAt(tab, i, fwd); advance = true; } } } } } }
final ConcurrentHashMap.Node<K,V>[] helpTransfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V> f) { ConcurrentHashMap.Node<K,V>[] nextTab; int sc; // 判断当前 table是否在进行扩容操做 if (tab != null && (f instanceof ConcurrentHashMap.ForwardingNode) && (nextTab = ((ConcurrentHashMap.ForwardingNode<K,V>)f).nextTable) != null) { // 获取扩容后的table大小 int rs = resizeStamp(tab.length); // 再次判断扩容操做是否还在进行,而且是同一次扩容操做 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 经过CAS扩容操做尝试获取扩容操做资格 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
每次扩容是容量原来的2倍,而且旧table原元素只会存放在新table的 i 或者 i+n 的位置而不是从新计算hash新的 i ,这样避免了多线程扩容状况下同时操做 i 引发的各类麻烦,而且put()操做的时候有可能会帮忙一块儿扩容,分担了单线程操做的压力,当旧的 i 位置的节点所有转移完成使用 ForwardingNode 标记当前位置因此节点已经转移完成函数
会引发扩容操做的2个函数addCount()与treeifyBin()高并发
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 使用CAS操做更新baseCount if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; // 多线程修改baseCount时,没修改为功的线程会执行fullAddCount(),把x的值添加到counterCell类中 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } // 当check >= 0 的时候证实须要扩容 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) // 其余线程在初始化,break; break; // 其余线程正在扩容,协助扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 只有前线程在扩容 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { //若是table的长度<64 就扩大一倍 // 就是说当前 table的长度>64 且链表节点数量 >= 8时候才会转换成红黑树 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 锁住当前数组位置,进行红黑树转换 synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } //在原来下标的位置 用TreeBin替换掉原来的Node对象 setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
addCount()的主要做用是增长当前ConcurrentHashMap的元素数量而且判断是否须要扩容,当容量已经达到了扩容阈值则进行扩容操做
treeifyBin()则是用于进行树结构转换和判断是否须要扩容,当容量 < 64 时则进行扩容而非转换成树结构,只有容量>64且链表长度>=8时才会进行树结构转换
与HashMap不一样的是多了2个内部类:ForwardingNode 与 TreeBin
ForwardingNode:只有扩容操做的时候才会使用到,标记数组的 i 位置是否已经完成
TreeBin:HashMap是直接使用TreeNode,而ConcurrentHashMap则是经过TreeBin来管理TreeNode来使用
ConcurrentHashMap经过CAS和巧妙地运用synchronized 提供了一个安全和高效并且很是精彩解决方案
ConcurrentHashMap的一些简单学习记录到此结束,世界真的很大