ConcurrentHashMap是线程安全的HashMap;在并发的状况下使用HashMap可能会致使死循环,在进行put操做时致使CPU利用率接近100%。是由于在多线程会致使HashMap的Entry链表造成环形数据结构,一旦造成环形数据结构,Entry的next结点永远不能为空,就会产生死循环获取Entry。java
在JDk1.8中ConcurrentHashMap采用Node + CAS + Synchronized来保证并发状况下的更新不会出现问题。其底层的数据结构是:数组 + 链表 + 红黑树 的方式来实现的。node
注:点击了解红黑树。git
/**
* 最大容量,32位的Hash值的最高两位用做控制的目的,
* 这个值必须刚好是1<<30(2的30次方),这样分配的java数组
* 在索引范围内(2的整数次幂)。
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* Hash表默认的初始容量。必须是2的整数次幂,
* 最小为1,最大为MAXIMUM_CAPACITY.
*/
private static final int DEFAULT_CAPACITY = 16;
/**
* 最大的数组大小(非2次幂)。
* toArray 和 related方法使用。
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* 表默认的并发级别,未使用。为与该类的之前版本兼容而定义
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* T表的默认加载因子,在构造函数中重写此值只影响初始表容量。
* 浮点值一般不被使用,
* 当前表中的容量 = 初始化容量 - (初始化容量无符号右移2位)时扩容
*/
private static final float LOAD_FACTOR = 0.75f;
/**
* 链表转红黑树阀值,该值必须大于2,而且应该至少为8。
* 以便与树移除中关于收缩后转换回普通Bin的假设相吻合。
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* 用于在调整大小操做期间反树化(拆分)bin的bin计数阈值,
* 应该小于TREEIFY_THRESHOLD, 最多为6.
*/
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
/**
* 用于生成戳记的位的数目,单位为sizeCtl。
* 32位数组必须至少为6.
*/
private static int RESIZE_STAMP_BITS = 16;
/**
* 2^15-1,help resize的最大线程数
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/**
* 32-16=16,sizeCtl中记录size大小的偏移量
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/* forwarding nodes的hash值*/
static final int MOVED = -1;
/* 树根节点的hash值*/
static final int TREEBIN = -2;
/* ReservationNode的hash值*/
static final int RESERVED = -3;
/* 普通节点哈希的可用位*/
static final int HASH_BITS = 0x7fffffff;
复制代码
/**
* 装载Node的数组,做为ConcurrentHashMap的数据容器,
* 采用懒加载的方式,直到第一次插入数据的时候才会进行初始化操做,
* 数组的大小老是为2的幂次方。
*/
transient volatile Node<K,V>[] table;
/**
* 扩容时使用,只有在扩容的时候才为非null
*/
private transient volatile Node<K,V>[] nextTable;
/**
* 控制Table的初始化与扩容。
* 当值为负数时table正在被初始化或扩容
* -1表示正在初始化
* -N则表示当前正有N-1个线程进行扩容操做
* 正数或0表明hash表尚未被初始化,这个数值表示初始化或下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
复制代码
Node是最核心的内部类,它包装了key-value键值对,全部插入ConcurrentHashMap的数据都包装在这里面。Node类实现了Map.Entry<K,V>接口,Node类中包含有属性有key,value以及下一节点的引用,其中value和next属性使用volatile关键字修饰,保证其在多线程下的可见性。不容许调用setValue方法直接改变Node的value域,它增长了find方法辅助map.get()方法。github
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
/** * Node结点的构造方法 */
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/** * Node结点中提供的find方法,在子类中可重写 */
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
复制代码
树节点类,另一个核心的数据结构,包含父接点,左连接的结点,右连接的结点,前驱结点的引用,以及结点的颜色(默认红色)。当链表长度过长的时候,会转换为TreeNode在TreeBins中使用。TreeNode是上述Node类的子类。数组
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
/** * 经过给定的key从指定的根节点开始(在其子树)查找 * 对应的TreeNode结点,没有返回null * h 表示当前能够的Hash值 * k 要查找的键(key) * kc k的Class对象,该Class应该是实现了Comparable<K>的,不然应该是null */
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
// 判断对应的键是否为null
if (k != null) {
//获取当前结点
TreeNode<K,V> p = this;
do { //循环
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
/** * 当前结点的Hash值大于要查找的Key的Hash值H * 在当前节点的左子树中查找,反之在右子树中 * 进行下一轮循环 */
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
/** * 当前结点的key等于要查找的key, * 或当前结点的key不为null且equals()方法为true * 返回当前的结点 */
return p;
/** * 执行到这里说明 hash比对相同, * 但当前节点的key与要查找的k不相等 */
else if (pl == null)
/** * 左孩子为空,指向当前节点右孩子,继续循环 */
p = pr;
else if (pr == null)
/** * 右孩子为空,指向当前节点左孩子,继续循环 */
p = pl;
/** * 左右孩子都不为空,再次进行比较, * 肯定在左子树仍是右子树中查找 */
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
/** * comparable方法来比较pk和k的大小 * dir小于0,p指向左孩子,不然指向右孩子 */
p = (dir < 0) ? pl : pr;
/** * 没法经过上一步骤肯定是在左/右子树中查找 * 从右子树中递归调用findTreeNode()方法查找 */
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
//在右子树中没有找到,到左子树中查找
p = pl;
} while (p != null);
}
return null;
}
}
复制代码
红黑树结构。该类并不包装key-value键值对,而是TreeNode的列表和它们的根节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象。这个类含有读写锁。 这里咱们先看红黑树相关操做的方法。安全
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
/** * 经过结点b构造红黑树,链表转红黑树 */
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
// 将给定的节点指向头结点
this.first = b;
TreeNode<K,V> r = null;
/** * 定义X节点 为 b 结点;next结点也为b结点 * next 节点初始化为头结点,用来控制遍历 */
for (TreeNode<K,V> x = b, next; x != null; x = next) {
//指向下一结点
next = (TreeNode<K,V>)x.next;
//将x结点的链接属性清空
x.left = x.right = null;
if (r == null) {
/** * r 为null 说明是红黑树中没有结点 * x 结点就是红黑树的根结点 * 根结点的父结点为null,颜色为黑色 */
x.parent = null;
x.red = false;
r = x;
}
else {
// 当前结点的关键字
K k = x.key;
// 当前结点的hash值
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
/** * 从红黑树的根结点开始遍历,查找当前结点对应的位置 * dir 控制查找的方向 * ph 记录当前结点的hash值 */
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
/** * 当前节点的hash值大于要插入结点的hash值, * 在当前结点的左子树中查找 */
dir = -1;
else if (ph < h)
/** * 当前节点的hash值小于要插入结点的hash值, * 在当前结点的右子树中查找 */
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
/** * 若是hash值相等,则比较k值,用其Compare, * 若是还相等,则走tieBreakOrder方法 */
dir = tieBreakOrder(k, pk);
// 暂存当前节点
TreeNode<K,V> xp = p;
/** * 根据dir控制查找方向 */
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
// 插入后平衡红黑树的性质
r = balanceInsertion(r, x);
break;
}
}
}
}
//指定红黑树的根结点
this.root = r;
assert checkInvariants(root);
}
/** * 左旋转过程 */
static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root, TreeNode<K,V> p) {
TreeNode<K,V> r, pp, rl;
/** * 结点P不为null且p的右结点不为null */
if (p != null && (r = p.right) != null) {
if ((rl = p.right = r.left) != null)
/** * p.right = r.left p的右结点为r的左结点 * 而后将其赋值给rl * 当rl 不为空的时候,肯定p与rl的关系: * 父结点与左子结点 */
rl.parent = p;
if ((pp = r.parent = p.parent) == null)
/** * 结合 r = p.right * r.parent = p.parent 将p的右子树连接到 p的父结点 * 若是P的父结点为null,说明当前p结点为红黑树的根结点 * 通过上述r.parent = p.parent 将红黑树的根节点转为r * 根结点为r结点,颜色尾黑色 */
(root = r).red = false;
else if (pp.left == p)
/** * 到这一步说明 p 结点不是红黑树的根结点 * 且p为其父结点的左子树, * 将r替换原来P结点的位置(左旋转) */
pp.left = r;
else
/** * 到这一步说明 p 结点不是红黑树的根结点 * 且p为其父结点的右子树, * 将r替换原来P结点的位置(左旋转) */
pp.right = r;
/** * 上述过程只是完成了将原先以P 结点为红黑树子树 * 的根结点,替换为以P的右结点为根结点的部分 * 即 p.right = r.left 将原先r的左连接替换 * 成 p的右连接的过程。 */
//r的左结点为p
r.left = p;
// p的父结点为r节点
p.parent = r;
}
return root;
}
/** * 右旋转 为上述左旋转的逆过程 */
static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root, TreeNode<K,V> p) {
TreeNode<K,V> l, pp, lr;
if (p != null && (l = p.left) != null) {
if ((lr = p.left = l.right) != null)
lr.parent = p;
if ((pp = l.parent = p.parent) == null)
(root = l).red = false;
else if (pp.right == p)
pp.right = l;
else
pp.left = l;
l.right = p;
p.parent = l;
}
return root;
}
/** * 红黑树中插入结点后会打破红黑树性质须要平衡 * TreeNode<K,V> root 根结点 * TreeNode<K,V> x 要插入的结点 */
static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, TreeNode<K,V> x) {
//默认插入结点为红色
x.red = true;
for (TreeNode<K,V> xp, xpp, xppl, xppr;;) {
// xp为当前节点的父结点
if ((xp = x.parent) == null) {
/** * 当前结点的父结点为空,说明红黑树中只有一个结点 * 当前结点即为根结点,颜色为黑色 */
x.red = false;
return x;
}
/** * 当前结点的父结点(xp)不为null * 父结点为黑色,没有打破红黑树的平衡性(着色可能有问题) * 父结点的的父结点(xpp)为null,红黑树中只有两个节点 * 上述两种状况直接返回root结点 */
else if (!xp.red || (xpp = xp.parent) == null)
return root;
/** * 当前结点的父结点(xp) 为 其父节点(xpp)的左孩子 */
if (xp == (xppl = xpp.left)) {
if ((xppr = xpp.right) != null && xppr.red) {
/** * 当前结点(x)得父结点(xp)的父结点(xpp)的右孩子(xppr) * 不为null 且 颜色为红色(此时颜色的性质不知足) * 变换颜色 */
xppr.red = false; // 将xppr变为黑色
xp.red = false; // 将xp变为黑色
xpp.red = true; // 将xpp变为红色
x = xpp; // 将xpp指向x 继续循环
}
/** * 当前结点的父结点的父结点右孩子为null或颜色为黑色 */
else {
// 若是(当前结点)x为父结点的右孩子
if (x == xp.right) {
//左旋转
root = rotateLeft(root, x = xp);
// 从新指定xpp
xpp = (xp = x.parent) == null ? null : xp.parent;
}
// 若是当前结点的父结点不为null
if (xp != null) {
// 将xp的颜色置为黑色
xp.red = false;
// 父结点的父结点(xpp)不为null
if (xpp != null) {
//将xpp颜色置为红色
xpp.red = true;
// 有旋转
root = rotateRight(root, xpp);
}
}
}
}
/** * 当前结点的父结点(xp) 为 其父节点(xpp)的右孩子 */
else {
//xppl 为当前结点(x)的父结点(xp)的父结点(xpp)的左孩子
if (xppl != null && xppl.red) {
/** * xppl不为null 且是红结点 * xpp * / \ * red xppl xp red * ---> x结点在这一层 * | 变为 * xpp red ---> 变换事后x结位置 * / \ * black xppl xp black * */
xppl.red = false;
xp.red = false;
xpp.red = true;
// 控制循环
x = xpp;
}
else {
//若是左叔叔为空或者是黑色
if (x == xp.left) {
//若是当前节点是个左孩子 右旋转
root = rotateRight(root, x = xp);
//获取爷爷结点
xpp = (xp = x.parent) == null ? null : xp.parent;
}
if (xp != null) {
/** * 父结点不为null 设置父结点为黑色 */
xp.red = false;
if (xpp != null) {
/** * 爷爷结点不为null * 将其置为红色 * 对其进行左旋转 */
xpp.red = true;
root = rotateLeft(root, xpp);
}
}
}
}
}
}
/** * 红黑树中删除节点后会打破红黑树的性质须要平衡 * TreeNode<K,V> root 根结点 * TreeNode<K,V> x 要删除的节点 */
static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root, TreeNode<K,V> x) {
for (TreeNode<K,V> xp, xpl, xpr;;) {
if (x == null || x == root)
// x 为 null 或者 x 为根结点 无须平衡
return root;
else if ((xp = x.parent) == null) {
/** * xp null * \ * ---> x 结点位置(可为左结点也可为右结点) * 此时x 为红黑树的根结点 */
x.red = false;
return x;
}
else if (x.red) {
/** * xp * \ * ---> x 结点位置(且为red) * 将其变为黑色结点 */
x.red = false;
return root;
}
else if ((xpl = xp.left) == x) {
// x 为其父结点的左结点
if ((xpr = xp.right) != null && xpr.red) {
/** * xp * / \ * x xpr red * / \ * xprl xprr */
xpr.red = false; // 将xpr置为黑色
xp.red = true; // 将xp置为红色
// 左旋转 xp
root = rotateLeft(root, xp);
/** * xpr * / \ * xp xprr * / \ * x xprl */
// 获取 新的xpr
xpr = (xp = x.parent) == null ? null : xp.right;
}
if (xpr == null)
// 控制循环
x = xp;
else {
/** * xpr * / \ * xp xprr * / \ * x xprl * / \ * sl sr */
TreeNode<K,V> sl = xpr.left, sr = xpr.right;
if ((sr == null || !sr.red) &&
(sl == null || !sl.red)) {
/** * 树中xpr(即上图的xprl) 的叶子结点不存在 或为黑色结点 * 树中xpr(上图中的xprl) 置为红色 * 将x 指向 xp 继续循环 */
xpr.red = true;
x = xp;
}
else {
if (sr == null || !sr.red) {
/** * xpr * / \ * xp xprr * / \ * x xprl * / \ * sr null || black */
if (sl != null)
// sl 不为null 将其置为黑色
sl.red = false;
// 树中的xpr(上图xprl)置为红色
xpr.red = true;
// 右旋转
root = rotateRight(root, xpr);
// 从新获取xp的右子树
xpr = (xp = x.parent) == null ?
null : xp.right;
}
if (xpr != null) {
/** * 从新获取的xpr 不为null * xpr 的颜色与 父结点的颜色相同 */
xpr.red = (xp == null) ? false : xp.red;
if ((sr = xpr.right) != null)
// 从新获取的xpr 的右节点不为null,将其置黑
sr.red = false;
}
if (xp != null) {
/** * xp 不为null * 将xp置为红色 * 左旋转xp */
xp.red = false;
root = rotateLeft(root, xp);
}
x = root;
}
}
}
else { // x 为其父结点的右结点
if (xpl != null && xpl.red) {
/** * xp * / \ * xpl x * / \ * */
xpl.red = false;
xp.red = true;
/** * xp xpl * / \ / \ * xpl x ==> xpll xp * / \ / \ / \ * xpll xplr xplr x */
root = rotateRight(root, xp);
/** * 从新获取xpl * xpl * / \ * xpll xp * / \ * xplr x * |__ 新的xpl指向这里 */
xpl = (xp = x.parent) == null ? null : xp.left;
}
if (xpl == null)
// 新的xpl为null x 指向器父结点
x = xp;
else {
// 获取xpl的左结点与右结点
TreeNode<K,V> sl = xpl.left, sr = xpl.right;
if ((sl == null || !sl.red) &&
(sr == null || !sr.red)) {
/** * 左子结点 为空或 为黑色 * 且 * 右子结点 为空或 为黑色 * * 将 xpl 置为红色 */
xpl.red = true;
// 控制循环
x = xp;
}
else {
if (sl == null || !sl.red) {
/** * xp * / \ * xpl x * / \ / \ * sl(null || black) */
if (sr != null)
// 若是sr不为null 设置为黑色
sr.red = false;
//xpl置为红色
xpl.red = true;
//左旋转xpl
root = rotateLeft(root, xpl);
//从新获取xpl
xpl = (xp = x.parent) == null ?
null : xp.left;
}
if (xpl != null) {
// xpl 不为null xpl的颜色与xp的颜色相同
xpl.red = (xp == null) ? false : xp.red;
if ((sl = xpl.left) != null)
sl.red = false;
}
if (xp != null) {
/** * xp不为null * xp为黑色 * 右旋转xp */
xp.red = false;
root = rotateRight(root, xp);
}
x = root;
}
}
}
}
}
}
复制代码
总结:bash
在上一篇文章中,系统的学习了红黑树相关的知识,包括性质,以及为了维护红黑树的性质须要进行相应的左旋转,右旋转,颜色转换等子过程。在学习ConcurrentHashMap以前,要先对红黑树的操做有必定的了解。这篇文章,从ConcurrentHashMap的底层,经过其内部定义的一些常量,以及相关的内部类看起,从新回顾了一下红黑树的操做,以及并发大师的实现方式。在下一篇文章中,将学习ConcurrentHashMap相关的操做以及实现原理。微信
我的微信公众号: 数据结构