这几天一直在看Java并发相关的知识,又学习了不少新的知识。看见大佬们一个个更博速度如此之快,可想知识渊博如滔滔江水。这一周就学习一些并发知识,搭建了一个ftp服务器,不知道小可爱们这周学习了那些知识呢?微信公众号【JustKeepCoding】
抛出正题:ConcurrentHashMap是如何实现线程安全
,这里面又隐含那些玄机呢?java
上一节讲述了Hashtable这个结构虽然线程安全
,可是效率不高
,就是由于他的每一个操做都使用了synchronized
同步块。由于synchronized同步块的线程若是拿到了锁,就会阻塞其余线程,阻塞线程会使操做系统从用户态转为阻塞态,从而加大性能消耗。node
因此在高并发的场景下,使用ConcurrentHashMap是最合适不过的了。那么问题来了,为何都喜欢使用ConcurrentHashMap这个结构,它究竟是怎么实现线程安全和并发度的呢?程序员
ConcurrentHashMap是面试必问的知识点,里面涵盖的知识也比较多,为此咱们仍是经过源码分析再切入到Doug Lea
(做者)使用的其余技术。面试
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { static final int DEFAULT_INITIAL_CAPACITY = 16;//默认容量,segments的个数 static final float DEFAULT_LOAD_FACTOR = 0.75f;//负载因子 static final int DEFAULT_CONCURRENCY_LEVEL = 16;// static final int MAXIMUM_CAPACITY = 1 << 30;//最大容量,仍是左移30位 static final int MIN_SEGMENT_TABLE_CAPACITY = 2;//最小的segment容量为2 final Segment<K,V>[] segments;//重要组成 }
采用分段锁
技术,将ConcurrentHashMap的内部分红多个Segment段,每一个Segment能够编程
static final class Segment<K,V> extends ReentrantLock implements Serializable { static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; transient volatile HashEntry<K,V>[] table;//相似于HashMap中的Entry transient int modCount;//修改次数 transient int threshold;//阈值 final float loadFactor;//加载因子 }
segment内部是一个HashEntry<K,V>[] table
,跟JDK1.7版本的HashMap中的Entry相似,都是数组加链表的形式
。可是这里多了一个volatile关键字修饰,这样作到底有什么好处呢?数组
要弄清楚volatile关键字的前因后果须要具有Java内存模型、CPU缓存模型等知识。那么首先从并发编程的三个重要特性讲起:原子性、有序性和可见性。缓存
原子性就是指在一次操做或屡次操做中,要么全部的操做同时成功,那么同时失败。这个在上一篇MySQL的基础讲到过,好比我向淘宝买牙刷,我支出20,那么淘宝就要收到20。要么我转不出来,淘宝也收不到转。要么我支出成功,淘宝也收到成功。不可能存在,我支出但淘宝收不到的操做。安全
有序性就是指代码执行过程当中的前后顺序,因为Java在编译期间的优化,致使代码执行顺序未必是顺序执行,由于JVM可能会指令重排序。服务器
可见性是指当一个线程对一个变量进行修改,另外一个线程能够当即看到修改的最新值。这就相似于MySQL的读取未提交的事务隔离级别,防止读取脏数据。微信
可见性
的呢?咱们假设一个场景,当一个读线程自身有个Init_value的值,每次都须要跟当前值比较,若是值被修改了就打印输出,没修改就不输出。当一个修改线程,每次都会修改当前的值,修改会会进行sleep睡眠10毫秒,用于等待读线程输出,即便其交替输出。(由于代码太长就不贴了)
你猜结果是怎样的?
结果是修改线程一直输出,读线程在控制台没有任何打印。这是由于,共享数据开始是缓存到CPU的cache快里,即读线程的本地内存中。即便修改线程修改数据,读线程也是从本地取,而本地内存值没有被修改,因此不会输出。那么使用volatile关键字,就能够强制读线程从内存刷新到本地线程内存(CPU的cache中)。
volatile关键字语义:
因此回到HashEntry这里,采用volatile关键字就是为了并发环境下保证HashEntry的有序性和可见性。
从源码能够看到Segment继承ReentrantLock,ReentrantLock 又是一个很重要的知识点,咱们都知道能够用互斥同步
来保证线程的安全性,而最基本的同步互斥手段就是synchronized
和java.util.concurrent
(J.U.C)包下的ReentrantLock 来实现同步。在基本用法上,ReentrantLock 和synchronized很类似,但在JDK1.5以前ReentrantLock 的功能是比较丰富的,在JDK1.8中运用普遍。
互斥同步最主要的问题就是线程阻塞和唤醒带来的性能问题,所以这种同步也称阻塞同步。从处理方式来说,互斥同步是一种悲观的并发策略,老是认为只要不去作正确的的同步措施(列入加锁),那确定会出现问题,不管共享数据是否出现竞争,都须要加锁。(须要线程挂起
)
基于冲突检测的乐观并发策略就是先进行操做,若是没有其余线程争用共享资源,那么操做就成功了。若是共享数据有争用,出现冲突,那就再采起其余的不就措施。(不须要线程挂起
)
public V put(K key, V value) {//定位到segment Segment<K,V> s; if (value == null) throw new NullPointerException();//若是为NULL则会出现异常 int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null) s = ensureSegment(j); return s.put(key, hash, value, false);//再从HashEntry中插入 }
/** **segment数组中的put操做,相似于HashMap中的put **/ final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);//自旋锁策略 V oldValue;//保存旧值 try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash;//计算segment[i]中的table的具体位置,这些就相似hashmap的操做了 HashEntry<K,V> first = entryAt(tab, index);//寻找table其中一个头结点 for (HashEntry<K,V> e = first;;) {//遍历链表 if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {//存在值相同的结点 oldValue = e.value;//直接覆盖 if (!onlyIfAbsent) {//状态存在 e.value = value; ++modCount;//修改次数加一 } break; } e = e.next;//更换下一个节点 } else { if (node != null)//证实存在值 node.setNext(first);//头插法 else node = new HashEntry<K,V>(hash, key, value, first);//空节点则直接复制进去 int c = count + 1;// if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
/** **扫描HashEntry节点去获取锁(自旋锁) **/ private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash);//获取插入值当前链表头结点 HashEntry<K,V> e = first; HashEntry<K,V> node = null;//新的节点 int retries = -1; // 尝试获取锁的次数,初始为-1 while (!tryLock()) {//没有获取到锁的话 HashEntry<K,V> f; // to recheck first below if (retries < 0) {//进入第一种状况 if (e == null) {//头结点为空,建立新的节点 if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null);//为了少作一点事,之后直接使用 retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) {//第二种状况,超过自旋次数加以阻塞 lock();//获取不到锁,则阻塞等待 break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { //由于当前没有获取锁,可能头结点被更另外一个线程更改,须要判断是不是以前的头结点。若是不是则从新循环判断 e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
tryLock()
会尝试获取锁,获取不到就返回false,获取的到就返回true,作其余事,而且不会阻塞。Lock()
方法若是获取不到锁就会阻塞。
流程图:
/** **扩容操做,不须要rehash,直接使用以前的hash **/ private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table;//旧的hash表 int oldCapacity = oldTable.length;//以前的hash表长 int newCapacity = oldCapacity << 1;//扩容两倍 threshold = (int)(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];//扩容后的hash表 int sizeMask = newCapacity - 1;//为了计算数组下标 for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e;//每次记录最后一串index相同的节点,从新赋值 int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) {//遍历节点,记录最后index值相同的一串节点 int k = last.hash & sizeMask; if (k != lastIdx) {//最后的节点的index不跟数组的index值相同时 lastIdx = k;//更新到最后节点的index lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {//将最后一串index值相同的从新移到新的数组下标里 V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // 对新增的节点进行获取index node.setNext(newTable[nodeIndex]);//头插法 newTable[nodeIndex] = node; table = newTable; }
get 就比较简单啦,它是将 Key
经过 Hash
以后定位到具体的 Segment
,再经过一次 Hash 定位到具体的元素上。
即便多个线程对HashEntry修改,有volatile
修饰的HashEntry,每次都会get到最新值。
为何线程安全呢,就是由于在1.7版本它的每一个方法都会加锁,put采用自旋锁不会使线程阻塞(操做状态切换会消耗性能),从而性能比hashtable好不少。并且它的每一个HashEntry[i]都是被volatile
修饰,能够保证线程操做的可见性,即每次不会脏读,即便其余线程修改了值,都会强制刷新到本地内存
。
那为何并发度高呢?
由于是对单个segment[i]进行加锁,意思就是segment若是有16个,那么能够同时有16个线程修改
并且仍是线程安全的。相对于Hashtable的锁,是锁定整个Hashtable对象,那么多个线程访问就须要被阻塞。
1.8版本取消了1.7中的分段锁策略,使用了CAS + synchronized
来保证安全性。这跟1.8版本的HashMap也很类似,引入了红黑树,在链表节点数大于8的时候会转换成红黑树。
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { private static final int MAXIMUM_CAPACITY = 1 << 30;//最大容量 private static final int DEFAULT_CAPACITY = 16;//默认容量 private static final float LOAD_FACTOR = 0.75f;//负载因子 static final int TREEIFY_THRESHOLD = 8;//链表节点数大于8 转红黑树 static final int UNTREEIFY_THRESHOLD = 6;//删除红黑树节点小于6转链表 transient volatile Node<K,V>[] table;//节点桶,volatile修饰 private transient volatile Node<K,V>[] nextTable;//下一张表 }
final V putVal(K key, V value, boolean onlyIfAbsent) {//put中调用putVal if (key == null || value == null) throw new NullPointerException();//插入的值不能为空 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) {//遍历表节点 Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable();//初始化表 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//获取table[i]的头结点是空 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))//CAS操做,写入node break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED)//当此时另外一个线程正在扩容 tab = helpTransfer(tab, f);//当前线程帮助扩容,增长效率 else {//进入table节点,判断当前节点是链表节点仍是红黑树节点 V oldVal = null; synchronized (f) {//对表头结点获取锁资源 if (tabAt(tab, i) == f) {//从新判断一下是否被修改,被修改返回上级从新获取 if (fh >= 0) {//链表节点 binCount = 1; for (Node<K,V> e = f;; ++binCount) {//遍历链表节点 K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {//相同直接覆盖 oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) {//尾插法 插入节点 pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) {//红黑树节点 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD)//链表节点大于8 treeifyBin(tab, i);//转红黑树 if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount);//统计节点数 return null; }
TreeBin
保证了即便红黑树自平衡,也不会释放锁资源。由于若是使用HashMap的红黑树结构的话,当插入节点会自平衡使头结点更换,而头结点是获取到锁了,若是头结点更换,其余线程此时就会获取到该节点的锁,形成插入的异常。
put流程:
2.计算出hash值,遍历表结构,进入下面三种状况
CAS
插入(key,value)
,进入2helpTransfer
,进入2synchronized
获锁资源,判断是链表节点仍是红黑树节点,插入乐观锁策略须要CAS指令来保证进行,就是实现乐观锁
的一种方式,CAS操做是由sun.misc.Unsafe类中的compareAndSwapInt()
和compareAndSwapLong()
等几个方法包装提供,是一种轻量级锁,J.U.C中不少工具类就是基于CAS实现的,好比CompareAndSet
和getAndIncrement()
等都使用了Unsafe类
的CAS操做。
CAS操做流程就是线程在读取数据时不进行加锁,在准备写回数据时,比较原值是否修改,若未被其余线程修改则写回,若已被修改,则从新执行读取流程。
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {//hash桶存在,且当前的桶不为空 if ((eh = e.hash) == h) {//若是是红黑树,按照红黑树获取值 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0)//按照链表获取 return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
总结:
1.8在1.7上作的改动仍是挺大的,ConcurrentHashMap这个数据结构的知识仍是挺多的,好比CAS的操做和理解、1.8中synchronized关键字的优化、1.8中的扩容方法插入方法、红黑树自平衡等等。都是Java程序员面试的必备知识点,这节又写了11000多字,挺辛苦的,因此可以给我点个赞吗?
最后抛出两个问题:
Lastrun
(还记得1.7中扩容的Lastrun吗)