1、ConcurrentHashMap并发容器java
1.ConcurrentHashMapnode
ConcurrentHashMap是HashMap的升级版,HashMap虽然效率高,但它是线程不安全的容器,不能再多线程环境使用,而HashTable虽然是线程安全的,但使用的是效率比较低的synchronized,而且synchronized是悲观锁,不论读写都是独占的。而ConcurrentHashMap的底层是使用CAS来实现并发访问的,效率较高。算法
ConcurrentHashMap的底层数据结构与HashMap相同,采用数组+链表/红黑树来实现,以下图:数组
2.链表结点的实现安全
链表结点是基本的存储单元,其实不用想太多,必然和HashMap中的Node相相似。数据结构
//能够看到Node实现了Map.Entry接口,与HashMap基本同样 static class Node<K,V> implements Map.Entry<K,V> { final int hash; //hash值,final修饰,表示hash码不可改变。 final K key; //key值 volatile V val; //value值,volatile保证可见性 volatile Node<K,V> next; //链表的下个结点 Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); } //重写equals方法 public final boolean equals(Object o) { Object k, v, u; Map.Entry<?,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } //查找结点,能够看出ConcurrentHashMap中不容许存放value==null的数据 Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } } //底层数组扩容时的过渡链表结点类型 static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); //标识结点的状态为MOVED,即转移扩容状态 this.nextTable = tab; } Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } }
//红黑树的结点实现,继承了Node结点 static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; //父结点的引用 TreeNode<K,V> left; //左孩子 TreeNode<K,V> right; //右孩子 TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; //颜色 TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super(hash, key, val, next); this.parent = parent; } //查看树中是否存在hash值为h,value为k的结点 Node<K,V> find(int h, Object k) { return findTreeNode(h, k, null); } //查找结点 final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) { //判断k是否null,不容许放null值 if (k != null) { TreeNode<K,V> p = this; do { int ph, dir; K pk; TreeNode<K,V> q; TreeNode<K,V> pl = p.left, pr = p.right; if ((ph = p.hash) > h) p = pl; else if (ph < h) p = pr; else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; else if (pl == null) p = pr; else if (pr == null) p = pl; else if ((kc != null || (kc = comparableClassFor(k)) != null) && (dir = compareComparables(kc, k, pk)) != 0) p = (dir < 0) ? pl : pr; else if ((q = pr.findTreeNode(h, k, kc)) != null) return q; else p = pl; } while (p != null); } return null; } }
//TreeBin 用做树的头结点,只存储root和first节点,不存储节点的key、value值 static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; //根结点 volatile TreeNode<K,V> first; //树的链式结构 volatile Thread waiter; //等待线程 volatile int lockState; //锁状态 // values for lockState static final int WRITER = 1; // 表明拥有写入锁 static final int WAITER = 2; // 表明等待获取写入锁 static final int READER = 4; // 用于设置读取锁的增长 }
//ConcurrentMap接口源码 public interface ConcurrentMap<K, V> extends Map<K, V> { //返回指定的key对应的value值,若key不存在则返回默认提供的defaultValue @Override default V getOrDefault(Object key, V defaultValue) { V v; return ((v = get(key)) != null) ? v : defaultValue; } //遍历Map的方法,至关于for循环或foreach循环,主要是用于lambda表达式 @Override default void forEach(BiConsumer<? super K, ? super V> action) { Objects.requireNonNull(action); for (Map.Entry<K, V> entry : entrySet()) { K k; V v; try { k = entry.getKey(); v = entry.getValue(); } catch(IllegalStateException ise) { // this usually means the entry is no longer in the map. continue; } action.accept(k, v); } } //若是map中不存在key对应的键值对,那么就新增当前传入的key和value。 //不然就返回map中key对应的value值 V putIfAbsent(K key, V value); //删除map中对应的key和value,当且仅当key和value都对应上时才删除 //若key的对应的值不是value则不删除 boolean remove(Object key, Object value); //替换key的value值,当key对应的旧值是oldValue时,替换为newValue //不然不替换 boolean replace(K key, V oldValue, V newValue); //若key存在,则将映射值替换为给定的value V replace(K key, V value); //将每一个key映射的值替换成给定的调用函数的返回值 @Override default void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) { Objects.requireNonNull(function); forEach((k,v) -> { //lambda表达式遍历map while(!replace(k, v, function.apply(k, v))) { // v changed or k is gone if ( (v = get(k)) == null) { // k is no longer in the map. break; } } }); } //若是map中不存在key(或key的映射为null)且调用key为参数的函数返回值newValue不为null //且将key与返回值newValue关联成功时,返回newValue,不然返回key本来的映射值 @Override default V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { Objects.requireNonNull(mappingFunction); V v, newValue; return ((v = get(key)) == null && (newValue = mappingFunction.apply(key)) != null && (v = putIfAbsent(key, newValue)) == null) ? newValue : v; } //调用函数返回的新映射值替换key对应的旧映射值,若果新映射值为null,则直接将键值对从map中删除 @Override default V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { Objects.requireNonNull(remappingFunction); V oldValue; while((oldValue = get(key)) != null) { V newValue = remappingFunction.apply(key, oldValue); if (newValue != null) { if (replace(key, oldValue, newValue)) return newValue; } else if (remove(key, oldValue)) return null; } return oldValue; } @Override default V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { Objects.requireNonNull(remappingFunction); V oldValue = get(key); //获取旧映射值 for(;;) { V newValue = remappingFunction.apply(key, oldValue); //计算新映射值 if (newValue == null) { //判断新映射值是否为null //新值不存在且map中存在key,直接删除该键值对 if (oldValue != null || containsKey(key)) { //判断删除是否成功 if (remove(key, oldValue)) { // removed the old value as expected return null; } //删除失败在继续尝试 oldValue = get(key); } else { //key本来就不存在,直接返回 return null; } } else { // 新映射值不为null,且旧映射值存在。则直接替换旧映射值 if (oldValue != null) { // replace if (replace(key, oldValue, newValue)) { // replaced as expected. return newValue; } oldValue = get(key); } else { //旧映射值不存在,即key不存在,则直接向map中新增 if ((oldValue = putIfAbsent(key, newValue)) == null) { return newValue; } } } } } //若是map中存在key且映射为value,则用调用函数计算出的非空新映射值替换旧映射值 //若新映设置为null,则将旧键值对删除 //若是map中不存在key或key的旧映射为null,则直接新增键值对(key和value) @Override default V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { Objects.requireNonNull(remappingFunction); Objects.requireNonNull(value); V oldValue = get(key); for (;;) { if (oldValue != null) { V newValue = remappingFunction.apply(oldValue, value); if (newValue != null) { if (replace(key, oldValue, newValue)) return newValue; } else if (remove(key, oldValue)) { return null; } oldValue = get(key); } else { if ((oldValue = putIfAbsent(key, value)) == null) { return value; } } } } }
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { //map可达到的最大容量,2的31次方 private static final int MAXIMUM_CAPACITY = 1 << 30; //map初始的默认容量,没有指定map的容量时的默认值 private static final int DEFAULT_CAPACITY = 16; //数组的最大长度 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //map的默认并发级别,没有使用 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //map的默认负载因子 private static final float LOAD_FACTOR = 0.75f; //链表转换成红黑树的临界链表长度,即链表长度大于等于8时,链表将转成红黑树 static final int TREEIFY_THRESHOLD = 8; //红黑树转成链表的临界结点数,即红黑树的结点个数小于等于6时,由红黑树退化成链表 static final int UNTREEIFY_THRESHOLD = 6; //容许链表树化的最小容量值,即map的容量要大于64,才能将链表树化 static final int MIN_TREEIFY_CAPACITY = 64; //扩容线程所负责的区间大小最低为16,避免发生大量的内存冲突 private static final int MIN_TRANSFER_STRIDE = 16; //用于生成当前数组对应的基数戳 private static int RESIZE_STAMP_BITS = 16; //表示最多能有多少个线程可以帮助进行扩容,由于sizeCtl只有低16位用于标识,因此最多只有2^16-1个线程帮助扩容 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //将基数戳左移的位数,保证左移后的基数戳为负值,而后再加上n+1,表示n个线程正在扩容 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; static final int MOVED = -1; // 表示正在转移,是一个forwardNode节点 static final int TREEBIN = -2; // 表示已经转换成树,是一个TreeBin节点 static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // 用于生成hash值 //计算机的核心数 static final int NCPU = Runtime.getRuntime().availableProcessors(); //底层的table结点数组,存储键值对 transient volatile Node<K,V>[] table; //只有当数组处于扩容过程时,nextTable才不为null;不然其余时刻,nextTable为null; //nextTable主要用于扩容过程当中指向扩容后的新数组 private transient volatile Node<K,V>[] nextTable; /** * 未初始化: * sizeCtl=0:表示没有指定初始容量。 * sizeCtl>0:表示初始容量。 * 初始化中: * sizeCtl=-1,标记做用,告知其余线程,正在初始化 * 正常状态: * sizeCtl=0.75n ,扩容阈值 * 扩容中: * sizeCtl < 0 : 表示有其余线程正在执行扩容 * sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此时只有一个线程在执行扩容 */ private transient volatile int sizeCtl; //用于扩容过程当中,指示原数组下一个分割区间的上界位置,从后往前指示 private transient volatile int transferIndex; //key的视图,map中全部key的集合 private transient KeySetView<K,V> keySet; //value的视图 private transient ValuesView<K,V> values; //Entry视图 private transient EntrySetView<K,V> entrySet; //空构造,啥也没干 public ConcurrentHashMap() { } //带指定容量的构造方法,虽然指定了容量,但map的容量的肯定并非指定的容量 //而是最接近该容量的2的幂次方数 public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) //判断容量是否合法 throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } //扩容策略,容量的肯定方法 private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } //带有map集合的构造方法 public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } //带有负载因子及初始化容量的构造方法 public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } //带有负载因子及初始化容量、并发等级的构造方法 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // 最低的容量不能小于并发级别 initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; } //不安全的属性变量,知道就行 private static final sun.misc.Unsafe U; //不安全的底层操做类 private static final long SIZECTL; //变量sizeCtl的内存地址偏移量 private static final long TRANSFERINDEX; //变量transferIndex的内存地址偏移量 private static final long BASECOUNT; //变量baseCount的内存地址偏移量 private static final long CELLSBUSY; //变量cellsBusy的内存地址偏移量 private static final long CELLVALUE; //变量value的内存地址偏移量 private static final long ABASE; //变量ak的内存地址偏移量 private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } } }
public V put(K key, V value) { return putVal(key, value, false); //真正执行put的方法 } //若key不存在map中,则直接将key与value新增到map中,若key已存在,则根据onlyIfAbsent //决定是否覆盖,false为覆盖,true不覆盖 final V putVal(K key, V value, boolean onlyIfAbsent) { //判断key或value是否为null,从这能够看出,ConcurrentHashMap的键与值均不能为null if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); //计算key对应的hash值 int binCount = 0; //用来计算在这个链表或红黑树总共有多少个元素,用来控制扩容或者转移为树 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //判断table是否初始化过,若未初始化,则进行初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //初始哈数组的方法 //判断hash值在table中对应的索引位置是否已经有数据 //若没有数据,则直接公国CAS的方式更新数据(即新建一个结点放到数组table中) else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) //向数组中新增结点 break; // 新增完毕,退出循环 } //判断数组table对应索引的结点的状态(即链表的表头或红黑树的根结点的hash值) //若为MOVED表示数组正在扩容的复制阶段,那么当前线程也前去帮忙复制 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //帮助转移复制数据 else { V oldVal = null; //对表头或树根结点对象加锁,即同时仅有一个线程能对该链表或红黑树进行新增操做 //从这里能够看出ConcurrentHashMap的并发安全是基于对链表或黑红树的独占操做实现的 synchronized (f) { //判断数组中的索引是否发生改变 //有可能结点的类型发生改变(链表转成红黑树) if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; //遍历链表 for (Node<K,V> e = f;; ++binCount) { K ek; //判断要存放的key在链表中是否已经存在 //如果存在则根据onlyIfAbsent决定是否覆盖 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //如果key在链表中以前不存在,那么新建一个key和value构成的结点,向链表末尾新增 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //判断结点是不是TreeBin类型,如果则代表链表已经树化了 //应该使用红黑树中的方法进行替换或新增 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; //putTreeVal是红黑树中的新增或替换方法, //若返回不为null,则表示key在树中已存在,返回对应的结点 //若返回是null,代表是新增结点 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //判断链表是否须要树化成红黑树 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) //判断链表的长度是否超过8 treeifyBin(tab, i); if (oldVal != null) //如果替换结点的value,则返回旧值 return oldVal; break; } } } addCount(1L, binCount); //到这必然新增一个node,该修改size的值了 return null; } //hash值的计算方法,即散列算法 //h缩小2的16次方后与自身向异或,在与上HASH_BITS(0x7fffffff) static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; } //初始化table数组 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //当table为空时,尝试对table进行初始化 while ((tab = table) == null || tab.length == 0) { //判断是否有其余线程正在对table数组进行初始化工做 //当sizeCtl==-1时,说明已经有线程正在对table进行初始化,当前线程应该暂停让出cpu资源 if ((sc = sizeCtl) < 0) Thread.yield(); //CAS方式尝试更新sizeCtl的值,将sizeCtl的值由sc更新成-1,-1表示table表要进行初始化了 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //再次确认table为初始化过 if ((tab = table) == null || tab.length == 0) { //初始化指定的容量,若未指定初始容量的大小则使用DEFAULT_CAPACITY(16) int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); //令sizeCtl的值为0.75倍的数组大小 } } finally { sizeCtl = sc; } break; } } return tab; } //安全的获取数组tab中索引为i的位置的数据 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //辅助table数组的扩容转移工做 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; //判断数组table是否为null,且结点f是不是ForwardingNode类型,且nextTab是否为null //即判断map是否是处在扩容状态,如果进入帮助扩容 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //判断是否还有未转移的桶区间,若没有,则退出 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //若还有,则尝试参与扩容的线程数+1;成功就进行转移, if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; } //链表是否须要树化 private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { //判断数组table的大小是否知足树化的最小容量要求 //若不知足,但链表长度又超过8,则进行数组扩容,扩大一倍 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); //扩容 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { //锁结点将链表树化。其实就是新建一棵红黑树。 synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); //更新索引位置的结点为树的根结点 } } } } } //数组table扩容 private final void tryPresize(int size) { //判断要扩大的容量是否越界,且是不是2的次方 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; //判断数组是否初始化过,若没有初始化过,则初始化数组table //这里和initTable方法是同样的 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } //判断是否须要扩容,c小于等于sc则表示无需扩容,没达到扩容的临界值,n大于MAXIMUM_CAPACITY表示不能再扩大了 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { int rs = resizeStamp(n); //不知道用来干吗,艹 //判断是否在扩容或初始化,扩容或初始化是sc小于0 //如果初始化(sc==-1) if (sc < 0) { Node<K,V>[] nt; /** * 1 (sc >>> RESIZE_STAMP_SHIFT) != rs :扩容线程数 > MAX_RESIZERS-1 * 2 sc == rs + 1 和 sc == rs + MAX_RESIZERS :看不懂.. * 3 (nt = nextTable) == null :表示nextTable正在初始化 * 4 transferIndex <= 0 :表示全部hash桶均分配出去 */ //若是不须要帮其扩容,直接返回 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //transfer的线程数+1,该线程将帮忙转移 //在transfer的时候,sc表示在transfer工做的线程数 //第一个执行扩容操做的线程,将sizeCtl设置为:(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //没有在初始化或扩容,则开始扩容 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } } //这个方法计算n的二进制最高位前有几个零,而后和2的15次方按位或,得出个数,不知道有什么用 static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); } /** * 数组扩容的核心方法 * 把数组中的节点复制到新的数组的相同位置,或者移动到扩张部分的相同位置 * 在这里首先会计算一个步长,表示一个线程处理的数组长度,用来控制对CPU的使用, * 每一个CPU最少处理16个长度的数组元素,也就是说,若是一个数组的长度只有16,那只有一个线程会对其进行扩容的复制移动操做 * 扩容的时候会一直遍历,直到复制完全部节点,每处理一个节点的时候会在链表的头部设置一个fwd节点,这样其余线程就会跳过他, * 复制后在新数组中的链表不是绝对的反序的 * 简要过程 * 一、获取遍历table的步长 * 二、最早进行扩容的线程,会初始化nextTable(正常状况下,nextTable 是table的两倍大小) * 三、计算table某个位置索引 i,该位置上的数据将会被转移到nextTable 中。 * 四、若是索引i 所对应的table的位置上没有存放数据,则放有ForwardingNode 数据,代表该table 正在进行扩容处理。 * (若是有添加数据的线程添加数据到该位置上,将会发现table的状态) * 五、将索引i 位置上的数据进行转移,数据分红两部分,一部分就是数据 在nextTable 中索引没有变(仍然是i), * 另外一部分则是其索引变成i+n的,将这两部分分别添加到nextTable 中。 */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //判断并设置每一个cpu核心(即每一个线程)须要转移的结点数量,最少为16个 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //nextTab为null,初始化一个table数组2倍长度的数组 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; //transferIndex为原数组的终点,转移时,从后往前转移,控制原数组的转移 } int nextn = nextTab.length; //新数组长度 //新建ForwardingNode结点,用来控制并发的,当一个节点为空或已经被转移以后,就设置为ForwardingNode节点 //是个空的标志结点 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); //advance 指的是作完了一个位置的迁移工做,能够准备作下一个位置的了 //表示是否继续向前查找的标志位 boolean advance = true; boolean finishing = false; //转移是否完成的标志位,完成前从新扫面数组,看看有没有遗留的 //计算和控制对table 表中位置i 的数据进行转移 //bound为数组区间下限值,i为当前转移数组的位置,--i处理转移下一个节点位置,从后往前处理 //1 逆序迁移已经获取到的hash桶集合,若是迁移完毕,则更新transferIndex,获取下一批待迁移的hash桶 //2 若是transferIndex=0,表示因此hash桶均被分配,将i置为-1,准备退出transfer方法 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; //判断当前这一段桶区间stride是否转移完成 //i小于bound,就表示当前任务完成了 //finishing==true则表示没有转移任务结束了 if (--i >= bound || finishing) advance = false; //判断是否还有 转移任务可领取 //transferIndex==0表示,原数组中全部的桶区间都已经有线程在进行转移了 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //CAS操做修改transferIndex值,表明下一个线程要转移原数组的结点的起始索引位置 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //判断i是否小于0,便是不是全部数据都转移完了 //且i是否越界,i是不该该大于原数组长度的,也不应大于新数组长度 if (i < 0 || i >= n || i + n >= nextn) { int sc; //判断转移操做是否完成 //完成的话就将新数组赋给table,sizeCtl变为0.75倍的新数组长度 //nextTable则赋null,扩容完成 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //转移工做没有结束,尝试更新sizeCtl的值,更新成功,表示当前转移该桶区间数据的任务完成了 /** * 最早的线程,执行transfer方法时,会设置 * sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) (为何是这个数?) * 后面辅助扩容的线程,执行transfer方法以前,会设置 sizeCtl = sizeCtl+1 * 每一个线程转移完桶区间的数据后退出以前,会设置 sizeCtl = sizeCtl-1 * 那么最后一个线程退出时:必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2), * 即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT */ if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //判断是否到最后一个线程 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; //最后退出的线程要从新check下是否所有转移完毕 } } //判断原数组索引i处是否为null,为null就放个ForwardingNode结点,表示数组正在扩容转移 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //判断索引i处的结点是不是ForwardingNode结点,如果,表示该结点已经转移了 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //对数组索引i的结点进行加锁,该结点进行数据转移,不要打扰 synchronized (f) { //再次判断头结点有没有被修改过,被修改过就循环从新来 if (tabAt(tab, i) == f) { //新建两个链表。用于存放原链表转移到新数组是分开的结点 //ln存放索引不变的结点,hn存放索引变为i+n的结点 Node<K,V> ln, hn; //判断头结点时链表仍是红黑树 /** * fh大于等于0表示当前索引下是链表结构,那么链表中的结点转移到新数组中 * 只有可能存放在与元素组相同的索引i处,或是i+n(n为原数组长度)处,这是由于 * 数组的扩容时原来的2倍,且数组的长度必须是2的幂次方,这就使得同一索引的结点在新数组 * 中的索引只有上述两种可能的去处(这里与hashmap是相同的) */ if (fh >= 0) { //计算hash值与数组长度的按位结果 /** * 咱们知道hash&(n-1)是结点在数组中的索引位置 * 而hash&n,则是判断结点是在新数组中位置是否变化的一个依据 * 例子:e.hash=6,二进制为0000 0110。oldCap假设为16,二进制位0001 0000。newCap为32,二进制位0010 0000 * (e.hash & oldCap)=0000 0110 & 0001 0000=0000 0000 ; * (e.hash & oldCap-1)=0000 0110 & 0000 1111=0000 0110 ; * (e.hash & newCap)=0000 0110 & 0010 0000=0000 0000 * (e.hash & newCap-1)=0000 0110 & 0001 1111=0000 0110 ; * 例子:e.hash=19,二进制为0001 0011。oldCap假设为16,二进制位0001 0000。newCap为32,二进制位0010 0000 * (e.hash & oldCap)=0001 0011 & 0001 0000=0001 0000 ; * (e.hash & oldCap-1)=0001 0011 & 0000 1111=0000 0011 ; * (e.hash & newCap)=0001 0011 & 0010 0000=0000 0000 * (e.hash & newCap-1)=0001 0011 & 0001 1111=0001 0011 ; * 有上面就能够看出当(e.hash & oldCap) == 0时,(e.hash & oldCap-1)与(e.hash & newCap-1)的值时相同的,即在新数组中索引不变 * 而当(e.hash & oldCap) != 0 时,(e.hash & oldCap-1)与(e.hash & newCap-1)的值不相同的,即在新数组的索引变了 * 而且hash& n的结果只有0或n两种,能够借此判断是存放在hn中仍是ln中 int runBit = fh & n; Node<K,V> lastRun = f; //最后一次发生hash值改变的结点 //遍历链表查找结点的hash & n值最后一次有变化的结点 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } //lastRun用做哪一个链表的起点,runBit为0表示结点在新数组中的索引不变 //runBit不为0表示结点在新数组中发生改变,变为i+n if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } //将结点转移到新数组中 //遍历旧链表,按个判断是移动到i+n位置,仍是索引不动,分别放入ln或hn链表中 //这里hn或ln哪一个链表为null,就会出现结点顺序反转,假设ln为null,则 //有ln==null,那么越早加入链表就会排在越后面 //而hn==lastRun,则会有部分结点反转,即原数组中lastRun结点以前的顺序会反转,以后不变 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //将两个新的链表更新到新数组中 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); //将原数组的索引位置结点用fwd替换 advance = true; } //这里就是红黑树的转移了 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //判断两个树结点是否是小于等于6,要不要退化成链表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
//标识当前cell数组是否在初始化或扩容中的CAS标志位 private transient volatile int cellsBusy; //counterCells数组,总数值的分值分别存在每一个cell中 private transient volatile CounterCell[] counterCells; //没有竞争的时候使用,或者在初始化的时候做为一个反馈 //总和值的得到方式为 base + 每一个cell中分值之和在并发度较低的场景下,全部值都直接累加在base中 private transient volatile long baseCount; //CounterCell的定义 @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } //计算总和,每一个CounterCell中的值相加 final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //若是counterCell数组为null,则直接尝试将增长的键值对数更新到baseCount的上 //若counterCell不为null if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; // 是否冲突标志,默认未冲突 boolean uncontended = true; // 若是计数数组是空(还没有出现并发) // 若是随机取一个数组位置为空 或者 // 修改这个变量cellvalue失败(出现并发了) // 执行 fullAddCount 方法。并结束 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; //计算总和 s = sumCount(); } //判断是否须要扩容,从putVal方法进来(check一定大于等于0)一定要检查是否要扩容 //这里与上面分析的扩容过程相似,不在多说 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } } /** * 一、初始化probe,该值散列后用于获取counterCells数组的索引值(线程相关) * 二、若是counterCells数组不为空,尝试占据counterCells 数组,建立CounterCell存放到counterCells中的某个位置上,退出循环 * 三、若是counterCells 中该位置上存在数据,若是有冲突,则从新循环,不然尝试累加数据到cell 中,成功则退出循环。 * 四、若是counterCells 数组扩容,或者不须要扩容,则从新循环 * 五、对counterCells 进行扩容操做(容量增长1倍),而后从新循环 * 六、若是counterCells未初始化,则尝试进行初始化 * 七、若是初始化失败,则有其它线程再初始化,更新baseCount,成功就退出,不然从新循环。 */ private final void fullAddCount(long x, boolean wasUncontended) { int h; // 获取当前线程的probe值 // 若是为0,则初始化当前线程probe值 if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // 生成一个线程全部的随机值 h = ThreadLocalRandom.getProbe(); //返回生成的probe值,用于选择counterCells数组的下标 wasUncontended = true; //从新生成了probe,未冲突标志位设置为true } boolean collide = false; // 标识是不是最后一个槽位 for (;;) { CounterCell[] as; CounterCell a; int n; long v; //判断counterCells数组是否初始化过,若未初始化则先初始化 //cells数组不为空,即表示初始化已经成功 if ((as = counterCells) != null && (n = as.length) > 0) { //判断当前线程对应的索引是否有数据 //没有数据则新建一个CounterCell存放x,并更新counterCells数组 if ((a = as[(n - 1) & h]) == null) { //判断counterCells数组在什么状态下 //为0表示不在扩容或初始化状态 if (cellsBusy == 0) { // Try to attach new Cell //新建当前线程对应的CounterCell对象 CounterCell r = new CounterCell(x); // Optimistic create // CAS设置cellsBusy,防止其它线程来破坏数据结构 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; //标志位,操做是否成功 try { // Recheck under lock CounterCell[] rs; int m, j; //更新counterCells数组中的数据 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; //恢复标志位 } //更新成功,则退出死循环 if (created) break; continue; // Slot is now non-empty } } collide = false; } //对应索引中已经有数据了 //而且调用该函数前,cellvalue的CAS操做也已经失败(已经发生竞争) else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //执行value的累加 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; //成功更新就直接退出 //若是数组比较大了,则不须要扩容,继续重试,或者已经扩容了,重试 else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale else if (!collide) collide = true; //对counterCells数组进行扩容,如有其余线程在扩容则从新循环 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1]; //扩大1倍 for (int i = 0; i < n; ++i) //直接转移数据 rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = ThreadLocalRandom.advanceProbe(h); //从新生成一个probe值 } //进行初始化 //线程要初始化counterCells数组的条件是:cellsBusy标志位要为0(说明不在初始化或扩容) //counterCells == as这个判断不知道有什么意义? //当前线程更新cellsBusy状态为初始化状态,那么其余线程就不能再进行初始化了 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; //标志初始化是否成功 try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; //counterCells数组默认容量为2 rs[h & 1] = new CounterCell(x); //h&1是计算线程的probe值散列后的存放索引 counterCells = rs; init = true; //初始化成功 } } finally { cellsBusy = 0; //恢复 } if (init) //判断初始化是否成功,成功就退出 break; } //有其它线程占据counterCells数组,直接累加在base变量中 else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }
//get方法的实现 public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); //计算key对应的hash值 //判断table数组是否为空,若不为空,h对应的索引是否有结点 //table不为空或索引位置没有结点说明key不存在,直接返回null if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //判断该索引位置的结点的hash值是否与h相同(有可能头结点的hash值小于0,表示此时正在扩容) if ((eh = e.hash) == h) { //判断要查找的key是否是头结点,如果头结点俺么直接返回头结点的value值 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //eh小于0,该结点是ForwardingNode结点或TreeBin结点,调用find方法 //ForwardingNode的find方法比较简单就是遍历新数组来查找对应结点是否存在 //TreeBin的find方法则要复杂 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //遍历链表来查找key对应的value while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; } //ForwardingNode中的find方法,遍历新数组对应的索引查找key Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } //TreeBin中的find方法 final Node<K,V> find(int h, Object k) { if (k != null) { // for (Node<K,V> e = first; e != null; ) { int s; K ek; //判断TreeBin的状态,如果等待或写状态,则以链表的形式进行遍历查找 //((s = lockState) & (WAITER|WRITER)) == 0表示TreeBin如今处于读状态 if (((s = lockState) & (WAITER|WRITER)) != 0) { if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; e = e.next; } //尝试更新读状态,即获取读锁,成功的话,就以红黑树的形式进行遍历查找 else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) { TreeNode<K,V> r, p; try { p = ((r = root) == null ? null : r.findTreeNode(h, k, null)); } finally { Thread w; if (U.getAndAddInt(this, LOCKSTATE, -READER) == (READER|WAITER) && (w = waiter) != null) LockSupport.unpark(w); } return p; } } } return null; }
public V remove(Object key) { return replaceNode(key, null, null); } final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); //计算hash值 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //判断table数组是否为空,或hash值对应索引上是否有结点 //table为空或索引上没有结点,代表key不存在,直接返回 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; else if ((fh = f.hash) == MOVED) //若是当前table数组在扩容,当前线程先去辅助扩容,扩容完在删除结点 tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; synchronized (f) { //加锁,保证对应的链表或红黑树的并发操做 if (tabAt(tab, i) == f) { //再次判断头结点是否被改变 //判断是链表仍是红黑树结构 //fh大于0代表是链表 if (fh >= 0) { validated = true; //遍历链表 for (Node<K,V> e = f, pred = null;;) { K ek; //判断遍历到的当前结点是否是要删除的结点 //是要删除的结点就进行删除,并将删除结点的value返回 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; if ((e = e.next) == null) //判断下个结点 break; //链表到头了,说明不存在哟啊删除的结点,直接退出循环 } } //底层是红黑树结构,执行红黑树中删除节点的额方法,这里不作深刻,有兴趣能够看源码具体实现 else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; //先查找出key对应的结点是否存在,不存在直接结束 //存在的话,获取到value再删除结点 if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } //判断要删除的key是否存在 //存在就返回对应的value,不存在就返回null if (validated) { if (oldVal != null) { if (value == null) addCount(-1L, -1); return oldVal; } break; } } } return null; }