高并发下ConcurrentHashMap究竟是怎么实现线程安全的?

前言


这几天一直在看Java并发相关的知识,又学习了不少新的知识。看见大佬们一个个更博速度如此之快,可想知识渊博如滔滔江水。这一周就学习一些并发知识,搭建了一个ftp服务器,不知道小可爱们这周学习了那些知识呢?微信公众号【JustKeepCoding】

抛出正题:ConcurrentHashMap是如何实现线程安全,这里面又隐含那些玄机呢?java

上一节讲述了Hashtable这个结构虽然线程安全,可是效率不高,就是由于他的每一个操做都使用了synchronized同步块。由于synchronized同步块的线程若是拿到了锁,就会阻塞其余线程,阻塞线程会使操做系统从用户态转为阻塞态,从而加大性能消耗。node

因此在高并发的场景下,使用ConcurrentHashMap是最合适不过的了。那么问题来了,为何都喜欢使用ConcurrentHashMap这个结构,它究竟是怎么实现线程安全和并发度的呢程序员

ConcurrentHashMap的底层究竟是怎么实现的?

ConcurrentHashMap是面试必问的知识点,里面涵盖的知识也比较多,为此咱们仍是经过源码分析再切入到Doug Lea(做者)使用的其余技术。面试

JDK 1.7的ConcurrentHashMap用了哪些技术?

1.7函数构造和变量等

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {
static final int DEFAULT_INITIAL_CAPACITY = 16;//默认容量,segments的个数
static final float DEFAULT_LOAD_FACTOR = 0.75f;//负载因子
static final int DEFAULT_CONCURRENCY_LEVEL = 16;//
static final int MAXIMUM_CAPACITY = 1 << 30;//最大容量,仍是左移30位
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;//最小的segment容量为2
final Segment<K,V>[] segments;//重要组成
        }

采用分段锁技术,将ConcurrentHashMap的内部分红多个Segment段,每一个Segment能够编程

Segment组成

static final class Segment<K,V> extends ReentrantLock implements Serializable {
static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
transient volatile HashEntry<K,V>[] table;//相似于HashMap中的Entry
transient int modCount;//修改次数
transient int threshold;//阈值
final float loadFactor;//加载因子
}

segment内部是一个HashEntry<K,V>[] table,跟JDK1.7版本的HashMap中的Entry相似,都是数组加链表的形式。可是这里多了一个volatile关键字修饰,这样作到底有什么好处呢?数组

volatile关键字

要弄清楚volatile关键字的前因后果须要具有Java内存模型、CPU缓存模型等知识。那么首先从并发编程的三个重要特性讲起:原子性、有序性和可见性。缓存

  • 原子性

原子性就是指在一次操做或屡次操做中,要么全部的操做同时成功,那么同时失败。这个在上一篇MySQL的基础讲到过,好比我向淘宝买牙刷,我支出20,那么淘宝就要收到20。要么我转不出来,淘宝也收不到转。要么我支出成功,淘宝也收到成功。不可能存在,我支出但淘宝收不到的操做。安全

  • 有序性

有序性就是指代码执行过程当中的前后顺序,因为Java在编译期间的优化,致使代码执行顺序未必是顺序执行,由于JVM可能会指令重排序。服务器

  • 可见性

可见性是指当一个线程对一个变量进行修改,另外一个线程能够当即看到修改的最新值。这就相似于MySQL的读取未提交的事务隔离级别,防止读取脏数据。微信

那么JVM是如何来保证可见性的呢?

  • 经过volatile关键字,对于共享资源的读操做会直接在主内存中进行,对于共享数据的写操做固然是先修改工做内存,可是修改后会主动刷新到主内存中。
  • 经过synchronized关键字,synchronized只能保证同一时刻只有一个线程得到锁,而后执行同步方法,而且还会保证在锁释放以前,会将对变量的修改会主动刷新到主内存。
  • 经过JUC提供的显式锁Lock也能保证,Lock的lock方法可以保证在同一时刻只有一个线程得到锁而后执行同步方法,并保证在锁释放以前(Lock的unlock)将修改的变量刷新到主内存。

咱们假设一个场景,当一个读线程自身有个Init_value的值,每次都须要跟当前值比较,若是值被修改了就打印输出,没修改就不输出。当一个修改线程,每次都会修改当前的值,修改会会进行sleep睡眠10毫秒,用于等待读线程输出,即便其交替输出。(由于代码太长就不贴了)

你猜结果是怎样的?

结果是修改线程一直输出,读线程在控制台没有任何打印。这是由于,共享数据开始是缓存到CPU的cache快里,即读线程的本地内存中。即便修改线程修改数据,读线程也是从本地取,而本地内存值没有被修改,因此不会输出。那么使用volatile关键字,就能够强制读线程从内存刷新到本地线程内存(CPU的cache中)。

