/**
* The segments, each of which is a specialized hash table
*/
final Segment<K,V>[] segments;
ConcurrentHashMap彻底容许多个读操做并发进行,读操做并不须要加锁。若是使用传统的技术,如HashMap中的实现,若是容许能够在hash链的中间添加或删除元素,读操做不加锁将获得不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry表明每一个hash链中的一个节点,其结构以下所示:
static final class HashEntry<K,V> {
final K key; final int hash; volatile V value; final HashEntry<K,V> next; }
能够看到除了value不是final的,其它值都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,由于这须要修改next 引用值,全部的节点的修改只能从头部开始。对于put操做,能够一概添加到Hash链的头部。可是对于remove操做,可能须要从中间删除一个节点,这就须要将要删除节点的前面全部节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操做时还会详述。为了确保读操做可以看到最新的值,将value设置成volatile,这避免了加锁。
其它
为了加快定位段以及段中hash槽的速度,每一个段hash槽的的个数都是2^n,这使得经过位运算就能够定位段和段中hash槽的位置。当并发级别为默认值16时,也就是段的个数,hash值的高4位决定分配在哪一个段中。可是咱们也不要忘记《算法导论》给咱们的教训:hash槽的的个数不该该是 2^n,这可能致使hash槽分配不均,这须要对hash值从新再hash一次。(这段彷佛有点多余了 )
定位操做:
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask]; }
既然ConcurrentHashMap使用分段锁Segment来保护不一样段的数据,那么在插入和获取元素的时候,必须先经过哈希算法定位到Segment。能够看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希。
再哈希,其目的是为了减小哈希冲突,使元素可以均匀的分布在不一样的Segment上,从而提升容器的存取效率。假如哈希的质量差到极点,那么全部的元素都在一个Segment中,不只存取元素缓慢,分段锁也会失去意义。我作了一个测试,不经过再哈希而直接执行哈希计算。
System.out.println(Integer.parseInt("0001111", 2) & 15);
System.out.println(Integer.parseInt("0011111", 2) & 15);
System.out.println(Integer.parseInt("0111111", 2) & 15);
System.out.println(Integer.parseInt("1111111", 2) & 15);
计算后输出的哈希值全是15,经过这个例子能够发现若是不进行再哈希,哈希冲突会很是严重,由于只要低位同样,不管高位是什么数,其哈希值老是同样。咱们再把上面的二进制数据进行再哈希后结果以下,为了方便阅读,不足32位的高位补了0,每隔四位用竖线分割下。
0100|0111|0110|0111|1101|1010|0100|1110
1111|0111|0100|0011|0000|0001|1011|1000
0111|0111|0110|1001|0100|0110|0011|1110
1000|0011|0000|0000|1100|1000|0001|1010
能够发现每一位的数据都散列开了,经过这种再哈希能让数字的每一位都能参加到哈希运算当中,从而减小哈希冲突。ConcurrentHashMap经过如下哈希算法定位segment。
默认状况下segmentShift为28,segmentMask为15,再哈希后的数最大是32位二进制数据,向右无符号移动28位,意思是让高4位参与到hash运算中, (hash >>> segmentShift) & segmentMask的运算结果分别是4,15,7和8,能够看到hash值没有发生冲突。
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask]; }
数据结构
全部的成员都是final的,其中segmentMask和segmentShift主要是为了定位段,参见上面的segmentFor方法。
关于Hash表的基础数据结构,这里不想作过多的探讨。Hash表的一个很重要方面就是如何解决hash冲突,ConcurrentHashMap 和HashMap使用相同的方式,都是将hash值相同的节点放在一个hash链中。与HashMap不一样的是,ConcurrentHashMap使用多个子Hash表,也就是段(Segment)。
每一个Segment至关于一个子Hash表,它的数据成员以下:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
/**
* The number of elements in this segment's region.
*/
transient volatileint count; /** * Number of updates that alter the size of the table. This is * used during bulk-read methods to make sure they see a * consistent snapshot: If modCounts change during a traversal * of segments computing size or checking containsValue, then * we might have an inconsistent view of state so (usually) * must retry. */ transient int modCount; /** * The table is rehashed when its size exceeds this threshold. * (The value of this field is always <tt>(int)(capacity * * loadFactor)</tt>.) */ transient int threshold; /** * The per-segment table. */ transient volatile HashEntry<K,V>[] table; /** * The load factor for the hash table. Even though this value * is same for all segments, it is replicated to avoid needing * links to outer object. * @serial */ final float loadFactor; }
count用来统计该段数据的个数,它是volatile,它用来协调修改和读取操做,以保证读取操做可以读取到几乎最新的修改。协调方式是这样的,每次修改操做作告终构上的改变,如增长/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操做开始都要读取count的值。这利用了 Java 5中对volatile语义的加强,对同一个volatile变量的写和读存在happens-before关系。modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程当中某个段是否发生改变,在讲述跨段操做时会还会详述。threashold用来表示须要进行rehash的界限值。table数组存储段中节点,每一个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得可以读取到最新的 table值而不须要同步。loadFactor表示负载因子。
删除操做remove(key)
public V remove(Object key) {
hash = hash(key.hashCode()); return segmentFor(hash).remove(key, hash, null); }
整个操做是先定位到段,而后委托给段的remove操做。当多个删除操做并发进行时,只要它们所在的段不相同,它们就能够同时进行。
下面是Segment的remove方法实现:
V remove(Object key, int hash, Object value) {
lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K,V> newFirst = e.next; *for (HashEntry<K,V> p = first; p != e; p = p.next) *newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } }
整个操做是在持有段锁的状况下执行的,空白行以前的行主要是定位到要删除的节点e。接下来,若是不存在这个节点就直接返回null,不然就要将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不须要复制,它们能够重用。
中间那个for循环是作什么用的呢?(*号标记)从代码来看,就是将定位以后的全部entry克隆并拼回前面去,但有必要吗?每次删除一个元素就要将那以前的元素克隆一遍?这点实际上是由entry的不变性来决定的,仔细观察entry定义,发现除了value,其余全部属性都是用final来修饰的,这意味着在第一次设置了next域以后便不能再改变它,取而代之的是将它以前的节点全都克隆一次。至于entry为何要设置为不变性,这跟不变性的访问不须要同步从而节省时间有关
下面是个示意图
删除元素以前:
第二个图其实有点问题,复制的结点中应该是值为2的结点在前面,值为1的结点在后面,也就是恰好和原来结点顺序相反,还好这不影响咱们的讨论。
整个remove实现并不复杂,可是须要注意以下几点。第一,当要删除的结点存在时,删除的最后一步操做要将count的值减一。这必须是最后一步操做,不然读取操做可能看不到以前对段所作的结构性修改。第二,remove执行的开始就将table赋给一个局部变量tab,这是由于table是 volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写作任何优化,直接屡次访问非volatile实例变量没有多大影响,编译器会作相应优化。
get操做
ConcurrentHashMap的get操做是直接委托给Segment的get方法,直接看Segment的get方法:
V get(Object key, int hash) {
if (count != 0) { // read-volatile 当前桶的数据个数是否为0
HashEntry<K,V> e = getFirst(hash); 获得头节点 while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } returnnull; }
get操做不须要锁。
除非读到的值是空的才会加锁重读,咱们知道HashTable容器的get方法是须要加锁的,那么ConcurrentHashMap的get操做是如何作到不加锁的呢?缘由是它的get方法里将要使用的共享变量都定义成volatile
第一步是访问count变量,这是一个volatile变量,因为全部的修改操做在进行结构修改时都会在最后一步写count 变量,经过这种机制保证get操做可以获得几乎最新的结构更新。对于非结构更新,也就是结点值的改变,因为HashEntry的value变量是 volatile的,也能保证读取到最新的值。
接下来就是根据hash和key对hash链进行遍历找到要获取的结点,若是没有找到,直接访回null。对hash链进行遍历不须要加锁的缘由在于链指针next是final的。可是头指针却不是final的,这是经过getFirst(hash)方法返回,也就是存在 table数组中的值。这使得getFirst(hash)可能返回过期的头结点,例如,当执行get方法时,刚执行完getFirst(hash)以后,另外一个线程执行了删除操做并更新头结点,这就致使get方法中返回的头结点不是最新的。这是能够容许,经过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要获得最新的数据,只有采用彻底的同步。
最后,若是找到了所求的结点,判断它的值若是非空就直接返回,不然在有锁的状态下再读一次。这彷佛有些费解,理论上结点的值不可能为空,这是由于 put的时候就进行了判断,若是为空就要抛NullPointerException。空值的惟一源头就是HashEntry中的默认值,由于 HashEntry中的value不是final的,非同步读取有可能读取到空值。仔细看下put操做的语句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被从新排序,这就可能致使结点的值为空。这里当v为空时,多是一个线程正在改变节点,而以前的get操做都未进行锁定,根据bernstein条件,读后写或写后读都会引发数据的不一致,因此这里要对这个e从新上锁再读一遍,以保证获得的是正确值。
V readValueUnderLock(HashEntry<K,V> e) {
lock(); try { return e.value; } finally { unlock(); } }
如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,可以在线程之间保持可见性,可以被多线程同时读,而且保证不会读到过时的值,可是只能被单线程写(有一种状况能够被多线程写,就是写入的值不依赖于原值),在get操做里只须要读不须要写共享变量count和value,因此能够不用加锁。之因此不会读到过时的值,是根据java内存模型的happen before原则,对volatile字段的写入操做先于读操做,即便两个线程同时修改和获取volatile变量,get操做也能拿到最新的值,这是用volatile替换锁的经典应用场景
put操做
一样地put操做也是委托给段的put方法。下面是段的put方法:
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock(); try { int c = count; if (c++ > threshold) // ensure capacity rehash(); HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { oldValue = e.value; if (!onlyIfAbsent) e.value = value; } else { oldValue = null; ++modCount; tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // write-volatile } return oldValue; } finally { unlock(); } }
该方法也是在持有段锁(锁定整个segment)的状况下执行的,这固然是为了并发的安全,修改数据是不能并发进行的,必须得有个判断是否超限的语句以确保容量不足时可以rehash。接着是找是否存在一样一个key的结点,若是存在就直接替换这个结点的值。不然建立一个新的结点并添加到hash链的头部,这时必定要修改modCount和count的值,一样修改count的值必定要放在最后一步。put方法调用了rehash方法,reash方法实现得也很精巧,主要利用了table的大小为2^n,这里就不介绍了。而比较难懂的是这句int index = hash & (tab.length - 1),原来segment里面才是真正的hashtable,即每一个segment是一个传统意义上的hashtable,如上图,从二者的结构就能够看出区别,这里就是找出须要的entry在table的哪个位置,以后获得的entry就是这个链的第一个节点,若是e!=null,说明找到了,这是就要替换节点的值(onlyIfAbsent == false),不然,咱们须要new一个entry,它的后继是first,而让tab[index]指向它,什么意思呢?实际上就是将这个新entry插入到链头,剩下的就很是容易理解了
因为put方法里须要对共享变量进行写入操做,因此为了线程安全,在操做共享变量时必须得加锁。Put方法首先定位到Segment,而后在Segment里进行插入操做。插入操做须要经历两个步骤,第一步判断是否须要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置而后放在HashEntry数组里。
- 是否须要扩容。在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),若是超过阀值,数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,由于HashMap是在插入元素后判断元素是否已经到达容量的,若是到达了就进行扩容,可是颇有可能扩容以后没有新元素插入,这时HashMap就进行了一次无效的扩容。
- 如何扩容。扩容的时候首先会建立一个两倍于原容量的数组,而后将原数组里的元素进行再hash后插入到新的数组里。为了高效ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。
另外一个操做是containsKey,这个实现就要简单得多了,由于它不须要读取值:
boolean containsKey(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) returntrue; e = e.next; } } returnfalse; }
size()操做
若是咱们要统计整个ConcurrentHashMap里元素的大小,就必须统计全部Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,咱们是否是直接把全部Segment的count相加就能够获得整个ConcurrentHashMap大小了呢?不是的,虽然相加时能够获取每一个Segment的count的最新值,可是拿到以后可能累加前使用的count发生了变化,那么统计结果就不许了。因此最安全的作法,是在统计size的时候把全部Segment的put,remove和clean方法所有锁住,可是这种作法显然很是低效。
由于在累加count操做过程当中,以前累加过的count发生变化的概率很是小,因此ConcurrentHashMap的作法是先尝试2次经过不锁住Segment的方式来统计各个Segment大小,若是统计的过程当中,容器的count发生了变化,则再采用加锁的方式来统计全部Segment的大小。
那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put , remove和clean方法里操做元素前都会将变量modCount进行加1,那么在统计size先后比较modCount是否发生变化,从而得知容器的大小是否发生变化。
参考: