从ConcurrentHashMap的演进看Java多线程核心技术 Java进阶(六)

本文分析了HashMap的实现原理,以及resize可能引发死循环和Fast-fail等线程不安全行为。同时结合源码从数据结构,寻址方式,同步方式,计算size等角度分析了JDK 1.7和JDK 1.8中ConcurrentHashMap的实现原理。java

原创文章,同步首发自做者我的博客,转载请在文章开头处以超连接注明出处。http://www.jasongj.com/java/concurrenthashmap/react

线程不安全的HashMap

众所周知,HashMap是非线程安全的。而HashMap的线程不安全主要体如今resize时的死循环及使用迭代器时的fast-fail上。数组

注:本章的代码均基于JDK 1.7.0_67缓存

HashMap工做原理

HashMap数据结构

经常使用的底层数据结构主要有数组和链表。数组存储区间连续,占用内存较多,寻址容易,插入和删除困难。链表存储区间离散,占用内存较少,寻址困难,插入和删除容易。安全

HashMap要实现的是哈希表的效果,尽可能实现O(1)级别的增删改查。它的具体实现则是同时使用了数组和链表,能够认为最外层是一个数组,数组的每一个元素是一个链表的表头。数据结构

HashMap寻址方式

对于新插入的数据或者待读取的数据,HashMap将Key的哈希值对数组长度取模,结果做为该Entry在数组中的index。在计算机中,取模的代价远高于位操做的代价,所以HashMap要求数组的长度必须为2的N次方。此时将Key的哈希值对2^N-1进行与运算,其效果即与取模等效。HashMap并不要求用户在指定HashMap容量时必须传入一个2的N次方的整数,而是会经过Integer.highestOneBit算出比指定整数小的最大的2^N值,其实现方法以下。多线程

public static int highestOneBit(int i) {
  i |= (i >>  1);
  i |= (i >>  2);
  i |= (i >>  4);
  i |= (i >>  8);
  i |= (i >> 16);
  return i - (i >>> 1);
}

因为Key的哈希值的分布直接决定了全部数据在哈希表上的分布或者说决定了哈希冲突的可能性,所以为防止糟糕的Key的hashCode实现(例如低位都相同,只有高位不相同,与2^N-1取与后的结果都相同),JDK 1.7的HashMap经过以下方法使得最终的哈希值的二进制形式中的1尽可能均匀分布从而尽量减小哈希冲突。并发

int h = hashSeed;
h ^= k.hashCode();
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);

resize死循环

transfer方法

当HashMap的size超过Capacity*loadFactor时,须要对HashMap进行扩容。具体方法是,建立一个新的,长度为原来Capacity两倍的数组,保证新的Capacity仍为2的N次方,从而保证上述寻址方式仍适用。同时须要经过以下transfer方法将原来的全部数据所有从新插入(rehash)到新的数组中。ssh

void transfer(Entry[] newTable, boolean rehash) {
  int newCapacity = newTable.length;
  for (Entry<K,V> e : table) {
    while(null != e) {
      Entry<K,V> next = e.next;
      if (rehash) {
        e.hash = null == e.key ? 0 : hash(e.key);
      }
      int i = indexFor(e.hash, newCapacity);
      e.next = newTable[i];
      newTable[i] = e;
      e = next;
    }
  }
}

该方法并不保证线程安全,并且在多线程并发调用时,可能出现死循环。其执行过程以下。从步骤2可见,转移时链表顺序反转。高并发

  1. 遍历原数组中的元素
  2. 对链表上的每个节点遍历:用next取得要转移那个元素的下一个,将e转移到新数组的头部,使用头插法插入节点
  3. 循环2,直到链表节点所有转移
  4. 循环1,直到全部元素所有转移

单线程rehash

单线程状况下,rehash无问题。下图演示了单线程条件下的rehash过程
HashMap rehash single thread

多线程并发下的rehash

这里假设有两个线程同时执行了put操做并引起了rehash,执行了transfer方法,并假设线程一进入transfer方法并执行完next = e.next后,由于线程调度所分配时间片用完而“暂停”,此时线程二完成了transfer方法的执行。此时状态以下。

HashMap rehash multi thread step 1

接着线程1被唤醒,继续执行第一轮循环的剩余部分

e.next = newTable[1] = null
newTable[1] = e = key(5)
e = next = key(9)

结果以下图所示
HashMap rehash multi thread step 2

接着执行下一轮循环,结果状态图以下所示
HashMap rehash multi thread step 3

继续下一轮循环,结果状态图以下所示
HashMap rehash multi thread step 4

此时循环链表造成,而且key(11)没法加入到线程1的新数组。在下一次访问该链表时会出现死循环。

