原文发布在个人我的博客中killCodehtml
由于JDK1.8 与 1.7 里对ConcurrentHashMap 有不少不一样的更改以提升性能。因此特别找出相似的方面,进行分析。java
//初始容积为 16 private static final int DEFAULT_CAPACITY = 16; //加载因子 0.75 private static final float LOAD_FACTOR = 0.75f; /** * 盛装Node元素的数组 它的大小是2的整数次幂 * Size is always a power of two. Accessed directly by iterators. */ transient volatile Node<K,V>[] table; /* * hash表初始化或扩容时的一个控制位标识量。 * 负数表明正在进行初始化或扩容操做 * -1表明正在初始化 * -N 表示有N-1个线程正在进行扩容操做 * 正数或0表明hash表尚未被初始化,这个数值表示初始化或下一次进行扩容的大小 * * **既表明 HashMap 的 threshold** * 又表明 **进行扩容时的进程数** */ private transient volatile int sizeCtl; // 如下两个是用来控制扩容的时候 单线程进入的变量 // resize校验码 private static int RESIZE_STAMP_BITS = 16; // resize校验码的位移量。 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; /* * Encodings for Node hash fields. See above for explanation. */ static final int MOVED = -1; // hash值是-1,表示这是一个forwardNode节点 static final int TREEBIN = -2; // hash值是-2 表示这时一个TreeBin节点 static final int RESERVED = -3; // hash for transient reservations //在 spread() 方法中 用来对 hashcode 进行 高位hash 减小可能发生的碰撞。 static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
上面的 sizectl 很重要。是解决 concurrenthashmap 扩容的基础node
与 HashMap
最大的区别是 加入了对val 与 next 用了volatile关键字修饰
而且 setValue() 方法 直接抛出异常,能够看出,val 是不能直接改变的。
是经过 Unsafe 类的 方法进行所有替换算法
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; //相比于 HashMap ,加入了 volatile 关键字 volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); }
与 HashMap
不一样的是编程
此次 TreeNode
再也不是继承自 LinkedHashMap.Entry 而是继承自本类中的 Node.数组
并不直接用于红黑树的结点,而是将 结点包装成 TreeNode 后,用下面的 TreeBin 进行二次包装。安全
优势是可使用 Node 类的 next 指针,方便TreeBin 后续 从 链表
到 红黑树
的转换。
构造函数能够看出,原先对TreeNode 的初始化只是设置了其的后续结点。组成了链表。多线程
static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super(hash, key, val, next); this.parent = parent; }
特色: 1. 不持有key与val ,指向TreeNode 的 root 与 list。并发
2. 加入读写锁。方便并发的访问。
static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; //经过锁的状态 , 判断锁的类型。 volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock
构造方法以下
root 表明 TreeNode 的根结点
使用first ,是用于第一次初始化时,由于root的特殊性,因此不便于 this.root = b
所以经过 first代替第一次的初始化过程。
而后在 过程当中 用r 表明root ,直到结束 红黑树的初始化后,再 root =r
保证root的安全性。app
TreeBin(TreeNode<K,V> b) { super(TREEBIN, null, null, null); this.first = b; TreeNode<K,V> r = null; for (TreeNode<K,V> x = b, next; x != null; x = next) { next = (TreeNode<K,V>)x.next; x.left = x.right = null; if (r == null) { x.parent = null; x.red = false; r = x; } else { K k = x.key; int h = x.hash; Class<?> kc = null; for (TreeNode<K,V> p = r;;) { int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode<K,V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); break; } } } } this.root = r; assert checkInvariants(root); }
做用是在 transfer() 过程当中,插入到 TreeBin 之间,用做连接做用。
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; }
Unsafe提供了硬件级别的原子操做。内部的方法均为 native方法
,能够访问系统底层。
这里用了 CAS 算法(compare and swap) 大大的避免了使用时对性能的消耗,以及保证了使用时的安全性。
**注:** CAS 算法的核心是 将须要改变的参数,与内存中已经存在的变量的值进行对比,一致就改变,不一致就放弃此次操做。与之相相似的优化操做还有 LL/SC(Load-Linked/Store-Conditional : 加载连接/条件存储) 、 Test-and-Set(测试并设置)
这里额外介绍一下 Unsafe 类的 compareAndSwapInt
方法。
/** * 比较obj的offset处内存位置中的值和指望的值,若是相同则更新。此更新是不可中断的。 * * @param obj 须要更新的对象 * @param offset obj中整型field的偏移量 * @param expect 但愿field中存在的值 * @param update 若是指望值expect与field的当前值相同,设置filed的值为这个新值 * @return 若是field的值被更改返回true */ public native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);
下面是 ConcurrentHashMap 中有关的应用
// Unsafe mechanics private static final sun.misc.Unsafe U; //对应于 类中的 sizectl private static final long SIZECTL; //在 transfer() 方法的使用时,计算索引 private static final long TRANSFERINDEX; // 用于对 ConcurrentHashMap 的 size 统计。 // 下文 第8点关于 size 会说明。 private static final long BASECOUNT; // 辅助类 countercell 类中的属性,用于分布式计算 // 是实现 java8 中 londAddr 的基础 private static final long CELLSBUSY; private static final long CELLVALUE; // 用来肯定在数组中的位置 // 数组中的偏移地址 private static final long ABASE; // 数组中的增量地址 private static final int ASHIFT; static { try { //经过反射调用 类中的值,从而对 这些变量赋值 U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } }
在操做过程当中,常常会看到如下几个,或者相相似的方法。
其核心是
//得到 i 位置上的 Node 节点 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //利用CAS算法设置i位置上的Node节点。 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //利用volatile方法设置节点位置的值 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
调用ConcurrentHashMap的构造方法仅仅是设置了一些参数而已,而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。
当向 map 插入数据的时候 table == null , 则会调用 initTable()方法 。
用 put 方法 简单展现一下。
final V putVal(K key, V value, boolean onlyIfAbsent) { ... ... for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); ... ... }
initTable() 方法展现以下
其中有 sizectl 变量,这里回顾一下
hash表初始化或扩容时的一个控制位标识量。 负数表明正在进行初始化或扩容操做 -1表明正在初始化 -N 表示有N-1个线程正在进行扩容操做 正数或0表明hash表尚未被初始化,这个数值表示初始化或下一次进行扩容的大小
/** * Initializes table, using the size recorded in sizeCtl. */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //sizeCtl <0 表示有其余线程正在进行初始化操做,把线程挂起。对于table的初始化工做,只能有一个线程在进行。 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //利用CAS方法把sizectl的值置为-1 表示本线程正在进行初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //至关于0.75*n 设置一个扩容的阈值 // sc = n - n/4 sc = n - (n >>> 2); } } finally { // 更新 sizectl sizeCtl = sc; } break; } } return tab; }
当ConcurrentHashMap容量不足的时候,须要对table进行扩容。这个方法的基本思想跟HashMap是很像的,可是因为它是支持并发扩容的,因此要复杂的多。缘由是它支持多线程进行扩容操做,而并无加锁。我想这样作的目的不只仅是为了知足concurrent的要求,而是但愿利用并发处理去减小扩容带来的时间影响。由于在扩容的时候,老是会涉及到从一个“数组”到另外一个“数组”拷贝的操做,若是这个操做可以并发进行,那真真是极好的了。
整个扩容操做分为两个部分:
1. 第一部分是构建一个nextTable,它的容量是原来的两倍,这个操做是单线程完成的。这个单线程的保证是经过RESIZE_STAMP_SHIFT这个常量通过一次运算来保证的,这个地方在后面会有提到; 2. 第二个部分就是将原来table中的元素复制到nextTable中,这里容许多线程进行操做。
先来看一下单线程是如何完成的:
它的大致思想就是遍历、复制的过程。首先根据运算获得须要遍历的次数i,而后利用tabAt方法得到i位置的元素:
1. 若是这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点; 2. 若是这个位置是Node节点(fh>=0),就构造两个链表,一个表明高位为 0 , 一个表明高位为 1 。将原来的结点 分别放在nextTable的i和i+n的位置上,而且除了lastRun的位置相对位于链表的底部外,其他元素均为 **反序** 。 3. 若是这个位置是TreeBin节点(fh<0),也作一个处理,而且判断是否须要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上
遍历过全部的节点之后就完成了复制工做,这时让nextTable做为新的table,而且更新sizeCtl为新容量的0.75倍 ,完成扩容。
再看一下多线程是如何完成的:
//若是遍历到ForwardingNode节点 说明这个点已经被处理过了,直接跳过 这里是控制并发扩容的核心 else if ((fh = f.hash) == MOVED) advance = true; // already processed
这是一个判断,若是遍历到的节点是forward节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另外一个线程看到forward,就向后遍历。这样交叉就完成了复制工做。并且还很好的解决了线程安全的问题。
如图:
下面是源码:
/** * 一个过渡的table表 只有在扩容的时候才会使用 */ private transient volatile Node<K, V>[] nextTable; /** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */ private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) { int n = tab.length, stride; // 经过计算 NCPU CPU的核心数与 表的大小的比值,将表进行范围的细分,以方便 并发。 // 感受上 有点像 segment 分段锁的意思。 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { //构造一个nextTable对象 它的容量是原来的两倍。 @SuppressWarnings("unchecked") Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME //原来的 容量限制为 1<<30 //HashMap 在扩容时,会用 resize() 方法,扩大 threshold 的值 //当大于 MAXIMUM_CAPACITY 时,会将 threshold 设置为 Integer.MAX_VALUE sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);//构造一个连节点指针 用于标志位 boolean advance = true;//并发扩容的关键属性 若是等于true 说明这个节点已经处理过 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0; ; ) { Node<K, V> f; int fh; //这个while循环体的做用就是在控制i递减 经过i能够依次遍历原hash表中的节点 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //若是全部的节点都已经完成复制工做 就把nextTable赋值给table 清空临时对象nextTable nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍 依然至关于如今容量的0.75倍 return; } //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操做 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //若是遍历到的节点为空 则放入ForwardingNode指针 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //若是遍历到ForwardingNode节点 说明这个点已经被处理过了,直接跳过 这里是控制并发扩容的核心 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //节点上锁 synchronized (f) { if (tabAt(tab, i) == f) { Node<K, V> ln, hn; //若是fh>=0 证实这是一个Node节点 if (fh >= 0) { // runBit 表明正在 运行的 Node 节点的 分类 // 所以链表根据高位为0或者1分为两个子链表,高位为0的节点桶位置没有发生变化,高位为1的节点桶位置增长了n, // 因此有setTabAt(nextTab, i, ln);和 setTabAt(nextTab, i + n, hn); // n = 2的幂 。 二进制 0001000 // fh & n = 1. 1000 // 2. 0000 因此划分出两个链表。 int runBit = fh & n; // lastRun 是正在运行的节点 Node<K, V> lastRun = f; //如下的部分在完成的工做是构造两个链表 一个是高位为 0 的链表 另外一个是高位为 1 的链表 // 找出最后一个 与后面的结点不一样的 结点 for (Node<K, V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 将最后一个 结点保存起来 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K, V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; //这个链表是从低层向上构建 // ln 或 hn = lastRun, 构建一个 node 结点 // 其下一个结点为 lastRun 。 if ((ph & n) == 0) // 构建低位链表 ln = new Node<K, V>(ph, pk, pv, ln); else // 构建高位链表 hn = new Node<K, V>(ph, pk, pv, hn); } //在nextTable的i位置上插入一个链表 setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另外一个链表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode节点 表示已经处理过该节点 setTabAt(tab, i, fwd); //设置advance为true 返回到上面的while循环中 就能够执行 --i 操做 advance = true; } //对TreeBin对象进行处理 与上面的过程相似 else if (f instanceof TreeBin) { TreeBin<K, V> t = (TreeBin<K, V>) f; TreeNode<K, V> lo = null, loTail = null; TreeNode<K, V> hi = null, hiTail = null; int lc = 0, hc = 0; //构造高位和低位两个链表 for (Node<K, V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K, V> p = new TreeNode<K, V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //若是扩容后已经再也不须要tree的结构 反向转换为链表结构 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K, V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K, V>(hi) : t; //在nextTable的i位置上插入一个链表 setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另外一个链表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode节点 表示已经处理过该节点 setTabAt(tab, i, fwd); //设置advance为true 返回到上面的while循环中 就能够执行 --i 操做 advance = true; } } } } } }
put方法依然沿用HashMap的put方法的思想,根据hash值计算这个新插入的点在table中的位置i。
注:1. hash = spread(key.hashCode()) 2. spread(int h) --> return (h ^ (h >>> 16)) & HASH_BITS; --> 经过hashCode()的高16位异或低16位优化高位运算的算法 3. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
若是i位置是空的,直接放进去,不然进行判断,
若是i位置是树节点,按照树的方式插入新的节点,不然把i插入到链表的末尾
不一样点:ConcurrentHashMap不容许key或value为null值。
多线程的状况下:
若是一个或多个线程正在对ConcurrentHashMap进行扩容操做,当前线程也要进入扩容的操做中。这个扩容的操做之因此能被检测到,是由于transfer方法中在空结点上插入forward节点,若是检测到须要插入的位置被forward节点占有,就帮助进行扩容; --> helpTransfer() 方法。
若是检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,可是仍是会比hashTable的synchronized要好得多。
首先判断这个节点的类型。若是是链表节点(fh>0),则获得的结点就是hash值相同的节点组成的链表的头节点。须要依次向后遍历肯定这个新加入的值所在位置。若是遇到hash值与key值都与新加入节点是一致的状况,则只须要更新value值便可。不然依次向后遍历,直到链表尾插入这个结点。
若是加入这个节点之后链表长度大于8,就把这个链表转换成红黑树。
若是这个节点的类型已是树节点的话,直接调用树节点的插入方法进行插入新的值。
源码以下:
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { //不容许 key或value为null if (key == null || value == null) throw new NullPointerException(); //计算hash值 int hash = spread(key.hashCode()); //计算该链表 节点的数量 int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 第一次 put 操做的时候初始化,若是table为空的话,初始化table if (tab == null || (n = tab.length) == 0) tab = initTable(); //根据hash值计算出在table里面的位置 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 根据对应的key hash 到具体的索引,若是该索引对应的 Node 为 null,则采用 CAS 操做更新整个 table // 若是这个位置没有值 ,直接放进去,不须要加锁 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //当遇到表链接点时,须要进行整合表的操做 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 结点上锁,只是对链表头结点做锁操做 synchronized (f) { if (tabAt(tab, i) == f) { //fh > 0 说明这个节点是一个链表的节点 不是树的节点 if (fh >= 0) { binCount = 1; //在这里遍历链表全部的结点 //而且计算链表里结点的数量 for (Node<K,V> e = f;; ++binCount) { K ek; //若是hash值和key值相同 则修改对应结点的value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //若是遍历到了最后一个结点,那么就证实新的节点须要插入 就把它插入在链表尾部 if ((e = e.next) == null) { // 插入到链表尾 pred.next = new Node<K,V>(hash, key, value, null); break; } } } //若是这个节点是树节点,就按照树的方式插入值 else if (f instanceof TreeBin) { // 若是是红黑树结点,按照红黑树的插入 Node<K,V> p; // 若是为树节点, binCount一直为2,不会引起扩容。 binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 若是这个链表结点达到了临界值8,那么把这个链表转换成红黑树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //将当前ConcurrentHashMap的元素数量+1,table的扩容是在这里发生的 addCount(1L, binCount); return null; }
出现于 put 方法
以下地点
//当遇到表链接点时,须要进行整合表的操做 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);
helpTransfer() 方法的源码以下
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; // 当前 table 不为 null , 且 f 为 forwardingNode 结点 , 且存在下一张表 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length);//计算一个扩容校验码 // 当 sizeCtl < 0 时,表示有线程在 transfer(). while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //正常状况下 sc >>> RESIZE_STAMP_SHIFT == resizeStamp(tab.length); if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //将 扩容的线程先行减一,表示,这是来辅助 transfer,而非进行 transfer的线程。 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
涉及变量 MIN_TREEIFY_CAPACITY = 64;
若是数组长度n小于阈值MIN_TREEIFY_CAPACITY,默认是64,则会调用tryPresize方法把数组长度扩大到原来的两倍,并触发transfer方法,从新调整节点的位置。
出现于 put 方法
以下地点
if (binCount != 0) { // TREEIFY_THRESHOLD 默认为 8. if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }
其中源码以下:
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 将原来的数组扩大为原来的两倍 tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
这里讲一个 JDK8 中设计的很是巧妙的算法。看了很久才看懂。
出自 tryPresize 方法中的如下位置
//数组的最大容积为 1<<30 。若是数组大小超过 1<<29 ,则将最大大小设置为 MAXIMUM_CAPACITY //不然,设置为原来的两倍。 private final void tryPresize(int size) { int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
下面让咱们来分析一下,tableSizeFor()
这个算法的目的,是得出相比较于给定参数,返回一个恰好比参数大的 2次幂 整数。
static final int tableSizeFor(int cap) { int n = cap - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }
先来分析有关n位操做部分:先来假设n的二进制为01xxx...xxx。接着
对n右移1位:001xx...xxx,再位或:011xx...xxx
对n右移2为:00011...xxx,再位或:01111...xxx
此时前面已经有四个1了,再右移4位且位或可得8个1
同理,有8个1,右移8位确定会让后八位也为1。
综上可得,该算法让最高位的1后面的位全变为1。
最后再让结果n+1,即获得了2的整数次幂的值了。
如今回来看看第一条语句:
int n = cap - 1;
让cap-1再赋值给n的目的是另找到的目标值大于或等于原值。例如二进制1000,十进制数值为8。若是不对它减1而直接操做,将获得答案10000,即16。显然不是结果。减1后二进制为111,再进行操做则会获得原来的数值1000,即8。
引用自(http://www.cnblogs.com/loadin...
经过 key值 搜索 value 值。
而且要 经过分辨 结点的种类,进行不一样形式的寻找。
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //计算hash值 int h = spread(key.hashCode()); //根据hash值肯定节点位置 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //若是搜索到的节点key与传入的key相同且不为null,直接返回这个节点 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //若是eh<0 说明这个节点在树上 直接寻找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //不然遍历链表 找到对应的值并返回 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
《并发编程实战》中有提到,size返回的结果在计算时可能已通过期了,它实际上只是一个估计值,所以容许size返回一个近似值,而不是一个精确值。
从注释中能够看出,这是从 LongAdder 类中的思想,拷贝过来的一个类。
LongAdder 类 是 JDK 1.8 新引进的类,其思想:
多个线程持有本身的加数(cell),线程个数增长时,会自动提供新的加数。 当全部工做作完后,再提供新的加数。
有时间写一篇相关的源码分析~ 逃~
不过,这里同样不能精确统计,这里的 CounterCell 等同于 LongAdder.Cell sumCount() 等同于 LongAdder.sum()方法。
执行逻辑是同样的。
就 LongAdder 类中的 sum 方法所说, 当有线程在运行时,同样只是估计值,只有当全部线程执行完毕,才是实际值。
而统计 Size ,不可以像垃圾清除同样,有 Safe point 或 Safe region ,因此,这个假设不成立。。。
其相关的源码以下。
/** * A padded cell for distributing counts. Adapted from LongAdder * and Striped64. See their internal docs for explanation. */ @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } //执行逻辑 final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
就官方文档中所说, mappingCount 方法,应该取代 size 方法,
但这个方法得出的值同样在线程运行的时候,只是一个估计的值。
从源码中就能够看出,使用的是上文分析的 sumCount() 方法。
public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values }
出自于 put 方法的以下位置
//将当前ConcurrentHashMap的元素数量+1 addCount(1L, binCount); return null; }
统计上:
这里用到 CounterCell类,而且统计的值的计算同样是采用的 sumCount() 方法。
因此缺点如上,再也不阐述。
扩容上:
逻辑与 helpTransfer() 相似,都是判断是否有多个线程在执行扩容,而后判断是否须要辅助 transfer();
源码以下
private final void addCount(long x, int check) { //用到了 CounterCell 类 CounterCell[] as; long b, s; //利用CAS方法更新baseCount的值 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //若是check值大于等于0 则须要检验是否须要进行扩容操做 //下面的逻辑与 helpTransfer() 相似,能够与 helpTransfer() 一块儿参考。 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); //若是已经有其余线程在执行扩容操做 if (sc < 0) { //校验失效,直接退出。 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //当前线程是惟一的或是第一个发起扩容的线程 此时nextTable=null else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }