理解ConcurrentHashMap1.8源码

ConcurrentHashMap源码分析

其实ConcurrentHashMap我本身已经看过不少遍了,可是今天在面试阿里的时候本身在描述ConcurrentHashMap发现本身根本讲不清楚什么是ConcurrentHashMap,以及里面是怎么实现的,搞的我忽然发现本身什么都不懂,因此我想要再次的来分析一下这个源码,彻底理解ConcurrentHashMap,而不是觉得本身懂了,实际上本身不懂。html

首先咱们看一下put方法,put方法会调用到putVal方法上面。java

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
	  //若是put进去的是个链表,这个参数表示链表的大小
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
			  //初始化链表
            tab = initTable();
			//若是这个槽位没有数据
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {		
				//使用CAS将这个新的node设置到hash桶里面去
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
			//帮助迁移
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
				//获取锁
            V oldVal = null;
            synchronized (f) {
					//双重检查锁
                if (tabAt(tab, i) == f) {
						//若是hash值大于等于0,那么表明这个节点里的数据是链表
                    if (fh >= 0) {
                        binCount = 1;
							//每次遍历完后binCount加1,表示链表长度
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
								//若是hash值和key值都相同,那么覆盖,break结束循环
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
								//下一个节点为null,说明遍历到尾节点了,那么直接在尾节点设值一个新的值
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
						 //若是是红黑树
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
					  //若是链表个数大于8,那么就调用这个方法
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

解释一下上面的源码作了什么:node

  1. 首先作一下判断,不容许key和value中任意一个为空,不然抛出异常
  2. 计算key的hash值,而后遍历table数组
  3. 若是table数组为null或为空,那么就调用initTable作初始化
  4. 为了保证可见性,会使用tab去table数组里获取数据,若是没有数据,那么用casTabAt经过CAS将新Node设置到table数组里。(注:这里也体现了和hashmap不同的地方,hashmap直接经过数据拿就行了, 这个获取数据和设值都要保证可见性和线程安全性)
  5. 若是当前槽位所对应的hash值是MOVED,说明当前的table正在扩容迁移节点,那么就调用helpTransfer帮助迁移
  6. 走到这里,说明这个槽位里面的元素不止一个,有不少个,因此给头节点加上锁
  7. 若是当前的hash所对应的的槽位不是空的,而且hash值大于等于0,那么就说明这个槽位里面的对象是一个链表,那么就遍历链表
    1. 若是所遍历的链表里面有元素的hash值而且key和当前要插入的数据的是同样的,那么就覆盖原来的值
    2. 若是遍历到最后的节点都没有元素和要插入的值key是同样的,那么就新建一个Node节点,插入到链表的最后
    3. 每遍历一个节点就把binCount+1
  8. 若是当前的节点是TreeBin,那么说明该槽位里面的数据是红黑树,那么调用相应方法插入数据
  9. 最后若是binCount已经大于或等于8了,那么就调用treeifyBin

接下来咱们先看initTable 方法,再看treeifyBin和helpTransfer面试

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
			//一开始的时候sizeCtl为0
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
			//将sizeCtl用CAS设置成-1
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
						//由于sc一开始为0,因此n取DEFAULT_CAPACITY为16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
						//将table赋值为大小为16的Node数组
                    table = tab = nt;
						//将sc的设置为总容量的75%,若是 n 为 16 的话,那么这里 sc = 12
                    sc = n - (n >>> 2);
                }
            } finally {
					//最后将sizeCtl设置为sc的值
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

这个方法里面初始化了一个很重要的变量sizeCtl,初始值为总容量的75%,table初始化为一个容量为16的数组数组

下面咱们在看看treeifyBin方法安全

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
			//若是数据的长度小于64,那么调用tryPresize进行扩容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
			//若是这个槽位里面的元素是链表
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {			
				//给链表头加上锁
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
					 //遍历链表,而后初始化红黑树对象
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
						//给tab槽位为index的元素设置新的对象
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

treeifyBin这个方法里面并非只是将链表转化为红黑树,而是当tab的长度大于64的时候才会将链表转成红黑树,不然的话,会调用tryPresize方法。多线程

而后咱们进入到tryPresize方法里面看看,tryPresize传入的参数是当前tab数组长度的两倍。ide

private final void tryPresize(int size) {
		//本来传进来的size已是两倍了,这里会再往上取最近的 2 的 n 次方
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
			// 这个 if 分支和以前说的初始化数组的代码基本上是同样的,在这里,咱们能够不用管这块代码
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            int rs = resizeStamp(n);
				//一开始进来的时候sc是大于0的
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
				//将SIZECTL设置为一个很大的复数
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}

这个方法里面,会对tab数据进行校验,若是没有初始化的话会从新进行初始化大小,若是是第一次进来的话会将SIZECTL设置成一个很大的复数,而后调用transfer方法,传如当前的tab数据和null。源码分析

接着咱们来看transfer方法,这个方法比较长,主要的扩容和转移节点都在这个方法里面实现,咱们将这个长方法分红代码块,一步步分析:this

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
		//若是当前tab数组长度为16
    int n = tab.length, stride;
		//那么(n >>> 3) / NCPU  = 0 小于MIN_TRANSFER_STRIDE
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
			//将stride设置为 16 
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
			//若是n是16,那么nextTab就是一个容量为32的空数组
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
			//将transferIndex赋值为16
        transferIndex = n;
    }
		...
}

这个代码块主要是作nextTable、transferIndex 、stride的赋值操做。

...
//初始化nextn为32
int nextn = nextTab.length;
//新建一个ForwardingNode对象,里面放入长度为32的nextTab数组
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false;
//初始化bound为0
for (int i = 0, bound = 0;;) {
	...
}

下面的代码会所有包裹在这个for循环里面,因此咱们来分析一下这个for循环里面的代码

for (int i = 0, bound = 0;;) {
		
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
				//将nextIndex设置为transferIndex,一开始16
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
				//一开始的时候nextIndex是和stride相同,那么nextBound为0,TRANSFERINDEX也为0
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
					//这里bound也直接为0
                bound = nextBound;
					//i = 15
                i = nextIndex - 1;
                advance = false;
            }
        }
		...
}

