ConcurrentHashMap源码分析

构造方法

/**
 * Creates a new, empty map with the default initial table size (16). * 默认大小16 */ public ConcurrentHashMap() { } /** * 自定义大小的构造函数 */ public ConcurrentHashMap(int initialCapacity) { // 初始大小小于0抛出异常 if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } /** * Returns a power of two table size for the given desired capacity. * 返回最接近传参值的2的次方数 * 参考 Hackers Delight 第3.2节 */ private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // 合法性判断 throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }

 

 

初始化table

初始化table的源码以下,关于为何代码13行中,为何要重复判断table是否为空,举例说明:java

①当有两个线程A和B同时触发initTable时,此时的sizeCtl要么是初始值,要么是已经初始化成功后的值数组

②当线程A和B都进入循环时,判断执行U.compareAndSwapInt(this, SIZECTL, sc, -1)【其中U是sun.misc.Unsafe,Unsafe是借助C调用Cpu底层指令的类,SIZECTL是sizeCtl的偏移量】,此时线程A和B只有一个返回true,而且sc置为-1,这一步为原子操做多线程

③假设线程A成功执行,sc置为-1,那么线程B则返回false并继续循环,此时因为线程A还未执行完,sc为-1,B则一直让出CPU并发

④此时线程A执行判断sc是否大于0,大于0则赋值n为sc,不然置为默认值16,而后建立一个大小为n的Node数组,并赋值到table函数

⑤sc=n-(n>>>2),sizeCtl置为新的sc值,此时线程A结束初始化,线程B可能触发运行,这时候在代码第13行判断table是否为空,就能防止从新建立table了this

/**
 * Initializes table, using the size recorded in sizeCtl. */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; // table为空或者长度为0才初始化 while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // sc用于并发的线程判断,小于0让出CPU else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 这里重复判空,是为了防止多线程重复建立table if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }

 

添加元素

 

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) { // key和value为空的抛出空指针异常 if (key == null || value == null) throw new NullPointerException(); // 重算hashcode,将hashcode的高位也应用到hashcode的取值,增长hashcode的复杂度,下降碰撞 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // table为空的状况或者长度为0,初始化table if (tab == null || (n = tab.length) == 0) tab = initTable(); // 若是table中查询不到该hash值的数据,则直接建立节点 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 若是f.hash值为MOVED ,即等于-1,表明有线程在处理 else if ((fh = f.hash) == MOVED) // 帮助扩容,后面分析这个方法 tab = helpTransfer(tab, f); else { V oldVal = null; // 对首个节点进行加锁,减小线程冲突 synchronized (f) { // 这里再次获取hash对应的table节点,判断节点是否一致,防止table发生改变 if (tabAt(tab, i) == f) { // fh>=0,证实是正常的节点插入,不须要考虑红黑树 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // 若是在链表中找到值为key的节点e,直接设置e.val = value便可。 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 若是没有找到值为key的节点,直接新建Node并加入链表便可。 Node<K,V> pred = e; // put到末尾节点 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 若是首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操做。 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 添加完以后,判断是否须要树化 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 计数加1,可能会触发扩容 addCount(1L, binCount); return null; } // 查找元素是否存在,原子操做,支持并发 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } // 新增一个元素,偏移量的计算都是((long)i << ASHIFT) + ABASE static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); }

 

附加说明atom

在 Node 的子类 ForwardingNode 的构造方法中,能够看到MOVED做为 hash 值进行了初始化。spa

ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; }

而这个构造方法只在一个地方调用了,即 transfer(扩容) 方法。线程

相关文章
相关标签/搜索