volatile关键字语义

  1. 保证可见性
  2. 保证顺序性
  3. 不保证原子性

因此回到HashEntry这里,采用volatile关键字就是为了并发环境下保证HashEntry的有序性可见性

image

ReentrantLock 的使用与理解

从源码能够看到Segment继承ReentrantLock,ReentrantLock 又是一个很重要的知识点,咱们都知道能够用互斥同步来保证线程的安全性,而最基本的同步互斥手段就是synchronizedjava.util.concurrent(J.U.C)包下的ReentrantLock 来实现同步。在基本用法上,ReentrantLock 和synchronized很类似,但在JDK1.5以前ReentrantLock 的功能是比较丰富的,在JDK1.8中运用普遍。

JDK 1.8的ConcurrentHashMap又采用了哪些技术呢?

悲观锁策略(阻塞同步)

互斥同步最主要的问题就是线程阻塞和唤醒带来的性能问题,所以这种同步也称阻塞同步。从处理方式来说,互斥同步是一种悲观的并发策略,老是认为只要不去作正确的的同步措施(列入加锁),那确定会出现问题,不管共享数据是否出现竞争,都须要加锁。(须要线程挂起

乐观锁策略(非阻塞同步)

基于冲突检测的乐观并发策略就是先进行操做,若是没有其余线程争用共享资源,那么操做就成功了。若是共享数据有争用,出现冲突,那就再采起其余的不就措施。(不须要线程挂起

image

put操做

public V put(K key, V value) {//定位到segment
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();//若是为NULL则会出现异常
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          
         (segments, (j << SSHIFT) + SBASE)) == null) 
        s = ensureSegment(j);
    return s.put(key, hash, value, false);//再从HashEntry中插入
}
/**
**segment数组中的put操做,相似于HashMap中的put
**/
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);//自旋锁策略
            V oldValue;//保存旧值
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;//计算segment[i]中的table的具体位置,这些就相似hashmap的操做了
                HashEntry<K,V> first = entryAt(tab, index);//寻找table其中一个头结点
                for (HashEntry<K,V> e = first;;) {//遍历链表
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {//存在值相同的结点
                            oldValue = e.value;//直接覆盖
                            if (!onlyIfAbsent) {//状态存在
                                e.value = value;
                                ++modCount;//修改次数加一
                            }
                            break;
                        }
                        e = e.next;//更换下一个节点
                    }
                    else {
                        if (node != null)//证实存在值
                            node.setNext(first);//头插法
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);//空节点则直接复制进去
                        int c = count + 1;//
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

自旋锁