这个方法是为了设置transferIndex这个属性,transferIndex一开始是原tab数组的长度,每次会向前移动stride大小的值,若是transferIndex减到了0或小于0,那么就设置I等于-1,i在下面的代码会说到。

for (int i = 0, bound = 0;;) {
		...
		//在上面一段代码块中,若是transferIndex已经小于等于0了,就会把i设置为-1
		if (i < 0 || i >= n || i + n >= nextn) {
		    int sc;
				//表示迁移已经完成
		    if (finishing) {
					//将nextTable置空,表示不须要迁移了
		        nextTable = null;
					//将table设置为新的数组
		        table = nextTab;
					//sizeCtl设置为n的 1.5倍
		        sizeCtl = (n << 1) - (n >>> 1);
		        return;
		    }
		    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
		        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
		            return;
		        // 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
		         // 也就是说,全部的迁移任务都作完了,也就会进入到上面的 if(finishing){} 分支了
		        finishing = advance = true;
		        i = n; // recheck before commit
		    }
		}
...
}

这个方法是用来表示已经迁移完毕了,能够退出。

for (int i = 0, bound = 0;;) {
	...
	//若是该槽位没有元素,那么直接把tab的i槽位设置为fwd
	else if ((f = tabAt(tab, i)) == null)
	    advance = casTabAt(tab, i, null, fwd);
	//说明这个槽位已经有其余线程迁移过了
	else if ((fh = f.hash) == MOVED)
	    advance = true; // already processed
	//走到这里,说明tab的这个槽位里面有数据,那么咱们须要得到槽位的头节点的监视器锁
	else {
	    synchronized (f) {	
			if (tabAt(tab, i) == f) {
				...
			} 
		  }
	}
	...
}

