ConcurrentHashMap
顾名思义就是同步的HashMap,也就是线程安全的HashMap,因此本篇介绍的ConcurrentHashMap和HashMap有着很重要的关系,因此建议以前没有了解过HashMap的能够先看看这篇关于HashMap的原理分析《HashMap从认识到源码分析》,本篇继续以JDK1.8
版本的源码进行分析,最后在介绍完ConcurrentHashMap以后会对ConcurrentHashMap、Hashtable和HashMap作一个比较和总结。html
咱们先看一下ConcurrentHashMap
实现了哪些接口、继承了哪些类,对ConcurrentHashMap
有一个总体认知。 算法
ConcurrentHashMap
继承
AbstractMap
接口,这个和
HashMap
同样,而后实现了
ConcurrentMap
接口,这个和
HashMap
不同,
HashMap
是直接实现的
Map
接口。
ConcurrentHashMap
的结构,这里列举几个重要的成员变量
table
、
nextTable
、
baseCount
、
sizeCtl
、
transferIndex
、
cellsBusy
Map.Entry
接口threshold
的做用,在不一样的地方有不一样的值也有不一样的用途
-1
表明正在初始化-N
表明有N-1
个线程正在进行扩容操做0
表明hash表尚未被初始化ConcurrentHashMap
和HashMap
同样都是采用拉链法处理哈希冲突,且都为了防止单链表过长影响查询效率,因此当链表长度超过某一个值时候将用红黑树代替链表进行存储,采用了数组+链表+红黑树的结构
HashMap
和ConcurrentHashMap
仍是很类似的,只是ConcurrentHashMap
在某些操做上采用了CAS
+ synchronized
来保证并发状况下的安全。ConcurrentHashMap
处理并发状况下的线程安全问题,这不得不提到Hashtable
,由于Hashtable
也是线程安全的,那ConcurrentHashMap
和Hashtable
有什么区别或者有什么高明之处嘛?以致于官方都推荐使用ConcurrentHashMap
来代替Hashtable
Hashtable
采用对象锁(synchronized修饰对象方法)来保证线程安全,也就是一个Hashtable
对象只有一把锁,若是线程1拿了对象A的锁进行有synchronized
修饰的put
方法,其余线程是没法操做对象A中有synchronized
修饰的方法的(如get
方法、remove
方法等),竞争激烈因此效率低下。而ConcurrentHashMap
采用CAS
+ synchronized
来保证并发安全性,且synchronized
关键字不是用在方法上而是用在了具体的对象上,实现了更小粒度的锁,等会源码分析的时候在细说这个SUN大师们的鬼斧神工Hashtable
采用的是数组 + 链表,当链表过长会影响查询效率,而ConcurrentHashMap
采用数组 + 链表 + 红黑树,当链表长度超过某一个值,则将链表转成红黑树,提升查询效率。ConcurrentHashMap
的构造函数有5个,从数量上看就和HashMap
、Hashtable
(4个)的不一样,多出的那个构造函数是public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
,即除了传入容量大小、负载因子以外还多传入了一个整型的concurrencyLevel
,这个整型是咱们预先估计的并发量,好比咱们估计并发是30
,那么就能够传入30
。
其余的4个构造函数的参数和HashMap
的同样,而具体的初始化过程却又不相同,HashMap
和Hashtable
传入的容量大小和负载因子都是为了计算出初始阈值(threshold),而ConcurrentHashMap
传入的容量大小和负载因子是为了计算出sizeCtl用于初始化table
,这个sizeCtl即table数组的大小,不一样的构造函数计算sizeCtl方法都不同。数组
//无参构造函数,什么也不作,table的初始化放在了第一次插入数据时,默认容量大小是16和HashMap的同样,默认sizeCtl为0
public ConcurrentHashMap() {
}
//传入容量大小的构造函数。
public ConcurrentHashMap(int initialCapacity) {
//若是传入的容量大小小于0 则抛出异常。
if (initialCapacity < 0)
throw new IllegalArgumentException();
//若是传入的容量大小大于容许的最大容量值 则cap取容许的容量最大值 不然cap =
//((传入的容量大小 + 传入的容量大小无符号右移1位 + 1)的结果向上取最近的2幂次方),
//即若是传入的容量大小是12 则 cap = 32(12 + (12 >>> 1) + 1=19
//向上取2的幂次方即32),这里为啥必定要是2的幂次方,缘由和HashMap的threshold同样,都是为
//了让位运算和取模运算的结果同样。
//MAXIMUM_CAPACITY即容许的最大容量值 为2^30。
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
//tableSizeFor这个函数即实现了将一个整数取2的幂次方。
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//将上面计算出的cap 赋值给sizeCtl,注意此时sizeCtl为正数,表明进行扩容的容量大小。
this.sizeCtl = cap;
}
//包含指定Map的构造函数。
//置sizeCtl为默认容量大小 即16。
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
//传入容量大小和负载因子的构造函数。
//默认并发数大小是1。
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
//传入容量大小、负载因子和并发数大小的构造函数
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//若是传入的容量大小 小于 传入的并发数大小,
//则容量大小取并发数大小,这样作的缘由是确保每个Node只会分配给一个线程,而一个线程则
//能够分配到多个Node,好比当容量大小为64,并发数大
//小为16时,则每一个线程分配到4个Node。
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
//size = 1.0 + (long)initialCapacity / loadFactor 这里计算方法和上面的构造函数不同。
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
//若是size大于容许的最大容量值则 sizeCtl = 容许的最大容量值 不然 sizeCtl =
//size取2的幂次方。
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
复制代码
null
,为null
抛出异常。spread()
方法计算key的hashCode()得到哈希地址,这个HashMap类似。synchronized
,也就是容许多个线程去尝试初始化table,可是在初始化函数里面使用了CAS
保证只有一个线程去执行初始化过程。null
,则直接调用实现CAS
原子性操做的casTabAt()
方法将节点插入到table中,若是插入成功则完成put操做,结束返回。插入失败(被别的线程抢先插入了)则继续往下执行。helpTransfer()
方法协助扩容。treeifyBin()
方法将链表转成红黑树,以避免链表过长影响效率。addCount()
方法,做用是将ConcurrentHashMap的键值对数量+1,还有另外一个做用是检查ConcurrentHashMap是否须要扩容。public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
//不容许键值为null,这点与线程安全的Hashtable保持一致,和HashMap不一样。
if (key == null || value == null) throw new NullPointerException();
//取键key的hashCode()和HashMap、Hashtable都同样,而后再执行spread()方法计算获得哈希地
//址,这个spread()方法和HashMap的hash()方法同样,都是将hashCode()作无符号右移16位,只不
//过spread()加多了 &0x7fffffff,让结果为正数。
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//若是table数组为空或者长度为0(未初始化),则调用initTable()初始化table,初始化函数
//下面介绍。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//调用实现了CAS原子性操做的tabAt方法
//tabAt方法的第一个参数是Node数组的引用,第二个参数在Node数组的下标,实现的是在Nod
//e数组中查找指定下标的Node,若是找到则返回该Node节点(链表头节点),不然返回null,
//这里的i = (n - 1)&hash便是计算待插入的节点在table的下标,即table容量-1的结果和哈
//希地址作与运算,和HashMap的算法同样。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//若是该下标上并无节点(即链表为空),则直接调用实现了CAS原子性操做的
//casTable()方法,
//casTable()方法的第一个参数是Node数组的引用,第二个参数是待操做的下标,第三
//个参数是指望值,第四个参数是待操做的Node节点,实现的是将Node数组下标为参数二
//的节点替换成参数四的节点,若是指望值和实际值不符返回false,不然参数四的节点成
//功替换上去,返回ture,即插入成功。注意这里:若是插入成功了则跳出for循环,插入
//失败的话(其余线程抢先插入了),那么会执行到下面的代码。
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//若是该下标上的节点的哈希地址为-1(即链表的头节点为ForwardingNode节点),则表示
//table须要扩容,值得注意的是ConcurrentHashMap初始化和扩容不是用同一个方法,而
//HashMap和Hashtable都是用同一个方法,当前线程会去协助扩容,扩容过程后面介绍。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//若是该下标上的节点既不是空也不是须要扩容,则表示这个链表能够插入值,将进入到链表
//中,将新节点插入或者覆盖旧值。
else {
V oldVal = null;
//经过关键字synchroized对该下标上的节点加锁(至关于锁住锁住
//该下标上的链表),其余下标上的节点并无加锁,因此其余线程
//能够安全的得到其余下标上的链表进行操做,也正是由于这个所
//以提升了ConcurrentHashMap的效率,提升了并发度。
synchronized (f) {
if (tabAt(tab, i) == f) {
//若是该下标上的节点的哈希地址大于等于0,则表示这是
//个链表。
if (fh >= 0) {
binCount = 1;
//遍历链表。
for (Node<K,V> e = f;; ++binCount) {
K ek;
//若是哈希地址、键key相同 或者 键key不为空
//且键key相同,则表示存在键key和待插入的键
//key相同,则执行更新值value的操做。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//若是找到了链表的最后一个节点都没有找到相
//同键Key的,则是插入操做,将插入的键值新建
//个节点而且添加到链表尾部,这个和HashMap一
//样都是插入到尾部。
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//若是该下标上的节点的哈希地址小于0 且为树节点
//则将带插入键值新增到红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//若是插入的结果不为null,则表示为替换
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash,
key,value)) != null){
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//判断链表的长度是否大于等于链表的阈值(8),大于则将链表转成
//红黑树,提升效率。这点和HashMap同样。
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
复制代码
spread()
方法计算key的hashCode()得到哈希地址。public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//运用键key的hashCode()计算出哈希地址
int h = spread(key.hashCode());
//若是table不为空 且 table长度大于0 且 计算出的下标上bucket不为空,
//则表明这个bucket存在,进入到bucket中查找,
//其中(n - 1) & h为计算出键key相对应的数组下标的算法。
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//若是哈希地址、键key相同则表示查找到,返回value,这里查找到的是头节点。
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//若是bucket头节点的哈希地址小于0,则表明bucket为红黑树,在红黑树中查找。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//若是bucket头节点的哈希地址不小于0,则表明bucket为链表,遍历链表,在链表中查找。
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
复制代码
spread()
方法计算出键key的哈希地址。null
。addCount
方法,将当前table存储的键值对数量-1。public V remove(Object key) {
return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
//计算须要移除的键key的哈希地址。
int hash = spread(key.hashCode());
//遍历table。
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//table为空,或者键key所在的bucket为空,则跳出循环返回。
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
//若是当前table正在扩容,则调用helpTransfer方法,去协助扩容。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
//将键key所在的bucket加锁。
synchronized (f) {
if (tabAt(tab, i) == f) {
//bucket头节点的哈希地址大于等于0,为链表。
if (fh >= 0) {
validated = true;
//遍历链表。
for (Node<K,V> e = f, pred = null;;) {
K ek;
//找到哈希地址、键key相同的节点,进行移除。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
//若是bucket的头节点小于0,即为红黑树。
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
//找到节点,而且移除。
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
//调用addCount方法,将当前ConcurrentHashMap存储的键值对数量-1。
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
复制代码
table
的初始化主要由initTable()方法实现的,initTable()方法初始化一个合适大小的数组,而后设置sizeCtl。 咱们知道ConcurrentHashMap
是线程安全的,即支持多线程的,那么一开始不少个线程同时执行put()
方法,而table
又没初始化,那么就会不少个线程会去执行initTable()方法尝试初始化table,而put
方法和initTable
方法都是没有加锁的(synchronize),那SUN的大师们是怎么保证线程安全的呢? 经过源码能够看得出,table的初始化只能由一个线程完成,可是每一个线程均可以争抢去初始化table。安全
null
,即需不须要首次初始化,若是某个线程进到这个方法后,其余线程已经将table初始化好了,那么该线程结束该方法返回。null
,进入到while循环,若是sizeCtl
小于0(其余线程正在对table初始化),那么该线程调用Thread.yield()
挂起该线程,让出CPU时间,该线程也从运行态转成就绪态,等该线程从就绪态转成运行态的时候,别的线程已经table初始化好了,那么该线程结束while循环,结束初始化方法返回。若是从就绪态转成运行态后,table仍然为null
,则继续while循环。null
且sizeCtl
不小于0,则调用实现CAS
原子性操做的compareAndSwap()
方法将sizeCtl设置成-1,告诉别的线程我正在初始化table,这样别的线程没法对table进行初始化。若是设置成功,则再次判断table是否为空,不为空则初始化table,容量大小为默认的容量大小(16),或者为sizeCtl。其中sizeCtl的初始化是在构造函数中进行的,sizeCtl = ((传入的容量大小 + 传入的容量大小无符号右移1位 + 1)的结果向上取最近的2幂次方)private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//若是table为null或者长度为0, //则一直循环试图初始化table(若是某一时刻别的线程将table初始化好了,那table不为null,该//线程就结束while循环)。
while ((tab = table) == null || tab.length == 0) {
//若是sizeCtl小于0,
//即有其余线程正在初始化或者扩容,执行Thread.yield()将当前线程挂起,让出CPU时间,
//该线程从运行态转成就绪态。
//若是该线程从就绪态转成运行态了,此时table可能已被别的线程初始化完成,table不为
//null,该线程结束while循环。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//若是此时sizeCtl不小于0,即没有别的线程在作table初始化和扩容操做,
//那么该线程就会调用Unsafe的CAS操做compareAndSwapInt尝试将sizeCtl的值修改为
//-1(sizeCtl=-1表示table正在初始化,别的线程若是也进入了initTable方法则会执行
//Thread.yield()将它的线程挂起 让出CPU时间),
//若是compareAndSwapInt将sizeCtl=-1设置成功 则进入if里面,不然继续while循环。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//再次确认当前table为null即还未初始化,这个判断不能少。
if ((tab = table) == null || tab.length == 0) {
//若是sc(sizeCtl)大于0,则n=sc,不然n=默认的容量大
小16,
//这里的sc=sizeCtl=0,即若是在构造函数没有指定容量
大小,
//不然使用了有参数的构造函数,sc=sizeCtl=指定的容量大小。
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//建立指定容量的Node数组(table)。
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算阈值,n - (n >>> 2) = 0.75n当ConcurrentHashMap储存的键值对数量
//大于这个阈值,就会发生扩容。
//这里的0.75至关于HashMap的默认负载因子,能够发现HashMap、Hashtable若是
//使用传入了负载因子的构造函数初始化的话,那么每次扩容,新阈值都是=新容
//量 * 负载因子,而ConcurrentHashMap无论使用的哪种构造函数初始化,
//新阈值都是=新容量 * 0.75。
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
复制代码
transfer()
方法为ConcurrentHashMap
扩容操做的核心方法。因为ConcurrentHashMap
支持多线程扩容,并且也没有进行加锁,因此实现会变得有点儿复杂。整个扩容操做分为两步:bash
//协助扩容方法
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//若是当前table不为null 且 f为ForwardingNode节点 且 //新的table即nextTable存在的状况下才能协助扩容,该方法的做用是让线程参与扩容的复制。
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//更新sizeCtl的值,+1,表明新增一个线程参与扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
//扩容的方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//根据服务器CPU数量来决定每一个线程负责的bucket数量,避免由于扩容的线程过多反而影响性能。
//若是CPU数量为1,则stride=1,不然将须要迁移的bucket数量(table大小)除以CPU数量,平分给
//各个线程,可是若是每一个线程负责的bucket数量小于限制的最小是(16)的话,则强制给每一个线程
//分配16个bucket数。
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//若是nextTable还未初始化,则初始化nextTable,这个初始化和iniTable初始化同样,只能由
//一个线程完成。
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
//分配任务和控制当前线程的任务进度,这部分是transfer()的核心逻辑,描述了如何与其余线
//程协同工做。
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//迁移过程(对当前指向的bucket),这部分的逻辑与HashMap相似,拿旧数组的容量当作一
//个掩码,而后与节点的hash进行与操做,能够得出该节点的新增有效位,若是新增有效位为
//0就放入一个链表A,若是为1就放入另外一个链表B,链表A在新数组中的位置不变(跟在旧数
//组的索引一致),链表B在新数组中的位置为原索引加上旧数组容量。
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
复制代码
addCount()
作的工做是更新table的size,也就是table存储的键值对数量,在使用put()
和remove()
方法的时候都会在执行成功以后调用addCount()
来更新table的size。对于ConcurrentHashMap
来讲,它到底有储存有多少个键值对,谁也不知道,由于他是支持并发的,储存的数量无时无刻都在变化着,因此说ConcurrentHashMap
也只是统计一个大概的值,为了统计出这个值也是大费周章才统计出来的。 服务器
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//若是计算盒子不是空,或者修改baseCount的值+x失败,则放弃对baseCount的修改。
//这里的大概意思就是首先尝试直接修改baseCount,达到计数的目的,若是修改baseCount失败(
//多个线程同时修改,则失败)
//则使用CounterCell数组来达到计数的目的。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//若是计数盒子是空的 或者随机取余一个数组为空 或者修改这个槽位的变量失败,
//即表示出现了并发,则执行fullAddCount()方法进行死循环插入,同时返回,
//不然表明修改这个槽位的变量成功了,继续往下执行,不进入if。
//每一个线程都会经过ThreadLocalRandom.getProbe() & m寻址找到属于它的CounterCell,
//而后进行计数。ThreadLocalRandom是一个线程私有的伪随机数生成器,
//每一个线程的probe都是不一样的。CounterCell数组的大小永远是一个2的n次方,初始容量
//为2,每次扩容的新容量都是以前容量乘以二,处于性能考虑,它的最大容量上限是机器
//的CPU数量,因此说CounterCell数组的碰撞冲突是很严重的。
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//并发过大,使用CAS修改CounterCell失败时候执行fullAddCount,
fullAddCount(x, uncontended);
return;
}
//若是上面对盒子的赋值成功,且check<=1,则直接返回,不然调用sumConut()方法计算
if (check <= 1)
return;
s = sumCount();
}
//若是check>=0,则检查是否须要扩容。
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
复制代码
size
和mappingCount
方法都是用来统计table的size的,这二者不一样的地方在size
返回的是一个int
类型,便可以表示size的范围是[-2^31,2^31-1],超过这个范围就返回int能表示的最大值,mappingCount
返回的是一个long
类型,便可以表示size的范围是[-2^63,2^63-1]。
这两个方法都是调用的sumCount()方法实现统计。数据结构
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
复制代码
\ | HashMap | Hashtable | ConcurrentHashMap |
---|---|---|---|
是否线程安全 | 否 | 是 | 是 |
线程安全采用的方式 | 采用synchronized 类锁,效率低 |
采用CAS + synchronized ,锁住的只有当前操做的bucket,不影响其余线程对其余bucket的操做,效率高 |
|
数据结构 | 数组+链表+红黑树(链表长度超过8则转红黑树) | 数组+链表 | 数组+链表+红黑树(链表长度超过8则转红黑树) |
是否容许null 键值 |
是 | 否 | 否 |
哈希地址算法 | (key的hashCode)^(key的hashCode无符号右移16位) | key的hashCode | ( (key的hashCode)^(key的hashCode无符号右移16位) )&0x7fffffff |
定位算法 | 哈希地址&(容量大小-1) | (哈希地址&0x7fffffff)%容量大小 | 哈希地址&(容量大小-1) |
扩容算法 | 当键值对数量大于阈值,则容量扩容到原来的2倍 | 当键值对数量大于等于阈值,则容量扩容到原来的2倍+1 | 当键值对数量大于等于sizeCtl,单线程建立新哈希表,多线程复制bucket到新哈希表,容量扩容到原来的2倍 |
链表插入 | 将新节点插入到链表尾部 | 将新节点插入到链表头部 | 将新节点插入到链表尾部 |
继承的类 | 继承abstractMap 抽象类 |
继承Dictionary 抽象类 |
继承abstractMap 抽象类 |
实现的接口 | 实现Map 接口 |
实现Map 接口 |
实现ConcurrentMap 接口 |
默认容量大小 | 16 | 11 | 16 |
默认负载因子 | 0.75 | 0.75 | 0.75 |
统计size方式 | 直接返回成员变量size |
直接返回成员变量count |
遍历CounterCell 数组的值进行累加,最后加上baseCount 的值即为size |
【死磕Java并发】—–J.U.C之Java并发容器:ConcurrentHashMap
Map 你们族的那点事儿 ( 7 ) :ConcurrentHashMap
Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析
Java 8 ConcurrentHashMap源码分析多线程
原文地址:https://ddnd.cn/2019/03/10/jdk1-8-concurrenthashmap/并发