/**
**扫描HashEntry节点去获取锁(自旋锁)
**/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);//获取插入值当前链表头结点
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;//新的节点
            int retries = -1; // 尝试获取锁的次数,初始为-1
            while (!tryLock()) {//没有获取到锁的话
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {//进入第一种状况
                    if (e == null) {//头结点为空,建立新的节点
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);//为了少作一点事,之后直接使用
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {//第二种状况,超过自旋次数加以阻塞
                    lock();//获取不到锁,则阻塞等待
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                         //由于当前没有获取锁,可能头结点被更另外一个线程更改,须要判断是不是以前的头结点。若是不是则从新循环判断
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

tryLock()会尝试获取锁,获取不到就返回false,获取的到就返回true,作其余事,而且不会阻塞。
Lock()方法若是获取不到锁就会阻塞。

流程图

image

扩容操做

/**
**扩容操做,不须要rehash,直接使用以前的hash
**/
private void rehash(HashEntry<K,V> node) {
            
            HashEntry<K,V>[] oldTable = table;//旧的hash表
            int oldCapacity = oldTable.length;//以前的hash表长
            int newCapacity = oldCapacity << 1;//扩容两倍
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];//扩容后的hash表
            int sizeMask = newCapacity - 1;//为了计算数组下标
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;//每次记录最后一串index相同的节点,从新赋值
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {//遍历节点,记录最后index值相同的一串节点
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {//最后的节点的index不跟数组的index值相同时
                                lastIdx = k;//更新到最后节点的index
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {//将最后一串index值相同的从新移到新的数组下标里
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // 对新增的节点进行获取index
            node.setNext(newTable[nodeIndex]);//头插法
            newTable[nodeIndex] = node;
            table = newTable;
        }

image

get操做

get 就比较简单啦,它是将 Key 经过 Hash 以后定位到具体的 Segment ,再经过一次 Hash 定位到具体的元素上。
即便多个线程对HashEntry修改,有volatile修饰的HashEntry,每次都会get到最新值

结论

为何线程安全呢,就是由于在1.7版本它的每一个方法都会加锁,put采用自旋锁不会使线程阻塞(操做状态切换会消耗性能),从而性能比hashtable好不少。并且它的每一个HashEntry[i]都是被volatile修饰,能够保证线程操做的可见性,即每次不会脏读,即便其余线程修改了值,都会强制刷新到本地内存

那为何并发度高呢?
由于是对单个segment[i]进行加锁,意思就是segment若是有16个,那么能够同时有16个线程修改并且仍是线程安全的。相对于Hashtable的锁,是锁定整个Hashtable对象,那么多个线程访问就须要被阻塞。

JDK1.8的ConcurrentHashMap

1.8版本取消了1.7中的分段锁策略,使用了CAS + synchronized来保证安全性。这跟1.8版本的HashMap也很类似,引入了红黑树,在链表节点数大于8的时候会转换成红黑树。

1.8源码构造

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {
    private static final int MAXIMUM_CAPACITY = 1 << 30;//最大容量
    private static final int DEFAULT_CAPACITY = 16;//默认容量
    private static final float LOAD_FACTOR = 0.75f;//负载因子
    static final int TREEIFY_THRESHOLD = 8;//链表节点数大于8 转红黑树
    static final int UNTREEIFY_THRESHOLD = 6;//删除红黑树节点小于6转链表
    transient volatile Node<K,V>[] table;//节点桶,volatile修饰
    private transient volatile Node<K,V>[] nextTable;//下一张表
}

1.8的put方法

final V putVal(K key, V value, boolean onlyIfAbsent) {//put中调用putVal
        if (key == null || value == null) throw new NullPointerException();//插入的值不能为空
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {//遍历表节点
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();//初始化表
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//获取table[i]的头结点是空
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))//CAS操做,写入node
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//当此时另外一个线程正在扩容
                tab = helpTransfer(tab, f);//当前线程帮助扩容,增长效率
            else {//进入table节点,判断当前节点是链表节点仍是红黑树节点
                V oldVal = null;
                synchronized (f) {//对表头结点获取锁资源
                    if (tabAt(tab, i) == f) {//从新判断一下是否被修改,被修改返回上级从新获取
                        if (fh >= 0) {//链表节点
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {//遍历链表节点
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {//相同直接覆盖
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {//尾插法 插入节点
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {//红黑树节点 
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)//链表节点大于8
                        treeifyBin(tab, i);//转红黑树
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);//统计节点数
        return null;
    }

TreeBin保证了即便红黑树自平衡,也不会释放锁资源。由于若是使用HashMap的红黑树结构的话,当插入节点会自平衡使头结点更换,而头结点是获取到锁了,若是头结点更换,其余线程此时就会获取到该节点的锁,形成插入的异常。
image

put流程

  • 1.先判断key和value是否为空,为空抛出异常
  • 2.计算出hash值,遍历表结构,进入下面三种状况

    • 是否须要初始化表,空则初始化,再进入2
    • 获取table[i]的头节点,空则用CAS插入(key,value),进入2
    • 看是否当前有线程正在扩容,有则帮忙扩容helpTransfer,进入2
    • synchronized获锁资源,判断是链表节点仍是红黑树节点,插入
  • 3.统计节点数
  • 返回空

比较并替换(Compare-and-swap,简称为CAS)

乐观锁策略须要CAS指令来保证进行,就是实现乐观锁的一种方式,CAS操做是由sun.misc.Unsafe类中的compareAndSwapInt()compareAndSwapLong()等几个方法包装提供,是一种轻量级锁,J.U.C中不少工具类就是基于CAS实现的,好比CompareAndSetgetAndIncrement()等都使用了Unsafe类的CAS操做。
CAS操做流程就是线程在读取数据时不进行加锁,在准备写回数据时,比较原值是否修改,若未被其余线程修改则写回,若已被修改,则从新执行读取流程。

1.8 get操做

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {//hash桶存在,且当前的桶不为空
            if ((eh = e.hash) == h) {//若是是红黑树,按照红黑树获取值
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)//按照链表获取
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

总结
1.8在1.7上作的改动仍是挺大的,ConcurrentHashMap这个数据结构的知识仍是挺多的,好比CAS的操做和理解、1.8中synchronized关键字的优化、1.8中的扩容方法插入方法红黑树自平衡等等。都是Java程序员面试的必备知识点,这节又写了11000多字,挺辛苦的,因此可以给我点个赞吗?

最后抛出两个问题

  • 1.8中的ConcurrentHashMap为何没有使用Lastrun(还记得1.7中扩容的Lastrun吗)
  • unsafe类你属于吗?Java程序中最牛逼的类,为何最牛逼呢?

你的点赞就是我最大的努力!关注个人公众号【JustKeepCoding】,每周两篇为你推送不同的知识!

公众号

相关文章
相关标签/搜索