HashMap是非线程安全的,在并发场景中若是不保持足够的同步,就有可能在执行HashMap.get时进入死循环,将CPU的消耗到100%。java
HashMap采用链表解决Hash冲突。由于是链表结构,那么就很容易造成闭合的链路,这样在循环的时候只要有线程对这个HashMap进行get操做就会产生死循环,node
单线程状况下,只有一个线程对HashMap的数据结构进行操做,是不可能产生闭合的回路的。c++
只有在多线程并发的状况下才会出现这种状况,那就是在put操做的时候,若是size>initialCapacity*loadFactor,hash表进行扩容,那么这时候HashMap就会进行rehash操做,随之HashMap的结构就会很大的变化。颇有可能就是在两个线程在这个时候同时触发了rehash操做,产生了闭合的回路。算法
推荐使用currentHashMap编程
多线程下[HashMap]的问题:数组
一、多线程put操做后,get操做致使死循环。
二、多线程put非NULL元素后,get操做获得NULL值。
三、多线程put操做,致使元素丢失。安全
Java的HashMap是非线程安全的,因此在并发下必然出现问题,如下作详细的解释:数据结构
从前咱们的Java代码由于一些缘由使用了HashMap这个东西,可是当时的程序是单线程的,一切都没有问题。由于考虑到程序性能,因此须要变成多线程的,因而,变成多线程后到了线上,发现程序常常占了100%的CPU,查看堆栈,你会发现程序都Hang在了HashMap.get()这个方法上了,重启程序后问题消失。可是过段时间又会来。并且,这个问题在测试环境里可能很难重现。多线程
咱们简单的看一下咱们本身的代码,咱们就知道HashMap被多个线程操做。而Java的文档说HashMap是非线程安全的,应该用ConcurrentHashMap。并发
简单地说一下HashMap这个经典的数据结构。
HashMap一般会用一个指针数组(假设为table[])来作分散全部的key,当一个key被加入时,会经过Hash算法经过key算出这个数组的下标i,而后就把这个<key, value>插到table[i]中,若是有两个不一样的key被算在了同一个i,那么就叫冲突,又叫碰撞,这样会在table[i]上造成一个链表。
咱们知道,若是table[]的尺寸很小,好比只有2个,若是要放进10个keys的话,那么碰撞很是频繁,因而一个O(1)的查找算法,就变成了链表遍历,性能变成了O(n),这是Hash表的缺陷(可参看《Hash Collision DoS 问题》)。
因此,Hash表的尺寸和容量很是的重要。通常来讲,Hash表这个容器当有数据要插入时,都会检查容量有没有超过设定的thredhold,若是超过,须要增大Hash表的尺寸,这样一来,整个Hash表里的无素都须要被重算一遍。这叫rehash,这个成本至关的大。
下面,咱们来看一下Java的HashMap的源代码。
Put一个Key,Value对到Hash表中:
public V put(K key, V value) { ...... //算Hash值 int hash = hash(key.hashCode()); int i = indexFor(hash, table.length); //若是该key已被插入,则替换掉旧的value (连接操做) for (Entry<K,V> e = table[i]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } modCount++; //该key不存在,须要增长一个结点 addEntry(hash, key, value, i); return null; }
检查容量是否超标
void addEntry(int hash, K key, V value, int bucketIndex) { Entry<K,V> e = table[bucketIndex]; table[bucketIndex] = new Entry<K,V>(hash, key, value, e); //查看当前的size是否超过了咱们设定的阈值threshold,若是超过,须要resize if (size++ >= threshold) resize(2 * table.length); }
新建一个更大尺寸的hash表,而后把数据从老的Hash表中迁移到新的Hash表中。
void resize(int newCapacity) { Entry[] oldTable = table; int oldCapacity = oldTable.length; ...... //建立一个新的Hash Table Entry[] newTable = new Entry[newCapacity]; //将Old Hash Table上的数据迁移到New Hash Table上 transfer(newTable); table = newTable; threshold = (int)(newCapacity * loadFactor); }
迁移的源代码,注意高亮处:
void transfer(Entry[] newTable) { Entry[] src = table; int newCapacity = newTable.length; //下面这段代码的意思是: // 从OldTable里摘一个元素出来,而后放到NewTable中 for (int j = 0; j < src.length; j++) { Entry<K,V> e = src[j]; if (e != null) { src[j] = null; do { Entry<K,V> next = e.next; int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next; } while (e != null); } } }
好了,这个代码算是比较正常的。并且没有什么问题。
画了个图作了个演示。
并发下的Rehash
1)假设咱们有两个线程。我用红色和浅蓝色标注了一下。
咱们再回头看一下咱们的 transfer代码中的这个细节:
而咱们的线程二执行完成了。因而咱们有下面的这个样子。
注意,由于Thread1的 e 指向了key(3),而next指向了key(7),其在线程二rehash后,指向了线程二重组后的链表。咱们能够看到链表的顺序被反转后。
2)线程一被调度回来执行。
3)一切安好。
线程一接着工做。把key(7)摘下来,放到newTable[i]的第一个,而后把e和next往下移。
4)环形连接出现。
e.next = newTable[i] 致使 key(3).next 指向了 key(7)
注意:此时的key(7).next 已经指向了key(3), 环形链表就这样出现了。
因而,当咱们的线程一调用到,HashTable.get(11)时,悲剧就出现了——Infinite Loop。
有人把这个问题报给了Sun,不过Sun不认为这个是一个问题。由于HashMap原本就不支持并发。要并发就用ConcurrentHashmap
我在这里把这个事情记录下来,只是为了让你们了解并体会一下并发环境下的危险。
ConcurrentHashMap的读取并发,由于在读取的大多数时候都没有用到锁定,因此读取操做几乎是彻底的并发操做,而写操做锁定的粒度又很是细。只有在求size等操做时才须要锁定整个表。而在迭代时,ConcurrentHashMap使用了不一样于传统集合的快速失败迭代器的弱一致迭代器。在这种迭代方式中,当iterator被建立后集合再发生改变就再也不是抛出ConcurrentModificationException,取而代之的是在改变时new新的数据从而不影响原有的数据,iterator完成后再将头指针替换为新的数据,这样iterator线程可使用原来老的数据,而写线程也能够并发的完成改变,更重要的,这保证了多个线程并发执行的连续性和扩展性,是性能提高的关键。
Hashtable继承的是Dictionary(Hashtable是其惟一公开的子类),HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的状况下HashTable的效率很是低下。由于当一个线程访问HashTable的同步方法时,其余线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,而且也不能使用get方法来获取元素,因此竞争越激烈效率越低。
Hashtable的实现方式---锁整个hash表;而ConcurrentHashMap的实现方式---锁桶(或段)
HashTable容器在竞争激烈的并发环境下表现出效率低下的缘由,是由于全部访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不一样数据段的数据时,线程间就不会存在锁竞争,从而能够有效的提升并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分红一段一段的存储,而后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其余段的数据也能被其余线程访问。
从上面看出,ConcurrentHashMap定位一个元素的过程须要进行两次Hash操做,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部,所以,这一种结构的带来的反作用是Hash的过程要比普通的HashMap要长,可是带来的好处是写操做的时候能够只对元素所在的Segment进行加锁便可,不会影响到其余的Segment,这样,在最理想的状况下,ConcurrentHashMap能够最高同时支持Segment数量大小的写操做(恰好这些写操做都很是平均地分布在全部的Segment上),并发能力大大提升。
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap相似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每一个HashEntry是一个链表结构的元素, 每一个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先得到它对应的Segment锁。
3、ConcurrentHashMap实现原理
锁分离 (Lock Stripping)
ConcurrentHashMap内部使用段(Segment)来表示这些不一样的部分,每一个段其实就是一个小的hash table,它们有本身的锁。只要多个修改操做发生在不一样的段上,它们就能够并发进行。一样当一个线程占用锁访问其中一个段数据的时候,其余段的数据也能被其余线程访问。
ConcurrentHashMap有些方法须要跨段,好比size()和containsValue(),它们可能须要锁定整个表而而不只仅是某个段,这须要按顺序锁定全部段,操做完毕后,又按顺序释放全部段的锁。这里"按顺序"是很重要的,不然极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,而且其成员变量实际上也是final的,可是,仅仅是将数组声明为final的并不保证数组成员也是final的,这须要实现上的保证。这能够确保不会出现死锁,由于得到锁的顺序是固定的。不变性是多线程编程占有很重要的地位,下面还要谈到。
final Segment<K,V>[] segments;
不变(Immutable)和易变(Volatile)
ConcurrentHashMap彻底容许多个读操做并发进行,读操做并不须要加锁。若是使用传统的技术,如HashMap中的实现,若是容许能够在hash链的中间添加或删除元素,读操做不加锁将获得不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry表明每一个hash链中的一个节点,其结构以下所示:
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
能够看到除了value不是final的,其它值都是final的,为了防止链表结构被破坏,出现ConcurrentModification的状况。这意味着不能从hash链的中间或尾部添加或删除节点,由于这须要修改next引用值,全部的节点的修改只能从头部开始。对于put操做,能够一概添加到Hash链的头部。可是对于remove操做,可能须要从中间删除一个节点,这就须要将要删除节点的前面全部节点整个复制一遍,最后一个节点指向要删除结点的下一个结点,为了确保读操做可以看到最新的值,将value设置成volatile,这避免了加锁。
下面咱们来结合源代码来具体分析一下ConcurrentHashMap的实现,先看下初始化方法:
CurrentHashMap的初始化一共有三个参数,一个initialCapacity,表示初始的容量,一个loadFactor,表示负载参数,最后一个是concurrentLevel,表明ConcurrentHashMap内部的Segment的数量,ConcurrentLevel一经指定,不可改变,后续若是ConcurrentHashMap的元素数量增长致使ConrruentHashMap须要扩容,ConcurrentHashMap不会增长Segment的数量,而只会增长Segment中链表数组的容量大小,这样的好处是扩容过程不须要对整个ConcurrentHashMap作rehash,而只须要对Segment里面的元素作一次rehash就能够了。
整个ConcurrentHashMap的初始化方法仍是很是简单的,先是根据concurrentLevel来new出Segment,这里Segment的数量是不大于concurrentLevel的最大的2的指数,就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操做来进行hash,加快hash的过程。接下来就是根据intialCapacity肯定Segment的容量的大小,每个Segment的容量大小也是2的指数,一样使为了加快hash的过程。
这边须要特别注意一下两个变量,分别是segmentShift和segmentMask,这两个变量在后面将会起到很大的做用,假设构造函数肯定了Segment的数量是2的n次方,那么segmentShift就等于32减去n,而segmentMask就等于2的n次方减一。
前面提到过ConcurrentHashMap的get操做是不用加锁的,咱们这里看一下其实现:
public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); }
segmentFor这个函数用于肯定操做应该在哪个segment中进行,几乎对ConcurrentHashMap的全部操做都须要用到这个函数,咱们看下这个函数的实现:
final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }
这个函数用了位操做来肯定Segment,根据传入的hash值向右无符号右移segmentShift位,而后和segmentMask进行与操做,结合咱们以前说的segmentShift和segmentMask的值,就能够得出如下结论:假设Segment的数量是2的n次方,根据元素的hash值的高n位就能够肯定元素到底在哪个Segment中。
在肯定了须要在哪个segment中进行操做之后,接下来的事情就是调用对应的Segment的get方法:
V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; }
get操做不须要锁。第一步是访问count变量,这是一个volatile变量,因为全部的修改操做在进行结构修改时都会在最后一步写count变量,经过这种机制保证get操做可以获得几乎最新的结构更新。对于非结构更新,也就是结点值的改变,因为HashEntry的value变量是volatile的,也能保证读取到最新的值。接下来就是对hash链进行遍历找到要获取的结点,若是没有找到,直接访回null。对hash链进行遍历不须要加锁的缘由在于链指针next是final的。可是头指针却不是final的,这是经过getFirst(hash)方法返回,也就是存在table数组中的值。这使得getFirst(hash)可能返回过期的头结点,例如,当执行get方法时,刚执行完getFirst(hash)以后,另外一个线程执行了删除操做并更新头结点,这就致使get方法中返回的头结点不是最新的。这是能够容许,经过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要获得最新的数据,只有采用彻底的同步。
V readValueUnderLock(HashEntry<K,V> e) { lock(); try { return e.value; } finally { unlock(); } }
最后,若是找到了所求的结点,判断它的值若是非空就直接返回,不然在有锁的状态下再读一次。这彷佛有些费解,理论上结点的值不可能为空,这是由于put的时候就进行了判断,若是为空就要抛NullPointerException。空值的惟一源头就是HashEntry中的默认值,由于HashEntry中的value不是final的,非同步读取有可能读取到空值。仔细看下put操做的语句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被从新排序,这就可能致使结点的值为空。这种状况应当很罕见,一旦发生这种状况,ConcurrentHashMap采起的方式是在持有锁的状况下再读一遍,这可以保证读到最新的值,而且必定不会为空值。
由于实际上put、remove等操做也会更新count的值,因此当竞争发生的时候,volatile的语义能够保证写操做在读操做以前,也就保证了写操做对后续的读操做都是可见的,这样后面get的后续操做就能够拿到完整的元素内容。
看完了get操做,再看下put操做,put操做的前面也是肯定Segment的过程,直接看关键的segment的put方法:
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock(); //加锁
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash(); //看是否须要rehash
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index]; //肯定链表头部的位置
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) { //若是存在,替换掉value
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount; //修改modCount和count?
tab[index] = new HashEntry<K,V>(key, hash, first, value); //建立一个新的结点并添加到hash链的头部
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
首先对Segment的put操做是加锁完成的,而后在第五行,若是Segment中元素的数量超过了阈值(由构造函数中的loadFactor算出)这须要进行对Segment扩容,而且要进行rehash。
第8和第9行的操做就是getFirst的过程,肯定链表头部的位置。
第11行这里的这个while循环是在链表中寻找和要put的元素相同key的元素,若是找到,就直接更新更新key的value,若是没有找到,则进入21行这里,生成一个新的HashEntry而且把它加到整个Segment的头部,而后再更新count的值。
修改操做还有putAll和replace。putAll就是屡次调用put方法。replace甚至不用作结构上的更改,实现要比put和delete要简单得多.
Remove操做的前面一部分和前面的get和put操做同样,都是定位Segment的过程,而后再调用Segment的remove方法:
V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; //空白行以前的行主要是定位到要删除的节点e V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } }
整个操做是先定位到段,而后委托给段的remove操做。当多个删除操做并发进行时,只要它们所在的段不相同,它们就能够同时进行。下面是Segment的remove方法实现
首先remove操做也是肯定须要删除的元素的位置,HashEntry中的next是final的,一经赋值之后就不可修改,在定位到待删除元素的位置之后,程序就将待删除元素前面的那一些元素所有复制一遍,而后再一个一个从新接到链表上去.
将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不须要复制,它们能够重用.
假设链表中原来的元素如上图所示,如今要删除元素3,那么删除元素3之后的链表就以下图所示:
在前面的章节中,咱们涉及到的操做都是在单个Segment中进行的,可是ConcurrentHashMap有一些操做是在多个Segment中进行,好比size操做,ConcurrentHashMap的size操做也采用了一种比较巧的方式,来尽可能避免对全部的Segment都加锁。
前面咱们提到了一个Segment中的有一个modCount变量,表明的是对Segment中元素的数量形成影响的操做的次数,这个值只增不减,size操做就是遍历了两次Segment,每次记录Segment的modCount值,而后将两次的modCount进行比较,若是相同,则表示期间没有发生过写入操做,就将原先遍历的结果返回,若是不相同,则把这个过程再重复作一次,若是再不相同,则就须要将全部的Segment都锁住,而后一个一个遍历了.
参考:http://www.iteye.com/topic/344876