Java8较java7的改进:java
改进一:取消segments字段,直接采用transient volatile HashEntry<K,V> table保存数据,采用table数组元素做为锁,从而实现了对每一行数据进行加锁,进一步减小并发冲突的几率。node
/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two(2的幂). Accessed directly by iterators. */ transient volatile Node<K,V>[] table;
改进二:将原先table数组+单向链表的数据结构,变动为table数组+单向链表+红黑树的结构。对于hash表来讲,最核心的能力在于将key hash以后能均匀的分布在数组中。若是hash以后散列的很均匀,那么table数组中的每一个队列长度主要为0或者1。但实际状况并不是老是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,可是在数据量过大或者运气不佳的状况下,仍是会存在一些队列长度过长的状况,若是仍是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);所以,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度能够下降到O(logN),能够改进性能。算法
重要属性:数组
/** * 这个sizeCtl是volatile的,那么他是线程可见的,一个思考:它是全部修改都在CAS中进行,可是sizeCtl为何不设计成LongAdder(jdk8出现的)类型呢? * 或者设计成AtomicLong(在高并发的状况下比LongAdder低效),这样就能减小本身操做CAS了。 * * 默认为0,用来控制table的初始化和扩容操做,具体应用在后续会体现出来。 * -1 表明table正在初始化 * -N 表示有N-1个线程正在进行扩容操做 * 其他状况: *一、若是table未初始化,表示table须要初始化的大小。 *二、若是table初始化完成,表示table的容量,默认是table大小的0.75 倍,竟然用这个公式算0.75(n - (n >>> 2))。 **/ private static final long SIZECTL; private static final long TRANSFERINDEX; /** * races. Updated via CAS. * 记录容器的容量大小,经过CAS更新 */ private static final long BASECOUNT; /** * 自旋锁 (锁定经过 CAS) 在调整大小和/或建立 CounterCells 时使用。 在CounterCell类更新value中会使用,功能相似显示锁和内置锁,性能更好 * 在Striped64类也有应用 */ private static final long CELLSBUSY; private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; /** * Node:保存key,value及key的hash值的数据结构。其中value和next都用volatile修饰,保证并发的可见性。 * @param <K> * @param <V> */ static class Node<K,V> implements Entry<K,V> { final int hash; final K key; volatile V val; .. volatile Node<K,V> next; } /** * ForwardingNode:一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。 * @param <K> * @param <V> */ static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; … }
构造函数:数据结构
/** *initialCapacity 初始化容量 **/ public ConcurrentHashMap(int initialCapacity) /** * *建立与给定map具备相同映射的新map **/ public ConcurrentHashMap(Map<? extends K, ? extends V> m) /** *initialCapacity 初始容量 *loadFactor 负载因子,当容量达到initialCapacity*loadFactor时,执行扩容 **/ public ConcurrentHashMap(int initialCapacity, float loadFactor) /** *initialCapacity 初始容量 *loadFactor 负载因子 *concurrencyLevel 预估的并发更新线程数 **/ public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
方法:多线程
put:并发
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode());//对hashCode进行再散列,算法为(h ^ (h >>> 16)) & HASH_BITS int binCount = 0; //这边加了一个循环,就是不断的尝试,由于在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject //由于若是其余线程正在修改tab,那么尝试就会失败,因此这边要加一个for循环,不断的尝试 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 若是table为空,初始化;不然,根据hash值计算获得数组索引i,若是tab[i]为空,直接新建节点Node便可。注:tab[i]实质为链表或者红黑树的首节点。 if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 若是tab[i]不为空而且hash值为MOVED(-1),说明该链表正在进行transfer操做,返回扩容完成后的table else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 针对首个节点进行加锁操做,而不是segment,进一步减小线程冲突 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // 若是在链表中找到值为key的节点e,直接设置e.val = value便可。 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 若是没有找到值为key的节点,直接新建Node并加入链表便可。 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 若是首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操做。 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 若是节点数>=8,那么转换链表结构为红黑树结构。 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 计数增长1,有可能触发transfer操做(扩容)。 addCount(1L, binCount); return null; }
helpTransfer:app
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //下面几种状况和addCount的方法同样,请参考addCount的备注 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
tabAt:dom
@SuppressWarnings("unchecked") static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } /* *可是这边为何i要等于((long)i << ASHIFT) + ABASE呢,计算偏移量 *ASHIFT是指tab[i]中第i个元素在相对于数组第一个元素的偏移量,而ABASE就算第一数组的内存素的偏移地址 *因此呢,((long)i << ASHIFT) + ABASE就算i最后的地址 * 那么compareAndSwapObject的做用就算tab[i]和c比较,若是相等就tab[i]=v不然tab[i]=c; */ static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
addCount:ide
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 每次进来都baseCount都加1由于x=1 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//1 CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //多线程CAS发生失败的时候执行 fullAddCount(x, uncontended);//2 return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; //当条件知足开始扩容 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) {//若是小于0说明已经有线程在进行扩容操做了 //一下的状况说明已经有在扩容或者多线程进行了扩容,其余线程直接break不要进入扩容操做 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//若是相等说明扩容已经完成,能够继续扩容 transfer(tab, nt); } //这个时候sizeCtl已经等于(rs << RESIZE_STAMP_SHIFT) + 2等于一个大的负数, // 这边加上2很巧妙,由于transfer后面对sizeCtl--操做的时候,最多只能减两次就结束 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
上面注释1,每次都会对baseCount 加1,若是并发竞争太大,那么可能致使U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 失败,那么为了提升高并发的时候baseCount可见性失败的问题,又避免一直重试,这样性能会有很大的影响,那么在jdk8的时候是有引入一个类Striped64,其中LongAdder和DoubleAdder就是对这个类的实现。这两个方法都是为解决高并发场景而生的,是AtomicLong的增强版,AtomicLong在高并发场景性能会比LongAdder差。可是LongAdder的空间复杂度会高点。
咱们每次进来都对baseCount进行加1当达到必定的容量时,就须要对table进行扩容。扩容方法就是transfer
/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; //构建一个连节点的指针,用于标识位 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab //循环的关键变量,判断是否已经扩容完成,完成就return,退出循环 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //循环的关键i,i--操做保证了倒序遍历数组 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) {//nextIndex=transferIndex=n=tab.length(默认16) i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //i<0说明已经遍历完旧的数组tab;i>=n何时有可能呢?在下面看到i=n,因此目前i最大应该是n吧。 //i+n>=nextn,nextn=nextTab.length,因此若是知足i+n>=nextn说明已经扩容完成 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) {// a nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操做,参考sizeCtl的注释 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //若是有多个线程进行扩容,那么这个值在第二个线程之后就不会相等,由于sizeCtl已经被减1了, // 因此后面的线程就只能直接返回,始终保证只有一个线程执行了 a(上面注释a) if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true;//finishing和advance保证线程已经扩容完成了能够退出循环 i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null)//若是tab[i]为null,那么就把fwd插入到tab[i],代表这个节点已经处理过了 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED)//那么若是f.hash=-1的话说明该节点为ForwardingNode,说明该节点已经处理过了 advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; //这边还对链表进行遍历,这边的的算法和hashmap的算法又不同了,这班是有点对半拆分的感受 //把链表分表拆分为,hash&n等于0和不等于0的,而后分别放在新表的i和i+n位置 //次方法同hashmap的resize for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //把已经替换的节点的旧tab的i的位置用fwd替换,fwd包含nextTab setTabAt(tab, i, fwd); advance = true; }//下面红黑树基本和链表差很少 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //判断扩容后是否还须要红黑树结构 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
注意:若是链表结构中元素超过TREEIFY_THRESHOLD阈值,默认为8个,则把链表转化为红黑树,提升遍历查询效率.接下来咱们看看如何构造树结构,代码以下:
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
能够看出,生成树节点的代码块是同步的,进入同步代码块以后,再次验证table中index位置元素是否被修改过。
一、根据table中index位置Node链表,从新生成一个hd为头结点的TreeNode链表。
二、根据hd头结点,生成TreeBin树结构,并把树结构的root节点写到table的index位置的内存中,具体实现以下:
/** * Creates bin with initial set of nodes headed by b. */ TreeBin(TreeNode<K,V> b) { super(TREEBIN, null, null, null); this.first = b; TreeNode<K,V> r = null; for (TreeNode<K,V> x = b, next; x != null; x = next) { next = (TreeNode<K,V>)x.next; x.left = x.right = null; if (r == null) { x.parent = null; x.red = false; r = x; } else { K k = x.key; int h = x.hash; Class<?> kc = null; for (TreeNode<K,V> p = r;;) { int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode<K,V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); break; } } } } this.root = r; assert checkInvariants(root); }
Get:
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0)//若是eh=-1就说明e节点为ForWordingNode,这说明什么,说明这个节点已经不存在了,被另外一个线程正则扩容 //因此要查找key对应的值的话,直接到新newtable找 return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
这个get请求,咱们须要cas来保证变量的原子性。若是tab[i]正被锁住,那么CAS就会失败,失败以后就会不断的重试。这也保证了get在高并发状况下不会出错。
咱们来分析下到底有多少种状况会致使get在并发的状况下可能取不到值。一、一个线程在get的时候,另外一个线程在对同一个key的node进行remove操做;二、一个线程在get的时候,另外一个线程正则重排table。可能致使旧table取不到值。
那么本质是,我在get的时候,有其余线程在对同一桶的链表或树进行修改。那么get是怎么保证同步性的呢?咱们看到e = tabAt(tab, (n - 1) & h)) != null,在看下tablAt究竟是干吗的:
@SuppressWarnings("unchecked") static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }
它是对tab[i]进行原子性的读取,由于咱们知道putVal等对table的桶操做是有加锁的,那么通常状况下咱们对桶的读也是要加锁的,可是咱们这边为何不须要加锁呢?由于咱们用了Unsafe的getObjectVolatile,由于table是volatile类型,因此对tab[i]的原子请求也是可见的。由于若是同步正确的状况下,根据happens-before原则,对volatile域的写入操做happens-before于每个后续对同一域的读操做。因此无论其余线程对table链表或树的修改,都对get读取可见。
sun.misc.Unsafe类:
Unsafe类是什么呢?java不能直接访问操做系统底层,而是经过本地方法来访问。Unsafe类提供了硬件级别的原子操做。Unsafe类在jdk 源码的多个类中用到,这个类的提供了一些绕开JVM的更底层功能,基于它的实现能够提升效率。可是,它是一把双刃剑:正如它的名字所预示的那样,它是Unsafe的,它所分配的内存须要手动free(不被GC回收)。Unsafe类,提供了JNI某些功能的简单替代:确保高效性的同时,使事情变得更简单。
//在o的offset偏移地址处,获取volatile类型的对象 public native java.lang.Object getObjectVolatile(java.lang.Object o, long l); //原子性的更新java变量 public final native boolean compareAndSwapObject(java.lang.Object o, long l, java.lang.Object o1, java.lang.Object o2); /** * Stores a reference value into a given Java variable, with volatile store * semantics. Otherwise identical to * {@link #putObject(Object, long, Object)} */ public native void putObjectVolatile(java.lang.Object o, long l, java.lang.Object o1); /** * Atomically update Java variable to <tt>x</tt> if it is currently holding * <tt>expected</tt>. * * @return <tt>true</tt> if successful */ public final native boolean compareAndSwapLong(java.lang.Object o, long l, long l1, long l2);