Hashmap多线程会致使HashMap的Entry链表造成环形数据结构,一旦造成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。java
HashTable使用synchronized来保证线程安全,但在线程竞争激烈的状况下HashTable的效率很是低下。由于当一个线程访问HashTable的同步方法,其余线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,因此竞争越激烈效率越低。node
HashTable容器在竞争激烈的并发环境下表现出效率低下的缘由,是由于全部访问HashTable的线程都必须竞争同一把锁。算法
那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不一样数据段的数据时,线程间就不会存在锁竞争,从而能够有效的提升并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分红一段一段的存储,而后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其余段的数据也能被其余线程访问。数组
另外,ConcurrentHashMap能够作到读取数据不加锁,而且其内部的结构可让其在进行写操做的时候可以将锁的粒度保持地尽可能地小,不用对整个ConcurrentHashMap加锁。安全
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。数据结构
Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。多线程
一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap相似,是一种数组和链表结构,并发
一个Segment里包含一个HashEntry数组,每一个HashEntry是一个链表结构的元素,app
每一个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先得到它对应的Segment锁。ssh
结构图以下:
从上面的结构咱们能够了解到,ConcurrentHashMap定位一个元素的过程须要进行两次Hash操做,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部,所以,这一种结构的带来的反作用是Hash的过程要比普通的HashMap要长,可是带来的好处是写操做的时候能够只对元素所在的Segment进行加锁便可,不会影响到其余的Segment,这样,在最理想的状况下,ConcurrentHashMap能够最高同时支持Segment数量大小的写操做(恰好这些写操做都很是平均地分布在全部的Segment上),因此,经过这一种结构,ConcurrentHashMap的并发能力能够大大的提升。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile int count;
transient int modCount;
transient int threshold;
transient volatile HashEntry<K,V>[] table;
final float loadFactor;
}
复制代码
count用来统计该段数据的个数,它是volatile变量,它用来协调修改和读取操做,以保证读取操做可以读取到几乎最新的修改。协调方式是这样的,每次修改操做作告终构上的改变,如增长/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操做开始都要读取count的值。这利用了 Java 5中对volatile语义的加强,对同一个volatile变量的写和读存在happens-before关系。
modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程当中某个段是否发生改变。
threashold用来表示须要进行rehash的界限值。
table数组存储段中节点,每一个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得可以读取到最新的 table值而不须要同步。loadFactor表示负载因子。
Segment中的元素是以HashEntry的形式存放在链表数组中的,看一下HashEntry的结构:
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
复制代码
能够看到HashEntry的一个特色,除了value之外,其余的几个变量都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,由于这须要修改next 引用值,全部的节点的修改只能从头部开始(头插法)。
对于put操做,能够一概添加到Hash链的头部。
可是对于remove操做,可能须要从中间删除一个节点,这就须要将要删除节点的前面全部节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。。为了确保读操做可以看到最新的值,将value设置成volatile,这避免了加锁。
...
//初始的容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//初始的加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//初始的并发等级,表示当前更新线程的估计数
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//最小的segment数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//最大的segment数量
static final int MAX_SEGMENTS = 1 << 16;
//
static final int RETRIES_BEFORE_LOCK = 2;
// segments 的掩码值, key 的散列码的高位用来选择具体的 segment
final int segmentMask;
// 偏移量
final int segmentShift;
final Segment<K,V>[] segments;
复制代码
// 建立一个带有指定初始容量、加载因子和并发级别的新的空映射
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 寻找最佳匹配参数(不小于给定参数的最接近的 2^n)
int sshift = 0; // 用来记录向左按位移动的次数
int ssize = 1; // 用来记录Segment数组的大小
// 计算并行级别 ssize,由于要保持并行级别是 2^n
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//用于定位元素所在segment。
//segmentShift表示偏移位数,经过前面的int类型的位的描述咱们能够得知,int类型的数字在变大的过程当中,
//低位老是比高位先填满的,为保证元素在segment级别分布的尽可能均匀,计算元素所在segment时,
//老是取hash值的高位进行计算。segmentMask做用就是为了利用位运算中取模的操做:
//a % (Math.pow(2,n)) 等价于 a&( Math.pow(2,n)-1)
// 若为默认值,concurrencyLevel 为 16,sshift 为 4
// 那么计算出 segmentShift 为 28,segmentMask 为 15
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 记录每一个 Segment 上要放置多少个元素
int c = initialCapacity / ssize;
// 假若有余数,则Segment数量加1
if (c * ssize < initialCapacity)
++c;
//保证每一个Segment中tabel数组的大小,必定为2的幂,初始化的三个参数取默认值时,table数组大小为2
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
//初始化Segment数组,并实际只填充Segment数组的第0个元素。
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
复制代码
CurrentHashMap的初始化一共有三个参数:
整个ConcurrentHashMap的初始化方法仍是很是简单的,先是根据concurrentLevel来new出Segment,这里Segment的数量是不大于concurrentLevel的最大的2的指数,就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操做来进行hash,加快hash的过程。
接下来就是根据intialCapacity肯定Segment的容量的大小,每个Segment的容量大小也是2的指数,一样使为了加快hash的过程。
这边须要特别注意一下两个变量,分别是segmentShift和segmentMask,这两个变量在后面将会起到很大的做用,假设构造函数肯定了Segment的数量是2的n次方,那么segmentShift就等于32减去n,而segmentMask就等于2的n次方减一。
当用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:
private int hash(Object k) {
int h = hashSeed;
//若是Key是字符串类型,则使用专门为字符串设计的Hash方法,不然使用一连串的异或操做增长hash随机性
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
复制代码
ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其余槽来讲,在插入第一个值的时候进行初始化。
这里须要考虑并发,由于极可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就能够。
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 这里看到为何以前要初始化 segment[0] 了,
// 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k]
// 为何要用“当前”,由于 segment[0] 可能早就扩容过了
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
// 初始化 segment[k] 内部的数组
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck Segment[k] 是否被其它线程初始化了
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 使用 while 循环,内部用 CAS,当前线程成功设值或其余线程成功设值后,退出
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
复制代码
当执行put方法插入数据时,根据key的hash值,在Segment数组中找到相应的位置,若是相应位置的Segment还未初始化,则经过CAS进行赋值,接着执行Segment对象的put方法经过加锁机制插入数据
场景:线程A和线程B同时执行相同Segment对象的put方法
1. 线程A执行tryLock()方法成功获取锁,则把HashEntry对象插入到相应的位置;
2. 线程B获取锁失败,则执行scanAndLockForPut()方法,在scanAndLockForPut方法中,会经过重复执行tryLock()方法尝试获取锁,在多处理器环境下,重复次数为64,单处理器重复次数为1,当执行tryLock()方法的次数超过上限时,则执行lock()方法挂起线程B;
3. 当线程A执行完插入操做时,会经过unlock()方法释放锁,接着唤醒线程B继续执行;
复制代码
put 方法的过程:
判断value是否为null,若是为null,直接抛出异常。注:不容许key或者value为null
经过哈希算法定位到Segment(key经过一次hash运算获得一个hash值,将获得hash值向右按位移动segmentShift位,而后再与segmentMask作&运算获得segment的索引j)。
使用Unsafe的方式从Segment数组中获取该索引对应的Segment对象
向这个Segment对象中put值
注:对共享变量进行写入操做为了线程安全,在操做共享变量时必须得加锁,持有段锁(锁定整个segment)的状况下执行的。修改数据是不能并发进行的
判断该值的插入是否会致使该 segment 的元素个数超过阈值,以确保容量不足时可以rehash扩容,再插值。
注:rehash 扩容 segment 数组不能扩容,扩容的是 segment 数组某个位置内部的数组 HashEntry[] 扩容为原来的 2 倍。先进行扩容,再插值
查找是否存在一样一个key的结点,存在直接替换这个结点的值。不然建立一个新的结点并添加到hash链的头部,修改modCount和count的值,修改count的值必定要放在最后一步。
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// 根据 hash 值找到 Segment 数组中的位置 j
// hash 是 32 位,无符号右移 segmentShift(28) 位,剩下高 4 位,
// 而后和 segmentMask(15) 作一次与操做,也就是说 j 是 hash 值的高 4 位,也就是槽的数组下标
int j = (hash >>> segmentShift) & segmentMask;
// 刚刚说了,初始化的时候初始化了 segment[0],可是其余位置仍是 null,
// ensureSegment(j) 对 segment[j] 进行初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 插入新值到 槽 s 中
return s.put(key, hash, value, false);
}
复制代码
Segment 内部是由 数组+链表 组成的。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 先获取该 segment 的独占锁
// 每个Segment进行put时,都会加锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// segment 内部的数组
HashEntry<K,V>[] tab = table;
// 利用 hash 值,求应该放置的数组下标
int index = (tab.length - 1) & hash;
// 数组该位置处的链表的表头
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
// 若是链头不为 null
if (e != null) {
K k;
//若是在该链中找到相同的key,则用新值替换旧值,并退出循环
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//若是没有和key相同的,一直遍历到链尾,链尾的next为null,进入到else
e = e.next;
}
else {
// node 究竟是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
// 若是不为 null,那就直接将它设置为链表表头;若是是null,初始化并设置为链表表头。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 若是超过了该 segment 的阈值,这个 segment 须要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// 没有达到阈值,将 node 放到数组 tab 的 index 位置,
// 其实就是将新的节点设置成原链表的表头
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解锁
unlock();
}
return oldValue;
}
复制代码
计算 hash 值,找到 segment 数组中的具体位置,使用Unsafe获取对应的Segment
根据 hash 找到数组中具体的位置
从链表头开始遍历整个链表(由于Hash可能会有碰撞,因此用一个链表保存),若是找到对应的key,则返回对应的value值,不然返回null。
注:get操做不须要锁,因为其中涉及到的共享变量都使用volatile修饰,volatile能够保证内存可见性,因此不会读取到过时数据。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
复制代码
Remove操做的前面一部分和前面的get和put操做同样,都是定位Segment的过程,而后再调用Segment的remove方法:
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
复制代码
首先remove操做也是肯定须要删除的元素的位置,不过这里删除元素的方法不是简单地把待删除元素的前面的一个元素的next指向后面一个就完事了,前面已经说过HashEntry中的next是final的,一经赋值之后就不可修改,在定位到待删除元素的位置之后,程序就将待删除元素前面的那一些元素所有复制一遍,而后再一个一个从新接到链表上去,看一下下面这一幅图来了解这个过程:
假设链表中原来的元素如上图所示,如今要删除元素3,那么删除元素3之后的链表就以下图所示:
注意:图1和2的元素顺序相反了,为何这样,不防再仔细看看源码或者再读一遍上面remove的分析过程,元素复制是从待删除元素位置起将前面的元素逐一复制的,而后再将后面的连接起来。
size操做须要遍历全部的Segment才能算出整个Map的大小。先采用不加锁的方式,循环全部的Segment(经过Unsafe的getObjectVolatile()以保证原子读语义)连续计算元素的个数,最多计算3次:
注:在put,remove和clean方法里操做元素前都会将变量modCount进行加1,那么在统计size先后比较modCount是否发生变化,从而得知容器的大小是否发生变化。