上次写了一文看懂HashMap,谈到HashMap的线程安全问题就不得不聊聊ConcurrentHashMap
,若你不了解HashMap的话能够看看上面那篇文章,ConcurrentHashMap和HashMap在不少地方是相似的,好比底层都是数组+链表+红黑树、数组大小都是2的幂次方等.......一些重复的知识点在这里就不细讲了。这篇文章主要会解决如下几个问题:node
其实ConcurrentHashMap相比HashMap复杂了许多,主要是由于会涉及到许多并发层面的知识点,好比CAS
算法、volitale
以及synchronized
关键字等,本文会粗略介绍一下相关知识点,接下来咱们先聊聊HashMap的线程安全问题以及为何要使用ConcurrentHashMap。面试
HashMap在并发环境下主要有如下几个问题:算法
在1.7版本,当扩容后生成新数组,在转移元素的过程当中,使用的是头插法,也就是链表的顺序会翻转,当多个线程执行插入操做时可能会发生死循环。在1.8版本时将头插法改为了尾插法,解决了死循环的问题。segmentfault
当两个线程同时插入元素时可能会发生数据被覆盖的状况数组
先看下源码安全
if ((p = tab[i = (n - 1) & hash]) == null) tab[i] = newNode(hash, key, value, null);
当两个线程同时执行到以上代码时,发现没有发生哈希冲突,因而新建Node节点插入,这时先插入的节点会被后插入的节点覆盖,致使数据丢失。数据结构
给全部方法加synchronized
锁,很是低效,如今已经淘汰。多线程
Collections
包提供的一个方法,会同步整个对象,也不推荐使用架构
尽管没有同步整个Map,可是它仍然是线程安全的,读操做很是快,而写操做则是经过加锁完成的,推荐使用并发
在开始以前须要先介绍下CAS
算法,这也是ConcurrentHashMap实现线程安全的一个关键点。
CAS能够看作是乐观锁
的一种实现方式,Java原子类中的递增操做就经过CAS自旋实现的。
CAS全称 Compare And Swap(比较与交换),是一种无锁算法。在不使用锁(没有线程被阻塞)的状况下实现多线程之间的变量同步。
CAS底层就是经过Unsafe类中的方法来实现的,以下所示:
unsafe.compareAndSwapInt(this, valueOffset, expect, update)
下面介绍一下各个参数
经过valueOffset能够拿到value的值,当且仅当value的值等于expect时,CAS经过原子方式用新值update来更新value的值,不然不会执行任何操做。
整个“比较+更新”操做封装在
compareAndSwapInt()
中,在JNI里是借助于一个CPU指令完成的,属于原子操做,能够保证多个线程都可以看到同一个变量的修改值。
ConcurrentHashMap的源码中除了普通的CAS操做,还定义了三个原子操做,用于对指定位置的节点进行操做。正是这些原子操做保证了ConcurrentHashMap的线程安全,以下所示:
//获取tab数组的第i个node static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //使用CAS尝试更新table[i] static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //写入table[i] static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
ConcurrentHashMap支持并发的读写。跟1.7版本相比,JDK1.8的实现已经摒弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized
和CAS
来操做,虽然源码里面还保留了,也只是为了兼容性的考虑,所以本文主要讲解的是JDK1.8版本的ConcurrentHashMap。
先来介绍一个核心属性sizeCtl
private transient volatile int sizeCtl;
用途:控制table数组的初始化和扩容的操做,不一样的值有不一样的含义
其它属性
transient volatile Node<K,V>[] table;//哈希数组,保存Ndode节点 private transient volatile Node<K,V>[] nextTable;//扩容用的数组,只有在扩容时才不为null private static final int DEFAULT_CAPACITY = 16;//默认大小 private static final float LOAD_FACTOR = 0.75f;//负载因子 static final int MOVED = -1; //表示正在扩容
在上面咱们能够看到volatile
关键字,这里先简单介绍一下该关键字的做用:
在多线程环境下,某个共享变量若是被其中一个线程给修改了,其余线程可以当即知道这个共享变量已经被修改了,当其余线程要读取这个变量的时候,最终会去内存中读取,而不是从本身的工做空间中读取
虚拟机在进行代码编译优化的时候,对于那些改变顺序以后不会对最终变量的值形成影响的代码,是有可能将他们进行重排序的,可是在多线程下可能会引起线程安全问题,使用volatile能够禁止重排序。
注意:volatile关键字没法保证变量的原子性。
在面试中volatile底层实现机制也是常考的一个知识点,因为篇幅有限这里只是简单介绍一下概念,若是对原理感兴趣的同窗能够上网搜索一下相关资料。
ConcurrentHashMap和HashMap都是由数组+链表+红黑树构成,不过有一个不一样的是ConcurrentHashMap的数组中放入的不是TreeNode结点,而是将TreeNode包装起来的TreeBin对象,以下图所示:
public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
和HashMap实现差很少,也是用tableSizeFor
方法来确保数组大小为2的幂次方, 能够看出构造函数主要是设定sizeCtl
的值,并未对表进行初始化。当表未初始化的时候,sizeCtl的值其实指定的是表的长度。
tableSizeFor方法用来保证数组为2的幂次方,若是不了解其实现能够参考一文看懂HashMap
在ConcurrentHashMap里table数组第一次初始化是在initTable
里执行的,这点和HashMap有点不一样,简单看下初始化步骤:
sizeCtl < 0
说明有别的线程正在初始化或扩容,自旋等待CAS
去更新sizeCtl
的值sizeCtl
设置为容量阈值(也就是HashMap的threshold)private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //当sizeCtl<0说明有别的线程正在初始化或扩容,自旋等待 if ((sc = sizeCtl) < 0) Thread.yield(); //SIZECTL:表示当前对象的内存偏移量,sc表示指望值,-1表示要替换的值,设定为-1表示要初始化表 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //检查table数组是否已经被初始化 if ((tab = table) == null || tab.length == 0) { //若sc=0则设置默认容量16,不然设置为指定容量大小 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//初始化数组 table = tab = nt; sc = n - (n >>> 2);//n - (n >>> 2) = 0.75n,也就是说sc的值等于threshold } } finally { sizeCtl = sc; } break; } } return tab; }
这里须要注意的一点是在else if块里面须要从新判断一次table是否未初始化,由于在finally块里改变了sizeCtl值,这时候其它线程是可以进入else if块中的,这样就会执行两次初始化操做了。
在介绍get方法以前先来看看ConurrentHashMap如何计算key的hash值,ConcurrentHashMap用了spread函数来求hash值,它与HashMap的hash函数有略微不一样,代码以下:
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
除了高16位和低16位或操做以外,最后还和HASH_BITS
相与,其值为0x7fffffff
。它的做用主要是使hash值为正数。在ConcurrentHashMap中,Hash值为负数有特别的意义,如-1表示ForwardingNode结点,-2表示TreeBin结点。
什么是ForwardingNode结点和TreeBin结点?
//只在扩容时出现,实现了扩容时旧表和新表的链接 static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null);//MOVED = -1 this.nextTable = tab; } ...... }
ForwardingNode节点是Node节点的子类,hash值固定为-1,只在扩容 transfer的时候出现,当旧数组中所有的节点都迁移到新数组中时,旧数组就在数组中放置一个ForwardingNode。读操做或者迭代读时碰到ForwardingNode时,将操做转发到扩容后的新的table数组上去执行,写操做遇见它时,则尝试帮助扩容。
至于TreeBin节点也是继承自Node,hash值固定为-2,是红黑树的包装结点。(有关红黑树因为篇幅有限这里就不展开讲了)
查询步骤
(n-1)&hash
计算下标)hash = -1
则调用ForwardingNode的find函数转发到nextTable上查找;若 hash = -2
则调用TreeBin的find函数查找元素接下来看看get方法源码
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode());//计算hash值 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) {//判断第一个节点是否就是目标节点 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //若是hash值小于0,有两种状况 //-1是ForwardingNode,则用find函数转发到nextTable上查找 //-2是TreeBin,调用TreeBin的find函数。 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //正常节点则遍历查找 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
tabAt方法使用volatile来获取数组上的元素,在介绍CA时已经说过了,若是忘记了请翻到上面查看。
从代码也能够看出get方法是不加锁的,这里比较须要注意的一点是hash值为-1的ForwardingNode节点,当读操做碰到ForwardingNode时会调用find方法转发到扩容后的新的table数组上去执行,咱们来看看find方法的实现:
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } //到新数组上查找元素 Node<K,V> find(int h, Object k) { //使用循环,避免屡次碰到ForwardingNode致使递归过深 outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; //第一个节点就是要找的节点 if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { //继续遇见ForwardingNode的状况,这里至关因而递归调用一次本方法 if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else//遇见特殊节点,调用其find方法进行查找 return e.find(h, k); } if ((e = e.next) == null)//普通节点直接循环遍历链表 return null; } } } }
扩容时当数组为空或完成扩容后将ForwardingNode结点插入数组槽中,而find操做在新表中进行查询。巧妙利用ForwardingNode将旧表和新表链接起来,保证了其余线程扩容时也能对结点正常访问。
仍是同样先来看看插入过程:
tabAt
方法读取节点,若没有发生hash冲突则用CAS
插入节点ForwardingNode
节点,说明在扩容,调用hlepTransfer
帮助扩容synchronized
对节点加锁,以后遍历链表,若元素已存在则更新旧值,不然在尾部插入节点TreeBin
节点则调用putTreeVal
方法插入addCount
方法对节点数量+1,在该方法里面也会判断是否须要扩容put方法源码:
public V put(K key, V value) { return putVal(key, value, false);//false表示若已存在则进行覆盖 } final V putVal(K key, V value, boolean onlyIfAbsent) { //不容许key或value为null,这点和HashMap不一样 if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0;//记录当前链表或红黑树长度 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //判断是否须要初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //用tabAt方法读取table[i],若没有发生hash冲突则用CAS插入节点 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } //若hash值为-1,则为ForwardingNode结点,说明在扩容,调用hlepTransfer帮助扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { //若是是普通链表结点或树结点,使用synchronized对节点加锁 V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) {//二次检查,相似于单例模式的双重检查 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //若元素已存在则更新value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //插入节点到链表尾部 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } }//若是是TreeBin节点则调用putTreeVal方法插入 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //若链表长度太长,则调用treeifyBin将链表转换为红黑树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount);//节点数量+1,检查是否须要进行扩容 return null; }
插入过程也不是很难,不少地方和HashMap差很少,能够照着注释多看几遍就懂了。
在put源码最后会调用addCount
方法来修改元素个数,在addCount方法里面又会检查是否须要调用transfer
方法来扩容,ConcurrentHashMap的并发扩容是设计的一个精髓,因为博主能力有限至今还未搞懂,如有兴趣的能够参考这篇:阿里十年架构师,教你深度分析ConcurrentHashMap原理分析
源码不是很难,主要是replaceNode方法的几个参数搞懂就行,这里再也不细讲。
public V remove(Object key) { return replaceNode(key, null, null); } //cv是指望值,当待删除节点的值等于cv时,用value替换旧值 final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //table未初始化 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; //正在扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { validated = true; for (Node<K,V> e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; //符合更新value或者删除节点的条件 if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; //更新value if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else //CAS设置节点 setTabAt(tab, i, e.next); } break; } //当前节点不是目标节点,继续遍历下一个节点 pred = e; if ((e = e.next) == null) //到达链表尾部,依旧没有找到,跳出循环 break; } } //红黑树 else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null) { //若是删除了节点,更新size if (value == null) addCount(-1L, -1); return oldVal; } break; } } } return null; }
有关ConcurrentHashMap到这里暂时先告一段落了,博主当初还觉得三天能够写完,到今天已经第五天了没想到还没搞定,其中还有一些经典的设计好比transfer扩容方法因为能力有限没列出来,不过本篇对于面试来讲应该已经够用了,若是有哪里写得不对欢迎各位指出来!