ConcurrentHashMap 用法上与 HashMap 差异不大,但 ConcurrentHashMap 是线程安全的,能够在多线程环境中使用。这篇文章主要会说明 ConcurrentHashMap 专有的一些特色,与 HashMap 相似部分将再也不赘述。java
本文基于 JDK1.8数组
ConcurrentHashMap 的代码复杂度高了很多,用到了不少的成员变量和常量,先认识一下(HashMap 已经存在的变量或常量就再也不赘述)。安全
// 默认并发度,同时容许多少个线程访问
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 扩容时每一个线程扩容时至少要迁移的桶的数量,最低不能少于 16
private static final int MIN_TRANSFER_STRIDE = 16;
// 辅助变量,没啥用
private static int RESIZE_STAMP_BITS = 16;
// 可用于扩容的最大线程数,但通常确定到不了这个数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 会用来计算一个标志位,实际上也没什么用
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 下面三个常量是几个特殊的哈希值
// MOVED:表示桶正在经被迁移
// TREEBIN:表示桶正在进行树化
// RESERVED:表示节点运行 computeIfAbsent 等方法
static final int MOVED = -1;
static final int TREEBIN = -2;
static final int RESERVED = -3;
// 用于计算 key 的 hash 值
static final int HASH_BITS = 0x7fffffff;
// CPU 的数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
复制代码
这些变量基本都使用了 volatile 关键字,那是由于这些变量的再并发环境中必须都保持可见性。微信
// 桶数组,与 HashMap 基本一致,也是延迟加载,不过这里使用了 volatile 关键字
transient volatile Node<K,V>[] table;
// 桶数组,用于扩容
private transient volatile Node<K,V>[] nextTable;
// 记录全部的元素的个数,相似于 HashMap 的 size
private transient volatile long baseCount;
// 初始化和扩容的标志位
// 默认值:0
// 初始化前:初始化容量大小
// 正在初始化:-1
// 扩容前:触发扩容操做的元素个数,至关于 HashMap 的 threshold
// 正在扩容:-(1 + 参与扩容的线程数量)
private transient volatile int sizeCtl;
// 扩容的时候须要对桶内的元素进行迁移,这个变量用来记录桶的下标,表示迁移的进度,下面会详细介绍这个变量
private transient volatile int transferIndex;
// 更新 counterCells 时使用的自旋锁
private transient volatile int cellsBusy;
// 计数用,用于计算还没来的及更新到 baseCount 中的变化
private transient volatile CounterCell[] counterCells;
复制代码
与 HashMap 相比,ConcurrentHashMap 是线程安全的。容许多个线程并发的访问容器的不一样部分来减小线程间的竞争。这个容器设计出来不是为了替代 HashMap,而是为了在知足多线程环境下的需求,它有两个设计目标:多线程
总的来讲就是 ConcurrentHashMap 既要能支持高并发,也要有高性能。具体实现也通过了屡次变化,特别是在 JDK1.8,几乎进行了重写,底层的存储机制也彻底不一致。JDK 1.7 和 JDK1.8 底层存储的差别:并发
// JDK1.7
final Segment<K,V>[] segments;
transient volatile HashEntry<K,V>[] table; // 每个分段锁都会有一个 table
复制代码
// JDK1.8
transient volatile Node<K,V>[] table;
复制代码
在 JDK1.8 中,并发的粒度更细一些,能够认为 table 的长度就是并发数,而以前的版本中,Segment 的数量是并发度。dom
由于使用了 CAS,因此在 ConcurrentHashMap 中存在大量的自旋操做,自旋操做其实就是一个死循环,等到完成操做时就会经过 break 跳出循环。ide
ConcurrentHashMap 的 hash 函数与 HashMap 的相差不大,不过除了与自身进行 XOR(异或) 操做,还会与 HASH_BITES
进行与运算:函数
// ConcurrentHashMap.spread()
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
复制代码
HASH_BITS
的二进制表示是:高并发
01111111 11111111 11111111 11111111
复制代码
在 JDK1.8 之前,ConcurrentHashMap 主要使用分段锁的机制来实现,在 JDK 1.8 及之后,主要使用了 CAS(sun.misc.Unsafe) + synchronized 来实现。CAS 是一种无锁的并发技术,以高效率著称,CAS 须要硬件的支持,现在的 CPU 都支持这一特性。
但 ConcurrentHashMap 并无实现本身的 CAS,而是直接使用了 sun.misc.Unsafe
(最新的 JDK 中已经换成 jdk.internal.misc.Unsafe)。
ConcurrentHashMap 利用 CAS 实现了了如下三个原子方法来访问桶的第一个元素:
// 获取桶的某个位置,任何状况下可使用
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
// 插入桶的第一个键值对,能够在并发环境下,任何状况下可使用
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) {
return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// 把键值对插入到桶中,只在有锁的的区域使用
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}
复制代码
桶的第一个元素有特殊的意义,在 ConcurrentHashMap 中一般被用做桶的锁
CAS 除了用来访问桶以外,在用在其余须要并发更新变量的地方。好比更新 sizeCtl 变量:
// ConcurrentHashMap.initTable()
// 将容器的状态设置为正在扩容
U.compareAndSetInt(this, SIZECTL, sc, -1)
复制代码
synchronized 给人的印象是很慢,很臃肿,其实这是一个误解,synchronized 底层通过不断的优化,目前性能已经与可重入锁至关。并且 synchronized 使用简单,也不会形成是死锁的状况,因此通常状况下能用 synchronized 就别用锁了,除非知足不了需求再考虑用锁。
在 ConcurrentHashMap 中 synchronized 使用时粒度都比较小,被 synchronized 包裹的代码不是不少,因此仍是能够保持高性能。这也是 ConcurrentHashMap 与 Hashtable 的最大区别。Hashtable 也是使用 synchronized 来保证线程安全,可是 synchronized 都是在方法级别使用,这样就会让整个容器的并发级别很低。
扩容是一个很慢的操做,能够事先预估好大小,能够减小扩容的次数。扩容机制与 HashMap 有些不一样,由于 ConcurrentHashMap 能够并发访问,因此在扩容时写操做的线程都不能继续,可是这些线程也能够被利用起来,参与到扩容操做中。
对容器的扩容分为两种状况:
初始化和扩容这两个过程不是独立存在的,经过下面这个图来看清总体流程时如何进行的:
实例化时会肯定table大小,可是不初始化table,以及肯定下一次扩容的临界点,若是构造函数传入的是另外一个 Map,调用 tryPresize 来扩容。
在首次插入元素时,会初始化 table(延迟加载),调用 initTable() 进行初始化。
若是不是首次插入元素,判断是否正在扩容,若是是,则中止操做(除了 get() 操做),参与扩容流程。扩容完成后,经过自旋再次进行操做(插入或者更新),插入元素时须要检查是否达到树化的条件,若是知足,将链表转成树。插入完成后调用 addCount() 检查容器状态,若是元素大于等于扩容临界点的值,则开始扩容
初始化经过 initTable()
方法来完成。
// ConcurrentHashMap.initTable()
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 检查当前桶是否为空,为空则开始初始化
while ((tab = table) == null || tab.length == 0) {
// 发现正在初始化或者在扩容,则什么也不作,进入自旋状态等待表被初始化(或扩容)完成
if ((sc = sizeCtl) < 0)
Thread.yield(); // 放弃 CPU 资源
// 线程开始扩容时会把 sizeCtl 的值置为 -1,让其余线程发现正在进行初始化
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 肯定初始化桶的数量,若是 sizeCtl 大于 0 则使用 sizeCtl 的值,不然使用默认容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 设置扩容阀值,ConcurrentHashMap 中的装载因子仅仅在构造函数中使用
sc = n - (n >>> 2); // 至关于 n * 0.75
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
复制代码
扩容操做经过下面两个方法来发起:
addcount() 在改变容器元素的方法中被调用,主要就是检查容器当前的状态,判断是否须要扩容,若是须要,就会进行扩容。
// ConcurrentHashMap.addcount()
// 这个方法主要用来给当前容器的数量进行计数顺便检查一下是否须要扩容
private final void addCount(long x, int check) {
CounterCell[] cs; long b, s;
// 给容器中的元素进行增或者减
// 若是 cs 不为 null(说明有并发状况)或者 baseCount 增减运算失败,
if ((cs = counterCells) != null ||
!U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell c; long v; int m;
boolean uncontended = true;
// 那么就会经过 cs 来进行计数,
// 若是 cs 是空(还不是并发)或者 (cs 中随机取余一个数组位置为空 或者 cs 这个位置的变量失败)
// 说明经过 cs 来计数也失败了,最后才会调用 fullAddCount 来进行计数
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
// 与 LongAdder 实现一致,能够理解为并发状况下的一个计数器
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 统计当前节点的数量
s = sumCount();
}
// 在增长元素的操做中 check 都会知足这个条件
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 检查扩容条件:
// 1. 是否达到阀值: s >= sizeCtl (上文已经解释了 sizeCtl,sizeCtl 大于 0 时表示下次扩容的临界点)
// 2. 是否能够扩容: tab != null && tab 当前的长度小于 1 << 30
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 根据当前桶的数量生成一个标志位
int rs = resizeStamp(n);
// 若是正在扩容
if (sc < 0) {
// 检查当前扩容的进展:
// 1. 若是 sc 的低 16 位不等于标识位( sizeCtl 变化了,说明容器状态已经变化),退出
// 2. 若是 sc == 标识位 + 1 (经过下面代码可知,刚开始扩容时, sc = rs + 2,若是 sc = rs + 1,说明已经没有线程在扩容),退出
// 3. 若是 sc == 标识符 + 65535,参与扩容的线程已经达到最大数量,当前线程再也不参与,退出
// 4. 若是 nextTable == null 说明扩容结束(nextTable 在扩容中起中转做用,全部的元素会被限移到 nextTable 中,最后让 tab = nextTable,nextTable == null 来完成扩容),退出
// 5. transferIndex <= 0 说明没有桶还须要迁移了(transferIndex 用于标识当前迁移到哪一个桶了,小于等于 0 说明已经迁移到最后一个桶或者已经迁移完成,迁移的顺序是从最后一个桶开始),退出。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 若是迁移仍是进行,当前线程尝试参与扩容
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 若是当前不在扩容中,则发起一个新的扩容
else if (U.compareAndSetInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
// 统计当前节点的数量
s = sumCount();
}
}
}
复制代码
tryPresize 相比于 addcount 方法相对简单,就是尝试进行扩容:
// ConcurrentHashMap.tryPresize()
private final void tryPresize(int size) {
// 根据 size 计算扩容的容量
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// 判断是否能够进行扩容,若是 sizeCtl <= 0,说明已经在扩容中,那么久不会再进行扩容
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 若是当前容器尚未初始化,则进行初始化,与 initTable 相同
if (tab == null || (n = tab.length) == 0) {
// 当前的扩容阀值与传入的值之间选大的做为此次初始化的大小
n = (sc > c) ? sc : c;
// 进入初始化状态
if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2); // 至关于 n * 0.75
}
} finally {
sizeCtl = sc;
}
}
}
// 若是还每达到扩容的阀值或者超过了最大容量,则中止扩容
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
// 开始进行扩容
int rs = resizeStamp(n);
if (U.compareAndSetInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
复制代码
扩容的具体操做是经过 transfer()
方法来完成。
// ConcurrentHashMap.transfer() 该方法用于将元素都迁移到 nextTable 中
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 在迁移元素时,会将桶分段,stride 表示每段的长度,最小值为 16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// 初始化 nextTable
if (nextTab == null) {
try {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 这个变量用于记录当前迁移的进度,须要注意的是迁移元素从最后一个桶开始
transferIndex = n;
}
int nextn = nextTab.length;
// fwd 是一个特殊的 Node,没有 key,也没有 val,hash 值为 MOVED,用来标识一个桶已经迁移完毕
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 用来控制迁移的进展,若是为 true 说明当前此次循环要干的事情已经完成,能够开始下一个循环
boolean advance = true;
// 标示当前线程全部桶的迁移是否完成
boolean finishing = false;
// 当前线程须要处理的桶的范围 [nextBound, nextindex)
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
// transferIndex <= 0 表示已经迁移完成
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 为当前线程分配桶的区间,当前线程须要将负责这个区间内的桶元素迁移到 nextTable 中
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 判断当前线程是否完成全部桶的迁移
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 若是为 true,说明全部的迁移任务已经完成
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); // 至关于 n * 0.75
return;
}
// 将参与扩容的线程数量减 1
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 若是不相等说明还有其余的线程在参与扩容,当前线程直接退出就行,这行代码与 tryPresize() 中传入的参数有关,第一个进行扩容的线程传入的 sc = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2,因此若是这是最后一个线程,那么 sc - 2 == resizeStamp(n) << RESIZE_STAMP_SHIFT
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 最后退出的线程须要再检查一遍容器的状态
finishing = advance = true;
i = n;
}
}
// 若是桶中的元素都迁移完成了,则在桶的节点置为 MOVED,表示桶中的元素都迁移完成了
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // 当前桶已经被处理
else {
// 若是上面条件都不知足说明要开始迁移桶中的元素
synchronized (f) {
// 省略搬运元素的代码...
}
}
}
}
复制代码
树化的方式与时机和 HashMap 基本一致。在单个桶的链表元素个数大于 8 时尝试进行树化操做,可是若是此时整个容器的容量少于 64 时,会进行扩容操做,而不是进行树化操做,树化后一样也维护元素的 next 指针来保持链接关系。
树化操做只须要对当前线程所访问的桶进行操做,因此整个过程比扩容要简单不少,是经过 CAS + synchronized 来完成。
// ConcurrentHashMap.treeifyBin()
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n;
if (tab != null) {
// 若是容器的容量小于 64,则会进行扩容操做,而不是进行树化操做
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
// 利用 CAS + synchronized 来把链表转成红黑树
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 把转换好的树放到桶上
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
复制代码
在每一个桶上插入第一个元素的时候使用 CAS 就够了。若是插入的不是桶上的第一个元素,或者是删除或者更新操做,就仍是要用到 synchronized。但不会为每个元素都建立一个锁对象,而是使用桶上的第一个元素做为锁对象。可是仅仅将第一个元素上锁还不够,在更新以前,还须要验证它依然是这个桶的第一个节点,若是不是,就要进行重试。
除了 get() 操做以外,其余的 put()、clear() 等操做,都须要使用 CAS + synchronized 来进行并发访问。get 操做相对简单,直接经过 tabAt
方法获取就行。其余的操做逻辑总体就是同样的。这里主要介绍 putVal()
方法,put()、add()等向容器中增长或者更新元素的方法都是经过 putVal() 方法来完成的。
// ConcurrentHashMap.putVal()
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 和 value 都不容许为 null
if (key == null || value == null) throw new NullPointerException();
// 作 hash 运算
int hash = spread(key.hashCode());
int binCount = 0;
// 进入自旋
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
// 若是桶尚未被初始化,则进入初始化(延迟加载)
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 若是这个桶为空,直接使用 CAS 方式来插入元素
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
// 若是发现正在扩容,则参与进扩容,扩容完成以后,经过自旋的方式再次执行插入操做
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 执行 computeOnlyAbsent 之类的方法
else if (onlyIfAbsent
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
// 使用 CAS + synchronized 机制插入元素
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 对现有的键值对进行更新
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 使用尾插法插入新的元素
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
// 若是桶上挂的是树,那就按照树的方法来插入节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
// 若是发现这个节点正在进行 computeIfAbsent 之类的操做,则抛出异常
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
// 检查桶上节点的数量,若是超过 8 了,则尝试进行树化操做
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 若是是更新节点操做,那么节点数量就没有增长,直接返回便可
if (oldVal != null)
return oldVal;
break;
}
}
}
// 用这个方法来检查是否知足扩容的条件,与上面的 helpTransfer 方法不一样,addCount 是在键值对插入以后再去检查是否须要扩容
addCount(1L, binCount);
return null;
}
复制代码
其余操做如 clear、comput、remove 等会改变容器元素的方法原理都相似,都是经过 CAS + synchronized 来更新元素,最后调用 addcount 方法来更新计数以及判断是否须要扩容。
由于是支持并发的,因此 size 方法的实现也会有点不同,size 实际调用的是 sumCount 方法:
//ConcurrentHashMap.sumCount()
final long sumCount() {
// 统计 cs 和 baseCount 的和
CounterCell[] cs = counterCells;
long sum = baseCount;
if (cs != null) {
for (CounterCell c : cs)
if (c != null)
sum += c.value;
}
return sum;
}
复制代码
在扩容代码中咱们看到了 cs 和 baseCount 其实都是用来的统计容器个数,在并发状况下,会先记录到 cs 最后可是须要注意的是,由于 sumCount 没有加锁,因此最后返回的值也不是彻底准确的。
另外 ConcurrentHashMap 使用的是 fail-safe 的机制,也就是说在迭代的过程当中若是容器中的元素变化,也不会抛出 ConcurrentModificationException 异常。
最后说一下迭代器的问题,KeySetView,ValuesView,EntrySetView 这三个类分别能够迭代键、值、和键值对。具体的实现相对比较简单,并且对于迭代的过程也没有加上并发的控制,因此最后遍历的结果也不必定是准确的。
相关文章
关注微信公众号,聊点其余的