在ConcurrentHashmap,已经有一个线程安全的容器HashTable,可是ConcurrentHashMap比HashTable更加高效html
HashTable容器使用synchronized来保证线程安全:
HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的状况下HashTable的效率很是低下,由于在一个线程访问HashTable的同步方法,其它线程也访问HashTable的同步方法是,会进入阻塞或轮询状态
java
ConcurrentHashMap采用CAS+Synchronized方式,来保证线程安全:
在jdk8,之前ConcurrentHashMap采用**“锁分段技术”**,首先将数据分红一段一段的存储,而后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其余段的数据也能被其余线程访问。在jdk8之后,采用CAS+SynChronized方式来保证线程安全。
关于为何用“CAS+Synchronized”代替“ReentrantLick+Segment”,看这篇文章ConcurrentHashMap 1.8为何要使用CAS+Synchronized取代Segment+ReentrantLock?
接下来则是关于插入操做的具体分析node
//put()中调用了putVal,咱们直接对putval进行分析
final V putVal(K key, V value, boolean onlyIfAbsent) {
//从这里能够看出ConcurrentHashMap并不容许 k,v为null
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
//一个死循环,当插入操做完成后才会跳成循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
//第一次插入时,初始化table,initTable()的分析,见后边
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//若是要插入的数组的节点为null,直接进行插入操做,casTabAt()为原子操做,保证了线程安全
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
//若是table在扩容,就让当前线程帮助table扩容,提高效率,helpTransfer()中调用了transfer方法,这两个方法的分析见后边
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//传入的 onlyIfAbsent=false,因此不会走这个部分
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
//数组的插入节点不为null,则要向后查找
else {
V oldVal = null;
//同步代码块,f为这条bins或则tree的头节点,为何锁对象为这个头节点?
synchronized (f) {
//tabAt()也为原子操做,为何加锁以后还要采用原子操做?由于判断的是当前这个节点,也就是这个锁对象,是否已经改变
//再次判断头节点是否为先前得出的节点,由于以前操做没有加锁,可能这个节点已经被改变
if (tabAt(tab, i) == f) {
//fh=f.hash>0,也就是说没有进行扩容操做
if (fh >= 0) {
//链表的长度
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//在这条bin中有相同的node,则进行更新
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//不断向后查找,若是没有节点和插入节点相同,则将节点插入到末尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
//f,为TreeBin,实际上就表明了,这个节点为TreeNode,即为红黑树,
//ConcurrenthashMap数组中放入的实际是TreeBin,treeBin完成了对红黑树的包装
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
//最后在判断一次链表长度是否超过阈值,超过则进行转换位红黑树的操做
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//进行计数,并检查是否须要扩容,或者正在扩容时,帮助进行扩容
addCount(1L, binCount);
return null;
}
复制代码
/**
* Initializes table, using the size recorded in sizeCtl.
官方注释中的sizeCtl很是重要
private transient volatile int sizeCtl;
负数表明正在进行初始化或扩容操做
-1表明正在初始化
-N 表示有N-1个线程正在进行扩容操做
正数或0表明hash表尚未被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点相似于扩容阈值的概念
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sc<0,表明正在进行初始化,将线程挂起
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//CAS操做,将sizeCtl置为-1,表明抢到了锁,进行init
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//DEFAULT_CAPACITY=16
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//对sizeCtl也进行增大,n-n>>>2等价于 n*o.75
sc = n - (n >>> 2);
}
} finally {
//对sizeCtl更新
sizeCtl = sc;
}
break;
}
}
return tab;
}
复制代码
//helpTransfer调用了transfer方法
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
/* *ForwardingNode:官方注释 A node inserted at head of bins during transfer operations. * 当当前节点完成转移操做后就会将当前节点设为ForwardingNode,来表示当前节点已经完成转移操做 * nextTab:ForwardingNode中的一个变量,新的table,ForwardingNode会在transfer中进行初始化,所以nextTab会在那个时候赋值 */
//节点正在进行转移操做
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//resizeStamp(),产生一个标志位
int rs = resizeStamp(tab.length);//实际上高16位为0,只有低16位有效
//若是 nextTab 没有被并发修改 且 tab 也没有被并发修改
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
/* *sc>>>RESIZE_STAMP_SHIFT(16)!=rs,sc左移16位不等于rs,标识符发生了变化,从这里能够看出sc即sizeCtl的高16位标识符 *sc==rs+1,表示扩容已经结束了,为何表示扩容结束了?具体分析见后面sizeCtl的分析 *sc=rs+MAX_RESIZERS(65535),表示达到最大线程数 *transferIndex,转移的下标,表示正在调整下标 */
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//调用transfer增长一个线程为其扩容
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
复制代码
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//stride,能够理解为步长,当数组长度太长时,就会将数组分段,一个线程处理一段
//这个stride即为每段的长度
int n = tab.length, stride;
//对数组分段得出stride的大小,MIN_TRANSFER_STRIDE(16),stride最小值为16
//从MIN_TRANSFER_STRIDE的介绍中能够看出,是为了防止将stride设置的过小,就会产生过多线程,进行过分的内存竞争
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//nextTab==null,进行扩容操做,为原table2倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//从这里能够看出,转移操做是从数组末尾开始的
transferIndex = n;
}
int nextn = nextTab.length;
//初始化fwd,将以前初始化的nextTab传进去
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//advance标志位表示作完了一个位置的转移操做,能够进行下一个位置的转移操做
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
//将transferIndex赋值给nextIndex,transferIndex<=0,表示原数组的全部位置都有线程进行处理了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//这里进行,nextIndex的赋值 = nextBound,nextBound=nextIndex-stride为上一次的边界
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//全部的转移操做以及完成
if (finishing) {
nextTable = null;
table = nextTab;
//从新计算sizeCtl
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//采用CAS,更新sc的值,每一个线程完成操做后就会将sc-1,
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//全部的操做已经完成,为何这里表示全部操做以及完成?见后边sc的分析
//简单说一下,在第一个线程进入是, sc=rs<<16+2;每次增长一条线程sc+1,减小一条sc-1,当sc=rs<<16+2时表示全部线程完成操做
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//若是位置 i=null,那么放入刚刚初始化的 ForwardingNode ”空节点“,表明已经完成操做
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//加锁处理转移操做
synchronized (f) {
if (tabAt(tab, i) == f) {
//和hashmap相同,将一个链表分为两个,一个的索引是原来的位置,另外一个是原索引+n;
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
//构建两条反序链表
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//放在原索引的链表
setTabAt(nextTab, i, ln);
//放在索引为原索引+n的链表
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//若是为treeNode,进行treenode的相关split操做
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//若是扩容以后不为长度小于UNTREEIFY_THRESHOLD,则转换为链表结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//将,两条链表赋值到新数组
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
复制代码
sizectl的分析
1. rs=resizeStamp(table.length);
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
Integer.numberOfLeadingZeros()返回最高位之前0的个数,例如16 00**010000,返回27
RESIZE_STAMP_BITS=16
因此咱们能够得出, rs实际是一个16有效值的数字,由于高16位全为0;
2. addCount()
部分源码
···
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//正在进行转移操做
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//若是能够帮助进行transfer则将sc+1,表明多了一条线程,帮助转移操做
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//若是,没在扩容,或第一次进行扩容时,sc=re<<16+2,即sc的初始值
else if (U.compareAndSetInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
3.结论
从这部分源码中咱们能够看出sc,和rs的关系,
即sc 高16位表示length生成的标识符,低16位则表示正在帮助扩容的线程数,初始值为2
因此在前边 sc-2=rs<<16,来判断是否已经结束扩容操做
sc=rs+1,当第一个线程结束后,sc-1=rs+2-1=rs+1;也表示扩容已经结束
复制代码
(1)ConcurrentHashMap的get()没有加锁,如何保证读到的数据的正确性?
数组
实际上 volatile V val;
volatile Node<K,V> next;
transient volatile Node<K,V>[] table;
对于Node节点的val,next,以及table都用volatile修饰
可是table用volatile修饰是保证数组在扩容时的可见性,而不能保证对数组中元素的可见性,
由于table[i]保证的是 table[i]这个对应的地址的可见性
而真正保证读操做正确的是,Node节点中的val,next被volatile修饰
复制代码
(2)在进行扩容时,锁住的对象为每一个hash桶的头节点,保证了多线程对数组的并发分段修改安全