在这个代码块中,i会从最后一个元素一个个往前移动,而后根据i这个index来判断tab里面槽位的状况。

下面的代码咱们来分析监视器锁里面的内容:

synchronized (f) {
	if (tabAt(tab, i) == f) {
		//fh是当前节点的hash值
		if (fh >= 0) {
		    int runBit = fh & n;
			//lastRun设置为头节点
		    Node<K,V> lastRun = f;
        // 须要将链表一分为二,
        //   找到原链表中的 lastRun,而后 lastRun 及其以后的节点是一块儿进行迁移的
        //   lastRun 以前的节点须要进行克隆,而后分到两个链表中
		    for (Node<K,V> p = f.next; p != null; p = p.next) {
		        int b = p.hash & n;
		        if (b != runBit) {
		            runBit = b;
		            lastRun = p;
		        }
		    }
		    if (runBit == 0) {
		        ln = lastRun;
		        hn = null;
		    }
		    else {
		        hn = lastRun;
		        ln = null;
		    }
		    for (Node<K,V> p = f; p != lastRun; p = p.next) {
		        int ph = p.hash; K pk = p.key; V pv = p.val;
		        if ((ph & n) == 0)
		            ln = new Node<K,V>(ph, pk, pv, ln);
		        else
		            hn = new Node<K,V>(ph, pk, pv, hn);
		    }
			//其中的一个链表放在新数组的位置 i
		    setTabAt(nextTab, i, ln);
			//另外一个链表放在新数组的位置 i+n
		    setTabAt(nextTab, i + n, hn);
			//将原数组该位置处设置为 fwd,表明该位置已经处理完毕
			//其余线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
		    setTabAt(tab, i, fwd);
			//advance 设置为 true,表明该位置已经迁移完毕
		    advance = true;
		}
		//下面红黑树的迁移和上面差很少
		else if (f instanceof TreeBin) {
			....
		}
	} 
}

这个方法主要是将头节点里面的链表拆分红两个链表,而后设置到新的数组中去,再给老的数组设置为fwd,表示这个节点已经迁移过了。

到这里transfer方法已经分析完毕了。 这里我再举个例子,让你们根据透彻的明白多线程之间是怎么进行迁移工做的。

咱们假设stride仍是默认的16,第一次进来nextTab为null,可是tab的长度为32。

一开始的赋值:
1. n会设置成32,而且n只会赋值一次,表明被迁移的数组长度 
2. nextTab会被设置成一个大小为64的数组,并塞入到新的ForwardingNode对象中去。
3. transferIndex会被赋值为32

进入循环:
	初始化i为0,bound为0;
	第一次循环:
		1. 因为advance初始化为true,因此会进入到while循环中,循环出来后,transferIndex会被设置成16,bound被设置成16,i设置成31。这里你可能会问
		2. 将原来tab[i]的元素迁移到新的数组中去,并将tab[i]设置为fwd,将advance设置成为true

	第二次循环:
		1. --i,变为30,--i >= bound成立,并将advance设置成false
		2. 将原来tab[i]的元素迁移到新的数组中去,并将tab[i]设置为fwd,将advance设置成为true
	。。。
	第十六次循环:
		1. --i,变为15,将transferIndex设置为0,bound也设置为0,i设置为15
		2. 将原来tab[i]的元素迁移到新的数组中去,并将tab[i]设置为fwd,将advance设置成为true
	第三十二次循环:
		1. 这个时候--i等于-1,而且(nextIndex = transferIndex) <= 0成立,那么会将i设置为-1,advance设置为false
		2. 会把SIZECTL用CAS设置为原来的值加1,而后设置finishing为true

	第三十三次循环:
		1. 因为finishing为true,那么nextTable设置为null,table设置为新的数组值,sizeCtl设置为旧tab的长度的1.5倍

原文出处:https://www.cnblogs.com/luozhiyun/p/11406557.html