ConcurrentHashMap采用了很是精妙的"分段锁"策略,ConcurrentHashMap的主干是个Segment数组。Segment继承了ReentrantLock,因此它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap,一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,对于不一样Segment的数据进行操做是不用考虑锁竞争的。c++
当问到咱们有关于ConcurrentHashMap的工做原理以及实现时,能够从如下几个方面说:数组
ConcurrentHashMap的优势,即HashMap和HashTable的缺点。安全
ConcurrentHashMap是Java1.5中引用的一个线程安全的支持高并发的HashMap集合类。多线程
1、线程不安全的HashMap并发
由于多线程环境下,使用Hashmap进行put操做会引发死循环,致使CPU利用率接近100%,因此在并发状况下不能使用HashMap。ssh
2、效率低下的HashTable函数
HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的状况下HashTable的效率很是低下。高并发
由于当一个线程访问HashTable的同步方法时,其余线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。this
如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,而且也不能使用get方法来获取元素,因此竞争越激烈效率越低。spa
3、锁分段技术
HashTable容器在竞争激烈的并发环境下表现出效率低下的缘由,是由于全部访问HashTable的线程都必须竞争同一把锁,
那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不一样数据段的数据时,线程间就不会存在锁竞争,
从而能够有效的提升并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。
首先将数据分红一段一段的存储,而后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其余段的数据也能被其余线程访问。
有些方法须要跨段,好比size()和containsValue(),它们可能须要锁定整个表而而不只仅是某个段,这须要按顺序锁定全部段,操做完毕后,又按顺序释放全部段的锁。
这里“按顺序”是很重要的,不然极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,而且其成员变量实际上也是final的,
可是,仅仅是将数组声明为final的并不保证数组成员也是final的,这须要实现上的保证。这能够确保不会出现死锁,由于得到锁的顺序是固定的。
oncurrentHashMap 类中包含两个静态内部类 HashEntry 和 Segment。
HashEntry 用来封装映射表的键 / 值对;Segment 用来充当锁的角色,每一个 Segment 对象守护整个散列映射表的若干个桶。
每一个桶是由若干个 HashEntry 对象连接起来的链表。一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组。
每一个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先得到它对应的Segment锁。
5、HashEntry类
static final class HashEntry<K,V> { final K key; // 声明 key 为 final 型 final int hash; // 声明 hash 值为 final 型 volatile V value; // 声明 value 为 volatile 型 final HashEntry<K,V> next; // 声明 next 为 final 型 HashEntry(K key, int hash, HashEntry<K,V> next, V value) { this.key = key; this.hash = hash; this.next = next; this.value = value; } }
每一个HashEntry表明Hash表中的一个节点,在其定义的结构中能够看到,除了value值没有定义final,其他的都定义为final类型,咱们知道Java中关键词final修饰的域成为最终域。
用关键词final修饰的变量一旦赋值,就不能改变,也称为修饰的标识为常量。这就意味着咱们删除或者增长一个节点的时候,就必须从头开始从新创建Hash链,由于next引用值须要改变。
因为 HashEntry 的 next 域为 final 型,因此新节点只能在链表的表头处插入。 例如将A,B,C插入空桶中,插入后的结构为:
6、segment类
static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; /** * 在本 segment 范围内,包含的 HashEntry 元素的个数 * 该变量被声明为 volatile 型,保证每次读取到最新的数据 */ transient volatile int count; /** *table 被更新的次数 */ transient int modCount; /** * 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列 */ transient int threshold; /** * table 是由 HashEntry 对象组成的数组 * 若是散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式连接成一个链表 * table 数组的数组成员表明散列映射表的一个桶 * 每一个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分 * 若是并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶总数的 1/16 */ transient volatile HashEntry<K,V>[] table; /** * 装载因子 */ final float loadFactor; }
Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。每一个 Segment 对象用来守护其(成员对象 table 中)包含的若干个桶。
table 是一个由 HashEntry 对象组成的数组。table 数组的每个数组成员就是散列映射表的一个桶。
每个 Segment 对象都有一个 count 对象来表示本 Segment 中包含的 HashEntry 对象的总数。
之因此在每一个 Segment 对象中包含一个计数器,而不是在 ConcurrentHashMap 中使用全局的计数器,是为了不出现“热点域”而影响 ConcurrentHashMap 的并发性。
下图是依次插入 ABC 三个 HashEntry 节点后,Segment 的结构示意图。
7、ConcurrentHashMap 类
默认的状况下,每一个ConcurrentHashMap 类会建立16个并发的segment,每一个segment里面包含多个Hash表,每一个Hash链都是有HashEntry节点组成的。
若是键能均匀散列,每一个 Segment 大约守护整个散列表中桶总数的 1/16。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { /** * 散列映射表的默认初始容量为 16,即初始默认为 16 个桶 * 在构造函数中没有指定这个参数时,使用本参数 */ static final int DEFAULT_INITIAL_CAPACITY= 16; /** * 散列映射表的默认装载因子为 0.75,该值是 table 中包含的 HashEntry 元素的个数与 * table 数组长度的比值 * 当 table 中包含的 HashEntry 元素的个数超过了 table 数组的长度与装载因子的乘积时, * 将触发 再散列 * 在构造函数中没有指定这个参数时,使用本参数 */ static final float DEFAULT_LOAD_FACTOR= 0.75f; /** * 散列表的默认并发级别为 16。该值表示当前更新线程的估计数 * 在构造函数中没有指定这个参数时,使用本参数 */ static final int DEFAULT_CONCURRENCY_LEVEL= 16; /** * segments 的掩码值 * key 的散列码的高位用来选择具体的 segment */ final int segmentMask; /** * 偏移量 */ final int segmentShift; /** * 由 Segment 对象组成的数组 */ final Segment<K,V>[] segments; /** * 建立一个带有指定初始容量、加载因子和并发级别的新的空映射。 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if(!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if(concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 寻找最佳匹配参数(不小于给定参数的最接近的 2 次幂) int sshift = 0; int ssize = 1; while(ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; // 偏移量值 segmentMask = ssize - 1; // 掩码值 this.segments = Segment.newArray(ssize); // 建立数组 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if(c * ssize < initialCapacity) ++c; int cap = 1; while(cap < c) cap <<= 1; // 依次遍历每一个数组元素 for(int i = 0; i < this.segments.length; ++i) // 初始化每一个数组元素引用的 Segment 对象 this.segments[i] = new Segment<K,V>(cap, loadFactor); } /** * 建立一个带有默认初始容量 (16)、默认加载因子 (0.75) 和 默认并发级别 (16) * 的空散列映射表。 */ public ConcurrentHashMap() { // 使用三个默认参数,调用上面重载的构造函数来建立空散列映射表 this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); }
8、 用分离锁实现多个线程间的并发写操做
插入数据后的ConcurrentHashMap的存储形式
(1)Put方法的实现
首先,根据 key 计算出对应的 hash 值:
public V put(K key, V value) { if (value == null) //ConcurrentHashMap 中不容许用 null 做为映射值 throw new NullPointerException(); int hash = hash(key.hashCode()); // 计算键对应的散列码 // 根据散列码找到对应的 Segment return segmentFor(hash).put(key, hash, value, false); } 根据 hash 值找到对应的 Segment: /** * 使用 key 的散列码来获得 segments 数组中对应的 Segment */ final Segment<K,V> segmentFor(int hash) { // 将散列值右移 segmentShift 个位,并在高位填充 0 // 而后把获得的值与 segmentMask 相“与” // 从而获得 hash 值对应的 segments 数组的下标值 // 最后根据下标值返回散列码对应的 Segment 对象 return segments[(hash >>> segmentShift) & segmentMask]; }
在这个 Segment 中执行具体的 put 操做:
V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); // 加锁,这里是锁定某个 Segment 对象而非整个 ConcurrentHashMap try { int c = count; if (c++ > threshold) // 若是超过再散列的阈值 rehash(); // 执行再散列,table 数组的长度将扩充一倍 HashEntry<K,V>[] tab = table; // 把散列码值与 table 数组的长度减 1 的值相“与” // 获得该散列码对应的 table 数组的下标值 int index = hash & (tab.length - 1); // 找到散列码对应的具体的那个桶 HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { // 若是键 / 值对以经存在 oldValue = e.value; if (!onlyIfAbsent) e.value = value; // 设置 value 值 } else { // 键 / 值对不存在 oldValue = null; ++modCount; // 要添加新节点到链表中,因此 modCont 要加 1 // 建立新节点,并添加到链表的头部 tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // 写 count 变量 } return oldValue; } finally { unlock(); // 解锁 } `}
这里的加锁操做是针对(键的 hash 值对应的)某个具体的 Segment,锁定的是该 Segment 而不是整个 ConcurrentHashMap。
由于插入键 / 值对操做只是在这个 Segment 包含的某个桶中完成,不须要锁定整个ConcurrentHashMap。
此时,其余写线程对另外 15 个Segment 的加锁并不会由于当前线程对这个 Segment 的加锁而阻塞。
同时,全部读线程几乎不会因本线程的加锁而阻塞(除非读线程恰好读到这个 Segment 中某个 HashEntry 的 value 域的值为 null,此时须要加锁后从新读取该值)。
(2)Get方法的实现
V get(Object key, int hash) { if(count != 0) { // 首先读 count 变量 HashEntry<K,V> e = getFirst(hash); while(e != null) { if(e.hash == hash && key.equals(e.key)) { V v = e.value; if(v != null) return v; // 若是读到 value 域为 null,说明发生了重排序,加锁后从新读取 return readValueUnderLock(e); } e = e.next; } } return null; } V readValueUnderLock(HashEntry<K,V> e) { lock(); try { return e.value; } finally { unlock(); } }
ConcurrentHashMap中的读方法不须要加锁,全部的修改操做在进行结构修改时都会在最后一步写count 变量,经过这种机制保证get操做可以获得几乎最新的结构更新。
(3)Remove方法的实现
V remove(Object key, int hash, Object value) { lock(); //加锁 try{ int c = count - 1; HashEntry<K,V>[] tab = table; //根据散列码找到 table 的下标值 int index = hash & (tab.length - 1); //找到散列码对应的那个桶 HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while(e != null&& (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if(e != null) { V v = e.value; if(value == null|| value.equals(v)) { //找到要删除的节点 oldValue = v; ++modCount; //全部处于待删除节点以后的节点原样保留在链表中 //全部处于待删除节点以前的节点被克隆到新链表中 HashEntry<K,V> newFirst = e.next;// 待删节点的后继结点 for(HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); //把桶连接到新的头结点 //新的头结点是原链表中,删除节点以前的那个节点 tab[index] = newFirst; count = c; //写 count 变量 } } return oldValue; } finally{ unlock(); //解锁 } }
整个操做是在持有段锁的状况下执行的,空白行以前的行主要是定位到要删除的节点e。
若是不存在这个节点就直接返回null,不然就要将e前面的结点复制一遍,尾结点指向e的下一个结点。
e后面的结点不须要复制,它们能够重用。
中间那个for循环是作什么用的呢?从代码来看,就是将定位以后的全部entry克隆并拼回前面去,但有必要吗?
每次删除一个元素就要将那以前的元素克隆一遍?这点实际上是由entry的不变性来决定的,仔细观察entry定义,发现除了value,其余全部属性都是用final来修饰的,
这意味着在第一次设置了next域以后便不能再改变它,取而代之的是将它以前的节点全都克隆一次。至于entry为何要设置为不变性,这跟不变性的访问不须要同步从而节省时间有关。
(4)containsKey方法的实现,它不须要读取值。
boolean containsKey(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) return true; e = e.next; } } return false; }
(5)size()
咱们要统计整个ConcurrentHashMap里元素的大小,就必须统计全部Segment里元素的大小后求和。
Segment里的全局变量count是一个volatile变量,那么在多线程场景下,咱们是否是直接把全部Segment的count相加就能够获得整个ConcurrentHashMap大小了呢?
不是的,虽然相加时能够获取每一个Segment的count的最新值,可是拿到以后可能累加前使用的count发生了变化,那么统计结果就不许了。
因此最安全的作法,是在统计size的时候把全部Segment的put,remove和clean方法所有锁住,可是这种作法显然很是低效。
由于在累加count操做过程当中,以前累加过的count发生变化的概率很是小,因此ConcurrentHashMap的作法是先尝试2次经过不锁住Segment的方式来统计各个Segment大小,若是统计的过程当中,容器的count发生了变化,则再采用加锁的方式来统计全部Segment的大小。
那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put , remove和clean方法里操做元素前都会将变量modCount进行加1,那么在统计size先后比较modCount是否发生变化,从而得知容器的大小是否发生变化。
9、总结
1.在使用锁来协调多线程间并发访问的模式下,减少对锁的竞争能够有效提升并发性。
有两种方式能够减少对锁的竞争:
减少请求同一个锁的频率。
减小持有锁的时间。
2.ConcurrentHashMap 的高并发性主要来自于三个方面:
用分离锁实现多个线程间的更深层次的共享访问。
用 HashEntery 对象的不变性来下降执行读操做的线程在遍历链表期间对加锁的需求。
经过对同一个 Volatile 变量的写 / 读访问,协调不一样线程间读 / 写操做的内存可见性。
使用分离锁,减少了请求同一个锁的频率。