Fast-fail

产生缘由

在使用迭代器的过程当中若是HashMap被修改,那么ConcurrentModificationException将被抛出,也即Fast-fail策略。

当HashMap的iterator()方法被调用时,会构造并返回一个新的EntryIterator对象,并将EntryIterator的expectedModCount设置为HashMap的modCount(该变量记录了HashMap被修改的次数)。

HashIterator() {
  expectedModCount = modCount;
  if (size > 0) { // advance to first entry
  Entry[] t = table;
  while (index < t.length && (next = t[index++]) == null)
    ;
  }
}

在经过该Iterator的next方法访问下一个Entry时,它会先检查本身的expectedModCount与HashMap的modCount是否相等,若是不相等,说明HashMap被修改,直接抛出ConcurrentModificationException。该Iterator的remove方法也会作相似的检查。该异常的抛出意在提醒用户及早意识到线程安全问题。

线程安全解决方案

单线程条件下,为避免出现ConcurrentModificationException,须要保证只经过HashMap自己或者只经过Iterator去修改数据,不能在Iterator使用结束以前使用HashMap自己的方法修改数据。由于经过Iterator删除数据时,HashMap的modCount和Iterator的expectedModCount都会自增,不影响两者的相等性。若是是增长数据,只能经过HashMap自己的方法完成,此时若是要继续遍历数据,须要从新调用iterator()方法从而从新构造出一个新的Iterator,使得新Iterator的expectedModCount与更新后的HashMap的modCount相等。

多线程条件下,可以使用Collections.synchronizedMap方法构造出一个同步Map,或者直接使用线程安全的ConcurrentHashMap。

Java 7基于分段锁的ConcurrentHashMap

注:本章的代码均基于JDK 1.7.0_67

数据结构

Java 7中的ConcurrentHashMap的底层数据结构仍然是数组和链表。与HashMap不一样的是,ConcurrentHashMap最外层不是一个大的数组,而是一个Segment的数组。每一个Segment包含一个与HashMap数据结构差很少的链表数组。总体数据结构以下图所示。
JAVA 7 ConcurrentHashMap

寻址方式

在读写某个Key时,先取该Key的哈希值。并将哈希值的高N位对Segment个数取模从而获得该Key应该属于哪一个Segment,接着如同操做HashMap同样操做这个Segment。为了保证不一样的值均匀分布到不一样的Segment,须要经过以下方法计算哈希值。

private int hash(Object k) {
  int h = hashSeed;
  if ((0 != h) && (k instanceof String)) {
    return sun.misc.Hashing.stringHash32((String) k);
  }
  h ^= k.hashCode();
  h += (h <<  15) ^ 0xffffcd7d;
  h ^= (h >>> 10);
  h += (h <<   3);
  h ^= (h >>>  6);
  h += (h <<   2) + (h << 14);
  return h ^ (h >>> 16);
}

一样为了提升取模运算效率,经过以下计算,ssize即为大于concurrencyLevel的最小的2的N次方,同时segmentMask为2^N-1。这一点跟上文中计算数组长度的方法一致。对于某一个Key的哈希值,只须要向右移segmentShift位以取高sshift位,再与segmentMask取与操做便可获得它在Segment数组上的索引。

int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
  ++sshift;
  ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

同步方式

Segment继承自ReentrantLock,因此咱们能够很方便的对每个Segment上锁。

对于读操做,获取Key所在的Segment时,须要保证可见性(请参考如何保证多线程条件下的可见性)。具体实现上可使用volatile关键字,也可以使用锁。但使用锁开销太大,而使用volatile时每次写操做都会让全部CPU内缓存无效,也有必定开销。ConcurrentHashMap使用以下方法保证可见性,取得最新的Segment。

Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)

获取Segment中的HashEntry时也使用了相似方法

HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
  (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)

对于写操做,并不要求同时获取全部Segment的锁,由于那样至关于锁住了整个Map。它会先获取该Key-Value对所在的Segment的锁,获取成功后就能够像操做一个普通的HashMap同样操做该Segment,并保证该Segment的安全性。
同时因为其它Segment的锁并未被获取,所以理论上可支持concurrencyLevel(等于Segment的个数)个线程安全的并发读写。

获取锁时,并不直接使用lock来获取,由于该方法获取锁失败时会挂起(参考可重入锁)。事实上,它使用了自旋锁,若是tryLock获取锁失败,说明锁被其它线程占用,此时经过循环再次以tryLock的方式申请锁。若是在循环过程当中该Key所对应的链表头被修改,则重置retry次数。若是retry次数超过必定值,则使用lock方法申请锁。

