ConcurrentHashMap is the most heavily used data structure in the java.util.concurrent package; understanding its internal design helps when writing highly concurrent code.
Segment extends ReentrantLock and plays the role of the lock; each Segment object guards a group of buckets of the hash table.
HashEntry encapsulates one key/value pair of the map.
Each bucket is a linked list made up of HashEntry objects.
Locating an element in ConcurrentHashMap takes two hash steps: the first hash locates the Segment, and the second locates the head of the linked list that holds the element. The side effect of this structure is that hashing takes longer than in a plain HashMap, but the benefit is that a write only needs to lock the Segment containing the element and does not affect the other Segments.
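As a minimal sketch of those two hash steps, assuming the default segmentShift of 28 and segmentMask of 15 that the constructor shown later produces (the class and method names here are illustrative, and the real map first re-spreads the key's hashCode through its internal hash() function):

    // Illustrative only: how one hash value is split into a segment index
    // and a bucket index, with segmentShift = 28 and segmentMask = 15.
    public class TwoLevelHashDemo {
        static final int SEGMENT_SHIFT = 28;   // 32 - sshift, with sshift = 4
        static final int SEGMENT_MASK  = 15;   // ssize - 1, with ssize = 16

        static int segmentIndex(int hash) {
            // First hash step: the high-order bits pick the Segment.
            return (hash >>> SEGMENT_SHIFT) & SEGMENT_MASK;
        }

        static int bucketIndex(int hash, int tableLength) {
            // Second hash step: the low-order bits pick the bucket inside the segment.
            return (tableLength - 1) & hash;
        }

        public static void main(String[] args) {
            int hash = "someKey".hashCode();
            System.out.println("segment = " + segmentIndex(hash)
                    + ", bucket = " + bucketIndex(hash, 16));
        }
    }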
Resizing (rehash): in the original implementation HashEntry.next was declared final (in the JDK 7 code below it is volatile and written through setNext), so a chain cannot simply be re-linked node by node into the new table. Instead, rehash() reuses the trailing run of each chain whose entries all map to the same new index and clones only the nodes in front of that run; see the rehash code at the end of this section.
    // Abridged excerpt of the JDK 7 source: helper fields and methods such as the
    // UNSAFE offsets (SBASE, SSHIFT, TBASE, TSHIFT), hash(), entryAt(), setEntryAt()
    // and scanAndLockForPut() are omitted here.
    public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
            implements ConcurrentMap<K, V>, Serializable {

        /* ---- Constants related to segmentation ---- */

        // Maximum number of segments allowed; used to bound constructor arguments.
        // Must be a power of two less than 1 << 24.
        static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

        // Default concurrency level; the number of segments (ssize) is the smallest
        // power of two that is at least this value.
        static final int DEFAULT_CONCURRENCY_LEVEL = 16;

        // Minimum capacity of each per-segment table. Must be at least 2 to avoid an
        // immediate resize on first use after lazy construction of a segment.
        static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

        /* ---- Constants related to the HashEntry tables ---- */

        // Default initial capacity of the whole map, used (together with the
        // concurrency level) to size each segment's HashEntry[]; it is not itself
        // the capacity of each segment's table.
        static final int DEFAULT_INITIAL_CAPACITY = 16;

        // Default load factor, used when none is specified in the constructor.
        static final float DEFAULT_LOAD_FACTOR = 0.75f;

        // Maximum capacity of a per-segment HashEntry[] table (2^30).
        static final int MAXIMUM_CAPACITY = 1 << 30;

        // Number of unsynchronized retries in size() and containsValue() before
        // resorting to locking; avoids unbounded retries when the table is being
        // modified continuously and an accurate result cannot be obtained otherwise.
        static final int RETRIES_BEFORE_LOCK = 2;

        static final class HashEntry<K,V> {
            final int hash;
            final K key;
            volatile V value;
            volatile HashEntry<K,V> next;

            HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
                this.hash = hash;
                this.key = key;
                this.value = value;
                this.next = next;
            }
        }

        static final class Segment<K,V> extends ReentrantLock implements Serializable {
            private static final long serialVersionUID = 2249069246763182397L;

            static final int MAX_SCAN_RETRIES =
                Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

            // The segment's table of buckets.
            transient volatile HashEntry<K,V>[] table;
            // Number of elements in this segment.
            transient int count;
            // Count of structural modifications, used to detect concurrent changes.
            transient int modCount;
            // Resize threshold; the table is rehashed once count exceeds it.
            transient int threshold;
            // Load factor, used to compute the threshold.
            final float loadFactor;

            Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
                this.loadFactor = lf;
                this.threshold = threshold;
                this.table = tab;
            }

            // The real insertion, performed while holding this segment's lock.
            final V put(K key, int hash, V value, boolean onlyIfAbsent) {
                HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value); // acquire the lock before inserting
                V oldValue;
                try {
                    HashEntry<K,V>[] tab = table;
                    int index = (tab.length - 1) & hash;
                    HashEntry<K,V> first = entryAt(tab, index);
                    for (HashEntry<K,V> e = first;;) {
                        if (e != null) {
                            K k;
                            if ((k = e.key) == key ||
                                (e.hash == hash && key.equals(k))) {
                                oldValue = e.value;
                                if (!onlyIfAbsent) {
                                    e.value = value;
                                    ++modCount;
                                }
                                break;
                            }
                            e = e.next;
                        }
                        else {
                            if (node != null)
                                node.setNext(first);
                            else
                                node = new HashEntry<K,V>(hash, key, value, first);
                            int c = count + 1;
                            if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                                rehash(node);
                            else
                                setEntryAt(tab, index, node);
                            ++modCount;
                            count = c;
                            oldValue = null;
                            break;
                        }
                    }
                } finally {
                    unlock(); // release the lock
                }
                return oldValue;
            }
        }

        // Insertion entry point.
        public V put(K key, V value) {
            Segment<K,V> s;
            if (value == null)
                throw new NullPointerException();
            int hash = hash(key); // hash of the key
            // With the defaults, segmentShift is 28 and segmentMask is 0b1111:
            // an unsigned right shift by 28 bits followed by the mask selects the segment.
            int j = (hash >>> segmentShift) & segmentMask;
            if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                 (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
                s = ensureSegment(j);
            return s.put(key, hash, value, false); // delegate to the segment's put
        }

        // Retrieval.
        public V get(Object key) {
            Segment<K,V> s; // manually integrate access methods to reduce overhead
            HashEntry<K,V>[] tab;
            int h = hash(key);
            long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
            if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
                (tab = s.table) != null) {
                for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                         (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                     e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                        return e.value;
                }
            }
            return null;
        }
    }
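Because a write locks only the segment it touches, puts that land in different segments proceed in parallel. A small usage sketch using nothing but the public API (class name and key format are made up for the example):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ConcurrentPutDemo {
        public static void main(String[] args) throws InterruptedException {
            final ConcurrentHashMap<String, Integer> map =
                new ConcurrentHashMap<String, Integer>();
            ExecutorService pool = Executors.newFixedThreadPool(4);
            // Four writers insert disjoint keys; keys that hash to different
            // segments never contend for the same Segment lock.
            for (int t = 0; t < 4; t++) {
                final int id = t;
                pool.execute(new Runnable() {
                    public void run() {
                        for (int i = 0; i < 10000; i++) {
                            map.put("key-" + id + "-" + i, i);
                        }
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
            System.out.println("size = " + map.size()); // 40000
        }
    }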
    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        // Cap the concurrency level at MAX_SEGMENTS (1 << 16 = 65536).
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments.
        int sshift = 0;   // number of left shifts performed
        int ssize = 1;    // size of the segment array
        while (ssize < concurrencyLevel) {
            ++sshift;     // ends at 4 for the default level of 16
            ssize <<= 1;  // shift left by 1, i.e. ssize *= 2, ending at 16
        }
        this.segmentShift = 32 - sshift; // shift: 28
        this.segmentMask = ssize - 1;    // mask: 15, i.e. 0b1111
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY; // at most 1,073,741,824
        int c = initialCapacity / ssize;        // c = 1 with the defaults
        if (c * ssize < initialCapacity)        // if the division was not exact,
            ++c;                                // round up so that c * ssize >= initialCapacity
        int cap = MIN_SEGMENT_TABLE_CAPACITY;   // minimum per-segment HashEntry[] size
        while (cap < c)
            cap <<= 1;
        // Create segments and segments[0].
        // With the defaults: loadFactor = 0.75, threshold = (int)(2 * 0.75) = 1, HashEntry[2].
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        // Create the segment array.
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }
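A hedged sketch that reproduces only the sizing arithmetic of this constructor, to show what the defaults (initialCapacity = 16, loadFactor = 0.75f, concurrencyLevel = 16) work out to; the class name and printed labels are illustrative:

    // Illustrative re-implementation of the constructor's sizing math only.
    public class SizingDemo {
        public static void main(String[] args) {
            int initialCapacity = 16, concurrencyLevel = 16;
            float loadFactor = 0.75f;

            int sshift = 0, ssize = 1;
            while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; }
            int segmentShift = 32 - sshift;   // 28
            int segmentMask = ssize - 1;      // 15

            int c = initialCapacity / ssize;  // 1
            if (c * ssize < initialCapacity) ++c;
            int cap = 2;                      // MIN_SEGMENT_TABLE_CAPACITY
            while (cap < c) cap <<= 1;
            int threshold = (int)(cap * loadFactor); // 1

            System.out.printf("ssize=%d sshift=%d shift=%d mask=%d cap=%d threshold=%d%n",
                    ssize, sshift, segmentShift, segmentMask, cap, threshold);
            // Prints: ssize=16 sshift=4 shift=28 mask=15 cap=2 threshold=1
        }
    }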
    // Call path: the public put.
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)      // null values are rejected
            throw new NullPointerException();
        int hash = hash(key);   // compute the hash
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j); // lazily create the segment if it does not exist yet
        return s.put(key, hash, value, false);
    }

    // Lazily create a segment.
    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0];      // use segment 0 as prototype
            int cap = proto.table.length;    // cap = 2 with the defaults
            float lf = proto.loadFactor;     // 0.75
            int threshold = (int)(cap * lf); // threshold = 1
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

    // Insertion inside a segment.
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);        // acquire the segment lock
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;               // the segment's table
            int index = (tab.length - 1) & hash;        // bucket index within the table
            HashEntry<K,V> first = entryAt(tab, index); // head of the bucket's linked list
            // Loop until the entry has been inserted or an existing one replaced.
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {                        // the bucket is not empty
                    K k;
                    // If the key already exists, replace its value.
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;                         // otherwise keep walking the chain
                }
                else {                                  // end of the chain (or empty bucket)
                    if (node != null)
                        node.setNext(first);
                    else                                // create the node; the old head becomes its successor
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);                   // resize
                    else
                        setEntryAt(tab, index, node);   // link the new node in as the head
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }
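The onlyIfAbsent flag in the segment-level put above is what separates put from putIfAbsent at the public API level: putIfAbsent takes the same path but passes true for that flag, so an existing mapping is left untouched. A quick example of the observable difference, using only the standard ConcurrentMap API:

    import java.util.concurrent.ConcurrentHashMap;

    public class PutVsPutIfAbsent {
        public static void main(String[] args) {
            ConcurrentHashMap<String, String> map = new ConcurrentHashMap<String, String>();

            map.put("k", "v1");
            // put overwrites the existing mapping and returns the old value.
            String old1 = map.put("k", "v2");         // "v1"; the map now holds "v2"
            // putIfAbsent leaves the existing mapping untouched and returns it.
            String old2 = map.putIfAbsent("k", "v3"); // "v2"; the map still holds "v2"

            System.out.println(old1 + " " + old2 + " " + map.get("k")); // v1 v2 v2
        }
    }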
    private void rehash(HashEntry<K,V> node) {
        HashEntry<K,V>[] oldTable = table;           // the segment's old table
        int oldCapacity = oldTable.length;           // old number of buckets
        int newCapacity = oldCapacity << 1;          // double the capacity
        threshold = (int)(newCapacity * loadFactor); // recompute threshold with load factor 0.75
        HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
        int sizeMask = newCapacity - 1;              // bit mask, e.g. 0b1111 -> 0b11111 after doubling
        for (int i = 0; i < oldCapacity; i++) {
            HashEntry<K,V> e = oldTable[i];          // head of the old bucket's list
            if (e != null) {
                HashEntry<K,V> next = e.next;
                int idx = e.hash & sizeMask;         // index in the new table
                if (next == null)                    // single node on the chain:
                    newTable[idx] = e;               // move it to the new bucket directly
                else {                               // reuse the consecutive trailing run that maps to one slot
                    HashEntry<K,V> lastRun = e;      // start of the reusable run
                    int lastIdx = idx;               // its index in the new table
                    // Find the last node after which every node maps to the same new index.
                    for (HashEntry<K,V> last = next; last != null; last = last.next) {
                        int k = last.hash & sizeMask; // new index of the successor
                        if (k != lastIdx) {           // the run is broken; restart it here
                            lastIdx = k;
                            lastRun = last;
                        }
                    }
                    newTable[lastIdx] = lastRun;     // reuse the whole run from lastRun to the tail
                    // Clone the nodes between e and lastRun into the new table.
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
        int nodeIndex = node.hash & sizeMask;        // add the new node
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        table = newTable;
    }
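A hedged toy sketch of the lastRun optimization in isolation, using a simplified Node type (not the real HashEntry) and hard-coded hashes: the nodes from lastRun to the tail all map to the same new bucket and are reused as-is, while only the nodes in front of lastRun are cloned.

    // Simplified illustration of the lastRun reuse in rehash(); Node is a
    // stand-in for HashEntry and the hash values are chosen for the example.
    public class LastRunDemo {
        static final class Node {
            final int hash; Node next;
            Node(int hash, Node next) { this.hash = hash; this.next = next; }
        }

        public static void main(String[] args) {
            // Old capacity 4 (all four nodes shared old bucket 1), new capacity 8.
            int sizeMask = 7;
            // Chain whose new indices are 1, 5, 5, 5: the run of 5s can be reused.
            Node chain = new Node(1, new Node(5, new Node(13, new Node(21, null))));

            Node lastRun = chain;
            int lastIdx = chain.hash & sizeMask;
            for (Node n = chain.next; n != null; n = n.next) {
                int k = n.hash & sizeMask;
                if (k != lastIdx) { lastIdx = k; lastRun = n; }
            }
            // lastRun is the node with hash 5: it and everything after it move to
            // new bucket 5 without copying; only the leading node (hash 1) is cloned.
            System.out.println("lastRun.hash = " + lastRun.hash + ", lastIdx = " + lastIdx);
        }
    }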