面试:为了进阿里,死磕了ConcurrentHashMap源码和面试题(一)

该系列文章收录在公众号【Ccww技术博客】,原创技术文章早于博客推出

前言

在平时中集合使用中,当涉及多线程开发时,若是使用HashMap可能会致使死锁问题,使用HashTable效率又不高。而ConcurrentHashMap在保持同步同时并发效率比较高,ConcurrentHashmap是最好的选择,那面试中也会被经常问到,那可能的问题是:java

  • ConcurrentHashMap的实现原理node

    • ConcurrentHashMap1.7和1.8的区别?
    • ConcurrentHashMap使用什么技术来保证线程安全
  • ConcurrentHashMap的put()方法面试

    • ConcurrentHashmap 不支持 key 或者 value 为 null 的缘由?
    • put()方法如何实现线程安全呢?
  • ConcurrentHashMap扩容机制
  • ConcurrentHashMap的get方法是否要加锁,为何?
  • 其余问题算法

    • 为何使用ConcurrentHashMap
    • ConcurrentHashMap迭代器是强一致性仍是弱一致性?HashMap呢?
    • JDK1.7与JDK1.8中ConcurrentHashMap的区别

ConcurrentHashMap的实现原理

ConcurrentHashMap的出现主要为了解决hashmap在并发环境下不安全,JDK1.8ConcurrentHashMap的设计与实现很是精巧,大量的利用了volatile,CAS等乐观锁技术来减小锁竞争对于性能的影响,ConcurrentHashMap保证线程安全的方案是:segmentfault

  • JDK1.8:synchronized+CAS+HashEntry+红黑树;
  • JDK1.7:ReentrantLock+Segment+HashEntry。

JDK7 ConcurrentHashMap

在JDK1.7中ConcurrentHashMap由Segment(分段锁)数组结构和HashEntry数组组成,且主要经过Segment(分段锁)段技术实现线程安全。数组

Segment是一种可重入锁,是一种数组和链表的结构,一个Segment中包含一个HashEntry数组,每一个HashEntry又是一个链表结构,所以在ConcurrentHashMap查询一个元素的过程须要进行两次Hash操做,以下所示:安全

  • 第一次Hash定位到Segment,
  • 第二次Hash定位到元素所在的链表的头部

正是经过Segment分段锁技术,将数据分红一段一段的存储,而后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其余段的数据也能被其余线程访问,可以实现真正的并发访问。数据结构

这样结构会使Hash的过程要比普通的HashMap要长,影响性能,但写操做的时候能够只对元素所在的Segment进行加锁便可,不会影响到其余的Segment,ConcurrentHashMap提高了并发能力。多线程

JDK8 ConcurrentHashMap

在JDK8ConcurrentHashMap内部机构:数组+链表+红黑树,Java 8在链表长度超过必定阈值(8)时将链表(寻址时间复杂度为O(N))转换为红黑树(寻址时间复杂度为O(long(N))),结构基本上与功能和JDK8的HashMap同样,只不过ConcurrentHashMap保证线程安全性。并发

但在JDK1.8中摒弃了Segment分段锁的数据结构,基于CAS操做保证数据的获取以及使用synchronized关键字对相应数据段加锁来实现线程安全,这进一步提升了并发性。(CAS原理详情《面试:为了进阿里,又把并发CAS(Compare and Swap)实现从新精读一遍》)
))

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;  //使用了volatile属性
        volatile Node<K,V> next;  //使用了volatile属性
        ...
    }

ConcurrentHashMap采用Node类做为基本的存储单元,每一个键值对(key-value)都存储在一个Node中,使用了volatile关键字修饰value和next,保证并发的可见性。其中Node子类有:

  • ForwardingNode:扩容节点,只是在扩容阶段使用的节点,主要做为一个标记,在处理并发时起着关键做用,有了ForwardingNodes,也是ConcurrentHashMap有了分段的特性,提升了并发效率
  • TreeBin:TreeNode的代理节点,用于维护TreeNodes,ConcurrentHashMap的红黑树存放的是TreeBin
  • TreeNode:用于树结构中,红黑树的节点(当链表长度大于8时转化为红黑树),此节点不能直接放入桶内,只能是做为红黑树的节点
  • ReservationNode:保留结点

ConcurrentHashMap中查找元素、替换元素和赋值元素都是基于sun.misc.Unsafe原子操做实现多并发的无锁化操做。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
    }

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
    }

ConcurrentHashMap的put()方法

