🖕欢迎关注个人公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一块儿畅游源码的海洋。java
前记,从这篇文章开始咱们换一种学习的方式,彤哥先抛出问题,你们尝试着在脑海中回答这些问题,而后再进入咱们的源码分析过程,最后彤哥再挑几个问题回答。git
(1)ConcurrentHashMap与HashMap的数据结构是否同样?数组
(2)HashMap在多线程环境下什么时候会出现并发安全问题?安全
(3)ConcurrentHashMap是怎么解决并发安全问题的?数据结构
(4)ConcurrentHashMap使用了哪些锁?多线程
(5)ConcurrentHashMap的扩容是怎么进行的?并发
(6)ConcurrentHashMap是不是强一致性的?源码分析
(7)ConcurrentHashMap不能解决哪些问题?性能
(8)ConcurrentHashMap中有哪些不常见的技术值得学习?学习
ConcurrentHashMap是HashMap的线程安全版本,内部也是使用(数组 + 链表 + 红黑树)的结构来存储元素。
相比于一样线程安全的HashTable来讲,效率等各方面都有极大地提升。
这里先简单介绍一下各类锁,以便下文讲到相关概念时能有个印象。
(1)synchronized
java中的关键字,内部实现为监视器锁,主要是经过对象监视器在对象头中的字段来代表的。
synchronized从旧版本到如今已经作了不少优化了,在运行时会有三种存在方式:偏向锁,轻量级锁,重量级锁。
偏向锁,是指一段同步代码一直被一个线程访问,那么这个线程会自动获取锁,下降获取锁的代价。
轻量级锁,是指当锁是偏向锁时,被另外一个线程所访问,偏向锁会升级为轻量级锁,这个线程会经过自旋的方式尝试获取锁,不会阻塞,提升性能。
重量级锁,是指当锁是轻量级锁时,当自旋的线程自旋了必定的次数后,尚未获取到锁,就会进入阻塞状态,该锁升级为重量级锁,重量级锁会使其余线程阻塞,性能下降。
(2)CAS
CAS,Compare And Swap,它是一种乐观锁,认为对于同一个数据的并发操做不必定会发生修改,在更新数据的时候,尝试去更新数据,若是失败就不断尝试。
(3)volatile(非锁)
java中的关键字,当多个线程访问同一个变量时,一个线程修改了这个变量的值,其余线程可以当即看获得修改的值。(这里牵涉到java内存模型的知识,感兴趣的同窗能够本身查查相关资料)
volatile只保证可见性,不保证原子性,好比 volatile修改的变量 i,针对i++操做,不保证每次结果都正确,由于i++操做是两步操做,至关于 i = i +1,先读取,再加1,这种状况volatile是没法保证的。
(4)自旋锁
自旋锁,是指尝试获取锁的线程不会阻塞,而是循环的方式不断尝试,这样的好处是减小线程的上下文切换带来的开锁,提升性能,缺点是循环会消耗CPU。
(5)分段锁
分段锁,是一种锁的设计思路,它细化了锁的粒度,主要运用在ConcurrentHashMap中,实现高效的并发操做,当操做不须要更新整个数组时,就只锁数组中的一项就能够了。
(5)ReentrantLock
可重入锁,是指一个线程获取锁以后再尝试获取锁时会自动获取锁,可重入锁的优势是避免死锁。
其实,synchronized也是可重入锁。
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
复制代码
构造方法与HashMap对比能够发现,没有了HashMap中的threshold和loadFactor,而是改用了sizeCtl来控制,并且只存储了容量在里面,那么它是怎么用的呢?官方给出的解释以下:
(1)-1,表示有线程正在进行初始化操做
(2)-(1 + nThreads),表示有n个线程正在一块儿扩容
(3)0,默认值,后续在真正初始化的时候使用默认容量
(4)> 0,初始化或扩容完成后下一次的扩容门槛
至于,官方这个解释对不对咱们后面再讨论。
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key和value都不能为null
if (key == null || value == null) throw new NullPointerException();
// 计算hash值
int hash = spread(key.hashCode());
// 要插入的元素所在桶的元素个数
int binCount = 0;
// 死循环,结合CAS使用(若是CAS失败,则会从新取整个桶进行下面的流程)
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 若是桶未初始化或者桶个数为0,则初始化桶
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 若是要插入的元素所在的桶尚未元素,则把这个元素插入到这个桶中
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
// 若是使用CAS插入元素时,发现已经有元素了,则进入下一次循环,从新操做
// 若是使用CAS插入元素成功,则break跳出循环,流程结束
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// 若是要插入的元素所在的桶的第一个元素的hash是MOVED,则当前线程帮忙一块儿迁移元素
tab = helpTransfer(tab, f);
else {
// 若是这个桶不为空且不在迁移元素,则锁住这个桶(分段锁)
// 并查找要插入的元素是否在这个桶中
// 存在,则替换值(onlyIfAbsent=false)
// 不存在,则插入到链表结尾或插入树中
V oldVal = null;
synchronized (f) {
// 再次检测第一个元素是否有变化,若是有变化则进入下一次循环,从头来过
if (tabAt(tab, i) == f) {
// 若是第一个元素的hash值大于等于0(说明不是在迁移,也不是树)
// 那就是桶中的元素使用的是链表方式存储
if (fh >= 0) {
// 桶中元素个数赋值为1
binCount = 1;
// 遍历整个桶,每次结束binCount加1
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 若是找到了这个元素,则赋值了新值(onlyIfAbsent=false)
// 并退出循环
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 若是到链表尾部尚未找到元素
// 就把它插入到链表结尾并退出循环
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 若是第一个元素是树节点
Node<K,V> p;
// 桶中元素个数赋值为2
binCount = 2;
// 调用红黑树的插入方法插入元素
// 若是成功插入则返回null
// 不然返回寻找到的节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
// 若是找到了这个元素,则赋值了新值(onlyIfAbsent=false)
// 并退出循环
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 若是binCount不为0,说明成功插入了元素或者寻找到了元素
if (binCount != 0) {
// 若是链表元素个数达到了8,则尝试树化
// 由于上面把元素插入到树中时,binCount只赋值了2,并无计算整个树中元素的个数
// 因此不会重复树化
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 若是要插入的元素已经存在,则返回旧值
if (oldVal != null)
return oldVal;
// 退出外层大循环,流程结束
break;
}
}
}
// 成功插入元素,元素个数加1(是否要扩容在这个里面)
addCount(1L, binCount);
// 成功插入元素返回null
return null;
}
复制代码
总体流程跟HashMap比较相似,大体是如下几步:
(1)若是桶数组未初始化,则初始化;
(2)若是待插入的元素所在的桶为空,则尝试把此元素直接插入到桶的第一个位置;
(3)若是正在扩容,则当前线程一块儿加入到扩容的过程当中;
(4)若是待插入的元素所在的桶不为空且不在迁移元素,则锁住这个桶(分段锁);
(5)若是当前桶中元素以链表方式存储,则在链表中寻找该元素或者插入元素;
(6)若是当前桶中元素以红黑树方式存储,则在红黑树中寻找该元素或者插入元素;
(7)若是元素存在,则返回旧值;
(8)若是元素不存在,整个Map的元素个数加1,并检查是否须要扩容;
添加元素操做中使用的锁主要有(自旋锁 + CAS + synchronized + 分段锁)。
为何使用synchronized而不是ReentrantLock?
由于synchronized已经获得了极大地优化,在特定状况下并不比ReentrantLock差。
未完待续~~
如今文章没办法留言了,若是有任何建议或者意见,欢迎你们在公众号后台给我留言,谢谢~
欢迎关注个人公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一块儿畅游源码的海洋。