Java 8 ConcurrentHashMap Source Code Analysis

Improvements in Java 8 over Java 7:

Improvement 1: the segments field is gone. Data is stored directly in transient volatile Node<K,V>[] table, and each table slot's head node serves as the lock, so locking happens per bin rather than per segment, further reducing the chance of contention.

/**
 * The array of bins. Lazily initialized upon first insertion.
 * Size is always a power of two. Accessed directly by iterators.
 */
transient volatile Node<K,V>[] table;

 

Improvement 2: the original structure of table array + singly linked list becomes table array + singly linked list + red-black tree. For a hash table, the core requirement is that hashed keys spread evenly across the array. With a well-behaved hash, most bins hold 0 or 1 entries. In practice things are not always that ideal: although ConcurrentHashMap's default load factor is 0.75, with very large data sets or plain bad luck some bins grow long, and with a plain linked list a lookup in such a bin costs O(n). Therefore, when a bin's length exceeds the default threshold of 8, JDK 1.8 converts the list to a red-black tree, bringing lookup down to O(log n) and improving performance.
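As an aside, the two ingredients of that even spread are easy to sketch in isolation: the re-hash used later by put (spread, i.e. (h ^ (h >>> 16)) & HASH_BITS) and the power-of-two index mask. The class and method names below are mine, not the JDK's; only the two formulas come from the source.

```java
// Sketch (not the JDK source verbatim): how ConcurrentHashMap re-hashes a key
// and maps it to a bin index. HASH_BITS is 0x7fffffff in the JDK, keeping the
// result non-negative so negative hashes stay reserved for special nodes
// (MOVED, TREEBIN).
public class SpreadDemo {
    static final int HASH_BITS = 0x7fffffff;

    // Same formula as the JDK's spread(): fold the high 16 bits into the low
    // 16 bits so small tables still see the high-order entropy.
    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    // Bin index for a table of length n (always a power of two).
    static int index(int hash, int n) {
        return (n - 1) & hash;
    }

    public static void main(String[] args) {
        int h = spread("example-key".hashCode());
        System.out.println("spread = " + h + ", index in 16-bin table = " + index(h, 16));
    }
}
```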

Key fields (the static final long constants below are the Unsafe field offsets of the corresponding volatile fields; the comments describe the fields themselves):

/**
 * sizeCtl is volatile, so it is visible across threads. A thought: every update
 * to it goes through CAS, so why was it not designed as a LongAdder (new in
 * JDK 8), or as an AtomicLong (less efficient than LongAdder under high
 * contention)? That would avoid the hand-rolled CAS.
 *
 * Defaults to 0. Controls table initialization and resizing; concrete uses
 * appear later.
 * -1 means the table is being initialized.
 * -N means N-1 threads are currently resizing.
 * Otherwise:
 * 1. If the table is uninitialized, it holds the size the table should be
 *    initialized to.
 * 2. After initialization, it holds the resize threshold, 0.75 times the table
 *    size, computed, interestingly, without floating point as (n - (n >>> 2)).
 **/
private static final long SIZECTL;

private static final long TRANSFERINDEX;

/**
 * Base counter value, used mainly when there is no contention, but also as a
 * fallback during table initialization races. Updated via CAS.
 * (Records the container's size; updated via CAS.)
 */
private static final long BASECOUNT;

/**
 * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
 * Used when updating a CounterCell's value; it acts like an explicit or
 * intrinsic lock but performs better. The same pattern appears in Striped64.
 */
private static final long CELLSBUSY;

private static final long CELLVALUE;

private static final long ABASE;

private static final int ASHIFT;
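The (n - (n >>> 2)) trick mentioned in the sizeCtl comment is easy to verify on its own. A throwaway sketch (names are mine):

```java
// For a power-of-two n, n - (n >>> 2) equals 0.75 * n with no floating-point
// math: n >>> 2 is n/4, so the result is 3n/4.
public class SizeCtlDemo {
    static int threshold(int n) {
        return n - (n >>> 2); // n - n/4 == 3n/4
    }

    public static void main(String[] args) {
        for (int n = 16; n <= 128; n <<= 1) {
            System.out.println(n + " -> " + threshold(n));
        }
    }
}
```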
/**
 * Node: the data structure holding the key, the value, and the key's hash.
 * Both val and next are volatile, guaranteeing visibility under concurrency.
 *
 * @param <K>
 * @param <V>
 */
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
}
/**
 * ForwardingNode: a special Node whose hash is -1 and which stores a reference
 * to nextTable.
 *
 * @param <K>
 * @param <V>
 */
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
…
}

Constructors:

/**
 * initialCapacity: the initial capacity
 **/
public ConcurrentHashMap(int initialCapacity)
/**
 * Creates a new map with the same mappings as the given map.
 **/
public ConcurrentHashMap(Map<? extends K, ? extends V> m)
/**
 * initialCapacity: the initial capacity
 * loadFactor: the load factor; a resize is triggered once the size reaches
 * initialCapacity * loadFactor
 **/
public ConcurrentHashMap(int initialCapacity, float loadFactor)
/**
 * initialCapacity: the initial capacity
 * loadFactor: the load factor
 * concurrencyLevel: the estimated number of concurrently updating threads
 **/
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)
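From the caller's side the four constructors look like this (a quick sketch; note that initialCapacity is only a sizing hint: the table is still created lazily on first insertion, and its length is rounded up to a power of two):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CtorDemo {
    public static void main(String[] args) {
        // 1. initial capacity only
        ConcurrentHashMap<String, Integer> byCapacity = new ConcurrentHashMap<>(32);
        // 2. copy the mappings of an existing map
        Map<String, Integer> src = new HashMap<>();
        src.put("x", 1);
        src.put("y", 2);
        ConcurrentHashMap<String, Integer> byMap = new ConcurrentHashMap<>(src);
        // 3. capacity + load factor; 4. plus estimated concurrency level
        ConcurrentHashMap<String, Integer> byLoadFactor = new ConcurrentHashMap<>(32, 0.75f);
        ConcurrentHashMap<String, Integer> byLevel = new ConcurrentHashMap<>(32, 0.75f, 8);

        System.out.println(byMap.size());                     // 2
        System.out.println(byCapacity.isEmpty()
                && byLoadFactor.isEmpty() && byLevel.isEmpty()); // true
    }
}
```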

 

Methods:

put:

public V put(K key, V value) {
    return putVal(key, value, false);
}


final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode()); // re-hash the hashCode: (h ^ (h >>> 16)) & HASH_BITS
    int binCount = 0;
    // An outer retry loop: table initialization and casTabAt rely on
    // compareAndSwapInt/compareAndSwapObject, which fail whenever another
    // thread is concurrently modifying the table, so we keep retrying.
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // If the table is empty, initialize it. Otherwise compute index i from
        // the hash; if tab[i] is null, just CAS in a new Node. Note: tab[i] is
        // the head node of a linked list or red-black tree.
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // If tab[i] is non-null and its hash is MOVED (-1), this bin is being
        // transferred; help with the resize and continue on the new table.
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // Lock only the head node of the bin, not a whole segment,
            // further reducing thread contention.
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // If a node e with this key is found in the list,
                            // just set e.val = value.
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // If no node with this key exists, append a new
                            // Node to the list.
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // If the head node is a TreeBin, the bin is a red-black
                    // tree; use putTreeVal.
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // If the bin now holds >= 8 nodes, convert the list to a
                // red-black tree.
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // Increment the count by 1; this may trigger a transfer (resize).
    addCount(1L, binCount);
    return null;
}
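The observable contract of putVal from the caller's side: null keys and values are rejected up front, put returns the previous value, and putIfAbsent is the onlyIfAbsent=true path. A quick demonstration:

```java
import java.util.concurrent.ConcurrentHashMap;

public class PutDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        System.out.println(map.put("a", 1));          // null: no previous mapping
        System.out.println(map.put("a", 2));          // 1: old value returned
        System.out.println(map.putIfAbsent("a", 3));  // 2: existing value kept
        try {
            map.put(null, 1);                         // first line of putVal
        } catch (NullPointerException e) {
            System.out.println("null key rejected");
        }
    }
}
```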

helpTransfer:

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            // The conditions below mirror those in addCount; see its comments.
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

tabAt:

@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}





/*
 * Why is the offset ((long)i << ASHIFT) + ABASE?
 * ABASE is the offset of the first array element from the start of the array
 * object, and ASHIFT is log2 of the element size, so (i << ASHIFT) is element
 * i's offset relative to the first element; their sum is tab[i]'s address offset.
 * compareAndSwapObject then compares tab[i] with c: if they are equal it sets
 * tab[i] = v and returns true; otherwise it leaves tab[i] unchanged and
 * returns false.
 */
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}



static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

addCount:

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) tries to
    // add x (here 1) to baseCount on every call.
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { // 1
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // Executed when the CAS fails under multi-threaded contention.
            fullAddCount(x, uncontended); // 2
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // Start resizing once the threshold condition is met.
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) { // sc < 0 means another thread is already resizing
                // These conditions mean the resize is finishing or cannot be
                // joined; other threads break instead of entering the transfer.
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // join the resize as one more worker
                    transfer(tab, nt);
            }
            // Here sizeCtl becomes (rs << RESIZE_STAMP_SHIFT) + 2, a large
            // negative number. The "+ 2" is subtle: each worker leaving the
            // transfer decrements sizeCtl, and the extra 2 lets the last
            // thread detect completion (see the check in transfer).
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

At comment 1 above, every call tries to add 1 to baseCount. Under heavy contention, U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) may fail repeatedly, and endlessly retrying a single CAS on one hot variable would hurt performance badly. For exactly this problem JDK 8 introduced the Striped64 class, with LongAdder and DoubleAdder as its implementations. Both were built for high-concurrency scenarios as an enhanced AtomicLong: under high contention AtomicLong performs worse than LongAdder, while LongAdder trades somewhat higher space overhead for that throughput.
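The same striped-counter idea the map implements by hand with baseCount plus CounterCells is available directly as java.util.concurrent.atomic.LongAdder; a minimal demonstration:

```java
import java.util.concurrent.atomic.LongAdder;

public class LongAdderDemo {
    public static void main(String[] args) throws InterruptedException {
        LongAdder adder = new LongAdder();
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 100_000; i++) {
                    adder.increment(); // contended increments spread across per-thread cells
                }
            });
            threads[t].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(adder.sum()); // 400000: sum() folds the cells together
    }
}
```

sum(), like ConcurrentHashMap's own sumCount(), folds the base value and all cells into one total; it is a snapshot, not an atomic read.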

 

Each call increments the count by 1; once the count reaches the threshold, the table must be resized. The resize method is transfer:

/**
 * Moves and/or copies the nodes in each bin to new table. See
 * above for explanation.
 */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // A forwarding node pointing at the new table, used as an "already moved" marker.
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    // The loop's key variables: once the whole resize is finished we return and exit.
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // i is the key index; the --i guarantees the old array is traversed in reverse.
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) { // transferIndex starts at n = tab.length (default 16)
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // i < 0 means the old array tab has been fully traversed. When can
        // i >= n hold? Below, i is reset to n for the final recheck, so n
        // appears to be i's maximum. i + n >= nextn (with nextn =
        // nextTab.length) likewise signals the resize is complete.
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) { // a
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1); // 1.5n, i.e. 0.75 * 2n: the new threshold
                return;
            }
            // CAS sizeCtl down by one: this thread is done with its share of
            // the resize (see the sizeCtl comments).
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // With several resizing threads, only the last one to leave
                // sees (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT; the
                // others return here, so exactly one thread executes block a above.
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true; // finishing and advance let the loop commit and exit
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null) // tab[i] is null: install fwd there to mark the slot as processed
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED) // f.hash == -1: f is a ForwardingNode, so this slot was already processed
            advance = true; // already processed
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        // Walk the list and split it in two, somewhat like
                        // cutting it in half: nodes with hash & n == 0 versus
                        // the rest, placed at slots i and i + n of the new
                        // table. This is the same idea as HashMap's resize,
                        // though the algorithm differs in detail.
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // Replace the old tab[i] with fwd, which carries nextTab.
                        setTabAt(tab, i, fwd);
                        advance = true;
                    } // the red-black tree case below works much like the list case
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // Decide whether each half still needs a tree after the resize.
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
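The split rule transfer uses can be checked numerically: when a table of length n doubles, an entry in bin i either stays at i (if hash & n == 0) or moves to i + n, because the new index (2n - 1) & hash differs from the old (n - 1) & hash only in that extra bit. A small sketch (names mine):

```java
public class BinSplitDemo {
    public static void main(String[] args) {
        int n = 16;
        // 3, 19, 35, 51 all land in bin 3 when n == 16
        for (int hash : new int[]{3, 19, 35, 51}) {
            int oldIdx = (n - 1) & hash;
            int newIdx = (2 * n - 1) & hash;
            System.out.println(hash + ": bin " + oldIdx + " -> bin " + newIdx
                    + ((hash & n) == 0 ? " (stays at i)" : " (moves to i + n)"));
        }
    }
}
```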

Note: when a bin's list grows past the TREEIFY_THRESHOLD (8 by default), the list is converted to a red-black tree to speed up traversal and lookup. Let's look at how the tree structure is built:

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

As you can see, the tree-building block is synchronized; after entering it, the code verifies once more that the element at table[index] has not been modified.

1. From the Node list at table[index], build a new TreeNode list headed by hd.

2. From the hd head node, build the TreeBin tree structure and write its root into table[index], implemented as follows:

/**
 * Creates bin with initial set of nodes headed by b.
 */
TreeBin(TreeNode<K,V> b) {
    super(TREEBIN, null, null, null);
    this.first = b;
    TreeNode<K,V> r = null;
    for (TreeNode<K,V> x = b, next; x != null; x = next) {
        next = (TreeNode<K,V>)x.next;
        x.left = x.right = null;
        if (r == null) {
            x.parent = null;
            x.red = false;
            r = x;
        }
        else {
            K k = x.key;
            int h = x.hash;
            Class<?> kc = null;
            for (TreeNode<K,V> p = r;;) {
                int dir, ph;
                K pk = p.key;
                if ((ph = p.hash) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                else if ((kc == null &&
                          (kc = comparableClassFor(k)) == null) ||
                         (dir = compareComparables(kc, k, pk)) == 0)
                    dir = tieBreakOrder(k, pk);
                TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    x.parent = xp;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    r = balanceInsertion(r, x);
                    break;
                }
            }
        }
    }
    this.root = r;
    assert checkInvariants(root);
}

get:

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // eh < 0 means e is a special node. In particular, a ForwardingNode
        // signals that this bin has already been moved by a resizing thread,
        // so its find follows the reference into the new table to look up the key.
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

Note that get takes no lock and performs no CAS: even while a writer holds the lock on tab[i]'s head node, a reader simply does a volatile read of the slot and walks the bin. This is what keeps get both correct and cheap under high concurrency.

Let's consider the situations in which a concurrent get might fail to find a value: 1. one thread calls get while another thread is removing the node for the same key; 2. one thread calls get while another thread is rearranging the table, so the old table may no longer hold the value.

In essence: while one thread reads, other threads may be modifying the same bin's list or tree. So how does get stay consistent? Notice the expression (e = tabAt(tab, (n - 1) & h)) != null, and look again at what tabAt actually does:

 

@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}


It reads tab[i] with volatile semantics. We know that writes to a bin, as in putVal, are done under the bin's lock, so ordinarily reads of the bin would need the lock too. Why is no lock needed here? Because Unsafe's getObjectVolatile gives the element read volatile semantics (declaring the table field volatile by itself does not make reads of its elements volatile). With correct synchronization, the happens-before rule says a write to a volatile location happens-before every subsequent read of that same location, so whatever modifications other threads have published to the bin's list or tree are visible to get.

 

The sun.misc.Unsafe class:

What is Unsafe? Java cannot access the operating system directly; it goes through native methods, and Unsafe exposes hardware-level atomic operations. It is used in many classes of the JDK sources, offering low-level capabilities that bypass the JVM and can improve efficiency. But it is a double-edged sword: as its name suggests, it is unsafe, and memory it allocates must be freed manually (it is not reclaimed by GC). Unsafe provides a simple substitute for certain JNI functionality: it keeps things efficient while making them easier.

// Volatile-read the reference stored at offset l within object o.
public native java.lang.Object getObjectVolatile(java.lang.Object o, long l);

// Atomically update a Java reference variable (CAS).
public final native boolean compareAndSwapObject(java.lang.Object o, long l, java.lang.Object o1, java.lang.Object o2);

/**
 * Stores a reference value into a given Java variable, with volatile store
 * semantics. Otherwise identical to
 * {@link #putObject(Object, long, Object)}
 */
public native void putObjectVolatile(java.lang.Object o, long l, java.lang.Object o1);

/**
 * Atomically update Java variable to <tt>x</tt> if it is currently holding
 * <tt>expected</tt>.
 *
 * @return <tt>true</tt> if successful
 */
public final native boolean compareAndSwapLong(java.lang.Object o, long l, long l1, long l2);
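Unsafe is JDK-internal; since Java 9, the supported way to get the same per-element volatile access that getObjectVolatile, putObjectVolatile, and compareAndSwapObject provide is a VarHandle over array elements. This sketch mirrors tabAt's volatile read and casTabAt's CAS on a plain array (an analogy, not the map's code):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class VarHandleAnalog {
    // A VarHandle addressing the elements of a String[] array.
    static final VarHandle TAB = MethodHandles.arrayElementVarHandle(String[].class);

    public static void main(String[] args) {
        String[] tab = new String[8];
        // CAS on slot 2, like casTabAt: succeeds only if the slot is still null
        System.out.println(TAB.compareAndSet(tab, 2, null, "node")); // true
        System.out.println((String) TAB.getVolatile(tab, 2));        // node (volatile read, like tabAt)
        TAB.setVolatile(tab, 2, "updated");                          // volatile write, like setTabAt
        System.out.println((String) TAB.getVolatile(tab, 2));        // updated
    }
}
```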