ConcurrentHashMap的put的流程步骤

  1. 若是key或者value为null,则抛出空指针异常,和HashMap不一样的是HashMap单线程是容许为Null;

    if (key == null || value == null) throw new NullPointerException();

  2. for的死循环,为了实现CAS的无锁化更新,若是table为null或者table的长度为0,则初始化table,调用initTable()方法(第一次put数据,调用默认参数实现,其中重要的sizeCtl参数)。

    //计算索引的第一步,传入键值的hash值
        int hash = spread(key.hashCode());
        int binCount = 0; //保存当前节点的长度
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable(); //初始化Hash表
            ...
        }
  3. 肯定元素在Hash表的索引

    经过hash算法能够将元素分散到哈希桶中。在ConcurrentHashMap中经过以下方法肯定数组索引:

    第一步:

    static final int spread(int h) {
            return (h ^ (h >>> 16)) & HASH_BITS; 
        }

    第二步:(length-1) & (h ^ (h >>> 16)) & HASH_BITS);

  4. 经过tableAt()方法找到位置tab[i]Node,当Node为null时为没有hash冲突的话,使用casTabAt()方法CAS操做将元素插入到Hash表中,ConcurrentHashmap使用CAS无锁化操做,这样在高并发hash冲突低的状况下,性能良好。

    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    //利用CAS操做将元素插入到Hash表中
                    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                        break;  // no lock when adding to empty bin(插入null的节点,无需加锁)
                }
  5. 当f不为null时,说明发生了hash冲突,当f.hash == MOVED==-1 时,说明ConcurrentHashmap正在发生resize操做,使用helpTransfer()方法帮助正在进行resize操做。

    else if ((fh = f.hash) == MOVED) //f.hash == -1 
            //hash为-1 说明是一个forwarding nodes节点,代表正在扩容
            tab = helpTransfer(tab, f);
  6. 以上状况都不知足的时,使用synchronized同步块上锁当前节点Node ,并判断有没有线程对数组进行了修改,若是没有则进行:

    • 遍历该链表并统计该链表长度binCount,查找是否有和key相同的节点,若是有则将查找到节点的val值替换为新的value值,并返回旧的value值,不然根据key,value,hash建立新Node并将其放在链表的尾部
    • 若是Node fTreeBin的类型,则使用红黑树的方式进行插入。而后则退出synchronized(f)锁住的代码块
    //当前节点加锁
     synchronized (f) {
     //判断下有没有线程对数组进行了修改
     if (tabAt(tab, i) == f) {
           //若是hash值是大于等于0的说明是链表
            if (fh >= 0) {
                  binCount = 1;
                  for (Node<K,V> e = f;; ++binCount) {
                        K ek;
                       //插入的元素键值的hash值有节点中元素的hash值相同,替换当前元素的值
                          if (e.hash == hash &&
                               ((ek = e.key) == key ||
                                (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    //替换当前元素的值
                                    e.val = value;
                                break;
                            }
                         Node<K,V> pred = e;
                          //若是循环到链表结尾还没发现,那么进行插入操做
                         if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                               }
                    }
              }else if (f instanceof TreeBin) { //节点为树
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    //替换旧值
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
  7. 执行完synchronized(f)同步代码块以后会先检查binCount,若是大于等于TREEIFY_THRESHOLD = 8则进行treeifyBin操做尝试将该链表转换为红黑树。

    if (binCount != 0) {
                  //若是节点长度大于8,转化为树
                  if (binCount >= TREEIFY_THRESHOLD)
                       treeifyBin(tab, i);
                  if (oldVal != null)
                       return oldVal; 
                   break;
             }
  8. 执行了一个addCount方法,主要用于统计数量以及决定是否须要扩容.

    addCount(1L, binCount);

ConcurrentHashmap 不支持 key 或者 value 为 null 的缘由?

ConcurrentHashmaphashMap不一样的是,concurrentHashMapkeyvalue都不容许为null,

由于concurrenthashmap它们是用于多线程的,并发的 ,若是map.get(key)获得了null,不能判断究竟是映射的value是null,仍是由于没有找到对应的key而为空,

而用于单线程状态的hashmap却能够用containKey(key) 去判断究竟是否包含了这个null。

put()方法如何实现线程安全呢?

  1. 在第一次put数据时,调用initTable()方法
/**  
 * Hash表的初始化和调整大小的控制标志。为负数,Hash表正在初始化或者扩容;  
 * (-1表示正在初始化,-N表示有N-1个线程在进行扩容)  
 * 不然,当表为null时,保存建立时使用的初始化大小或者默认0;  
 * 初始化之后保存下一个调整大小的尺寸。  
 */  
 private transient volatile int sizeCtl;  
     //第一次put,初始化数组  
     private final Node<K,V>[] initTable() {  
         Node<K,V>[] tab; int sc;  
         while ((tab = table) == null || tab.length == 0) {  
             //若是已经有别的线程在初始化了,这里等待一下  
             if ((sc = sizeCtl) < 0)  
             Thread.yield(); // lost initialization race; just spin  
             //-1 表示正在初始化  
             else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  
             ...  
         } finally {  
            sizeCtl = sc;  
         }  
            break;  
         }  
     }  
     return tab;  
 }

使用sizeCtl参数做为控制标志的做用,当在从插入元素时,才会初始化Hash表。在开始初始化的时候,

  • 首先判断sizeCtl的值,若是sizeCtl < 0,说明有线程在初始化当前线程便放弃初始化操做。不然,将SIZECTL设置为-1Hash表进行初始化
  • 初始化成功之后,将sizeCtl的值设置为当前的容量值
  1. 在不存在hash冲突的时
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {  
     //利用CAS操做将元素插入到Hash表中  
     if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))  
     break;  // no lock when adding to empty bin(插入null的节点,无需加锁)  
 }

(f = tabAt(tab, i = (n - 1) & hash)) == null中使用tabAt原子操做获取数组,并利用casTabAt(tab, i, null, new Node<K,V>(hash, key, value))CAS操做将元素插入到Hash表中

  1. 在存在hash冲突时,先把当前节点使用关键字synchronized加锁,而后再使用tabAt()原子操做判断下有没有线程对数组进行了修改,最后再进行其余操做。

为何要锁住更新操做的代码块?

由于发生了哈希冲突,当前线程正在f所在的链表上进行更新操做,假如此时另一个线程也须要到这个链表上进行更新操做,则须要等待当前线程更新完后再执行

//当前节点加锁  
synchronized (f) {  
     //这里判断下有没有线程对数组进行了修改  
     if (tabAt(tab, i) == f) {  
     ......//do something  
 }
}

因为篇幅过于长,分红两部分来说讲,接下来的内容请看[《面试:为了进阿里,死磕了ConcurrentHashMap源码和面试题(二)》]()

各位看官还能够吗?喜欢的话,动动手指点个💗,点个关注呗!!谢谢支持
欢迎关注公众号【Ccww技术博客】,原创技术文章第一时间推出
相关文章
相关标签/搜索