这里使用自旋锁是由于自旋锁的效率比较高,可是它消耗CPU资源比较多,所以在自旋次数超过阈值时切换为互斥锁。

size操做

put、remove和get操做只须要关心一个Segment,而size操做须要遍历全部的Segment才能算出整个Map的大小。一个简单的方案是,先锁住全部Sgment,计算完后再解锁。但这样作,在作size操做时,不只没法对Map进行写操做,同时也没法进行读操做,不利于对Map的并行操做。

为更好支持并发操做,ConcurrentHashMap会在不上锁的前提逐个Segment计算3次size,若是某相邻两次计算获取的全部Segment的更新次数(每一个Segment都与HashMap同样经过modCount跟踪本身的修改次数,Segment每修改一次其modCount加一)相等,说明这两次计算过程当中无更新操做,则这两次计算出的总size相等,可直接做为最终结果返回。若是这三次计算过程当中Map有更新,则对全部Segment加锁从新计算Size。该计算方法代码以下

public int size() {
  final Segment<K,V>[] segments = this.segments;
  int size;
  boolean overflow; // true if size overflows 32 bits
  long sum;         // sum of modCounts
  long last = 0L;   // previous sum
  int retries = -1; // first iteration isn't retry
  try {
    for (;;) {
      if (retries++ == RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
          ensureSegment(j).lock(); // force creation
      }
      sum = 0L;
      size = 0;
      overflow = false;
      for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
          sum += seg.modCount;
          int c = seg.count;
          if (c < 0 || (size += c) < 0)
            overflow = true;
        }
      }
      if (sum == last)
        break;
      last = sum;
    }
  } finally {
    if (retries > RETRIES_BEFORE_LOCK) {
      for (int j = 0; j < segments.length; ++j)
        segmentAt(segments, j).unlock();
    }
  }
  return overflow ? Integer.MAX_VALUE : size;
}

不一样之处

ConcurrentHashMap与HashMap相比,有如下不一样点

  • ConcurrentHashMap线程安全,而HashMap非线程安全
  • HashMap容许Key和Value为null,而ConcurrentHashMap不容许
  • HashMap不容许经过Iterator遍历的同时经过HashMap修改,而ConcurrentHashMap容许该行为,而且该更新对后续的遍历可见

Java 8基于CAS的ConcurrentHashMap

注:本章的代码均基于JDK 1.8.0_111

数据结构

Java 7为实现并行访问,引入了Segment这一结构,实现了分段锁,理论上最大并发度与Segment个数相等。Java 8为进一步提升并发性,摒弃了分段锁的方案,而是直接使用一个大的数组。同时为了提升哈希碰撞下的寻址性能,Java 8在链表长度超过必定阈值(8)时将链表(寻址时间复杂度为O(N))转换为红黑树(寻址时间复杂度为O(long(N)))。其数据结构以下图所示


JAVA 8 ConcurrentHashMap

寻址方式

Java 8的ConcurrentHashMap一样是经过Key的哈希值与数组长度取模肯定该Key在数组中的索引。一样为了不不太好的Key的hashCode设计,它经过以下方法计算获得Key的最终哈希值。不一样的是,Java 8的ConcurrentHashMap做者认为引入红黑树后,即便哈希冲突比较严重,寻址效率也足够高,因此做者并未在哈希值的计算上作过多设计,只是将Key的hashCode值与其高16位做异或并保证最高位为0(从而保证最终结果为正整数)。

static final int spread(int h) {
  return (h ^ (h >>> 16)) & HASH_BITS;
}

同步方式

对于put操做,若是Key对应的数组元素为null,则经过CAS操做将其设置为当前值。若是Key对应的数组元素(也即链表表头或者树的根元素)不为null,则对该元素使用synchronized关键字申请锁,而后进行操做。若是该put操做使得当前链表长度超过必定阈值,则将该链表转换为树,从而提升寻址效率。

对于读操做,因为数组被volatile关键字修饰,所以不用担忧数组的可见性问题。同时每一个元素是一个Node实例(Java 7中每一个元素是一个HashEntry),它的Key值和hash值都由final修饰,不可变动,无须关心它们被修改后的可见性问题。而其Value及对下一个元素的引用由volatile修饰,可见性也有保障。

static class Node<K,V> implements Map.Entry<K,V> {
  final int hash;
  final K key;
  volatile V val;
  volatile Node<K,V> next;
}

对于Key对应的数组元素的可见性,由Unsafe的getObjectVolatile方法保证。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

size操做

put方法和remove方法都会经过addCount方法维护Map的size。size方法经过sumCount获取由addCount方法维护的Map的size。

Java进阶系列

相关文章
相关标签/搜索