一说到HashMap与Hashtable区别就会想到前者线程不安全,后者线程安全。可是当咱们须要线程安全的时候,Hashtable并非一个良好的选择,concurrentHashMap才是。node
获得线程安全的HashMap有如下三种方式:
①.使用Hashtable
②.使用Collections.synchronizedMap()方法
③.使用concurrentHashMap
有这3中方式,为何推荐使用第三种,而不是其他两个方法?答案:效率,咱们来看看那两种方式的源码
算法
咱们能够看到也使用synchronized关键字将HashMap包装成一个线程安全的map与Hashtable相似,每次只有一个线程能访问此对象。数组
public static Map synchronizedMap(Map m) { return new SynchronizedMap<>(m); }复制代码private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } 复制代码复制代码private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } 复制代码
/**
* 存放node的数组,大小是2的幂次方
*/
transient volatile Node[] table;
/**
* 扩容时用于存放数据的变量,平时为null
*/
private transient volatile Node[] nextTable;
/**
* 经过CAS更新,记录容器的容量大小
*/
private transient volatile long baseCount;
/**
* 控制标志符
* 负数: 表明正在进行初始化或扩容操做,其中-1表示正在初始化,-N 表示有N-1个线程正在进行扩容操做
* 正数或0: 表明hash表尚未被初始化,这个数值表示初始化或下一次进行扩容的大小,相似于扩容阈值
* 它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。
* 实际容量 >= sizeCtl,则扩容
*/
private transient volatile int sizeCtl;
/**
* 下次transfer方法的起始下标index加上1以后的值
*/
private transient volatile int transferIndex;
/**
* CAS自旋锁标志位
*/
private transient volatile int cellsBusy;
/**
* counter cell表,长度总为2的幂次
*/
private transient volatile CounterCell[] counterCells;
复制代码
static class Node implements Map.Entry {
final int hash;
final K key;
volatile V val;
volatile Node next;
...
}
复制代码
static final class TreeNode extends LinkedHashMap.Entry {
TreeNode parent; // red-black tree links
TreeNode left;
TreeNode right;
TreeNode prev; // needed to unlink next upon deletion
boolean red;
...
}
复制代码
static final class TreeBin extends Node {
TreeNode root;
volatile TreeNode first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
...
}
复制代码
static final class ForwardingNode extends Node {
final Node[] nextTable;
//ForwardingNode节点hash为-1,若操做中遇到此类型节点,代表有线程正在扩容
ForwardingNode(Node[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
...
}
复制代码
/**
* 默认构造方法
*/
public ConcurrentHashMap() {}
/**
* 指定容量
*/
public ConcurrentHashMap(int initialCapacity) {
//异常状况
if (initialCapacity < 0)
throw new IllegalArgumentException();
//计算容量
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
/**
* 指定map
*/
public ConcurrentHashMap(Map m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
/**
* 指定容量、负载因子
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
/**
* 指定容量、负载因子、并发级别
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
复制代码
concurrentHashMap调用Unsafe类中的方法实现CAS(这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,若是相等,则接受你指定的修改的值,不然拒绝你的操做),其内部方法大多为native方法即直接调用操做系统底层资源执行相应任务,提供了一些能够直接操控内存和线程的底层操做。安全
initTable方法判断sizeCtl值,若sizeCtl值为-1即有其余线程正在初始化,调用Thread.yield()让出CPU时间片,而正在初始化的线程经过Unsafe.compareAndSwapInt方法修改sizeCtl为-1,初始化数组和扩容阈值。数据结构
private final Node[] initTable() {
Node[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//若sizeCtl<0,即存在其余线程正在初始化操做,确保只有一个线程进行初始化
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//利用CAS方法把sizectl的值置为-1,代表已有线程进行初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
//得到桶容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//初始化node数组
Node[] nt = (Node[])new Node[n];
table = tab = nt;
//计算扩容阈值0.75n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
复制代码
从源码咱们能够看出最外层的流程控制采用while循环,而非if条件分支,由于Thread.yield()会让出cpu时间,目的在于确保数组初始化成功多线程
ConcurrentHashMap的put操做与HashMap很类似,但ConcurrentHashMap不容许null做为key和value,而且因为须要保证线程安全,有如下两个多线程状况:
并发
①.若是一个或多个线程正在对ConcurrentHashMap进行扩容操做,当前线程也要进入扩容的操做中。这个扩容的操做之因此能被检测到,是由于transfer方法会将已经操做过扩容桶头结点置为ForwardingNode节点,若是检测到须要插入的位置被该节点占有,就帮助进行扩容。
dom
②.若是检测到要插入的节点是非空且不是ForwardingNode节点,就对这个节点加锁,这样就保证了线程安全。
ide
final V putVal(K key, V value, boolean onlyIfAbsent) {
//与HashMap不一样,ConcurrentHashMap不容许null做为key或value
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node[] tab = table;;) {
Node f; int n, i, fh;
//若table为空的话,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//若当前数组i位置上的节点为null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//CAS插入节点(V:当前数组i位置上的节点; O:null; N:新Node对象)
if (casTabAt(tab, i, null,
new Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//当前正在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//锁住当前数组i位置上的节点
synchronized (f) {
//判断是否节点f是否为当前数组i位置上的节点,防止被其它线程修改
if (tabAt(tab, i) == f) {
//当前位置桶的结构为链表
if (fh >= 0) {
binCount = 1;
//遍历链表节点
for (Node e = f;; ++binCount) {
K ek;
//若hash值与key值相同,进行替换
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
//若链表中找不到,尾插节点
if ((e = e.next) == null) {
pred.next = new Node(hash, key,
value, null);
break;
}
}
}
//当前位置桶结构为红黑树,TreeBin哈希值固定为-2
else if (f instanceof TreeBin) {
Node p;
binCount = 2;
//遍历红黑树上节点,更新或增长节点
if ((p = ((TreeBin)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//若链表长度超过8,将链表转为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//节点数+1,若超过阈值则扩容
addCount(1L, binCount);
return null;
}
/**
* hash算法
*/
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
复制代码
大体流程:
①.首先判断key和value是否为null,为null抛异常。
与HashMap不一样,ConcurrentHashMap不容许null做为key或value,为何这样设计?
性能
由于ConcurrentHashmap是支持并发的,这样会有一个问题,当你经过get(k)获取对应的value时,若是获取到的是null时,你没法判断,它是put(k,v)的时候value为null,仍是这个key历来没有作过映射。HashMap是非并发的,能够经过contains(key)来作这个判断。而支持并发的Map在调用m.contains(key)和m.get(key),m可能已经不一样了。参考
②.从新计算hash值,由于1.8concurrentHashMap引入了红黑树来处理较多的哈希冲突,简化了hash算法,不过对比了1.8HashMap的hash算法,除了由于concurrentHashMap不容许null为key,没有把null的hash值算为0之外,其多了一步位运算,之因此这样是为了确保重哈希算出的必定不为负数,我认为是由于若算出负值,可能会影响后续的节点类型判断
③.判断当前table是否为空,空的话初始化table,初始化方法已阐述过再也不多提
④.根据重哈希算出的值经过与运算获得桶索引,利用Unsafe类直接获取内存内存中对应位置上的节点,若没有碰撞即桶中无结点CAS直接添加
⑤.若发生碰撞,判断桶中第一个节点类型。是ForwardingNode节点的话,代表有其它线程正在扩容,则一块儿进行扩容操做
⑥.剩下场景,按节点相应结构链表或红黑树的方式插入或更新新节点
⑦.若新增节点后链表长度大于8,就把这个链表转换成红黑树
⑧.最后节点数量+1,校验是否超过阈值,若超过则扩容
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//利用CAS方法更新baseCount的值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//多线程CAS发生失败的时候执行
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
///若是check值大于等于0 则须要检验是否须要进行扩容操做
if (check >= 0) {
Node[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
复制代码
咱们能够看到在更新baseCount时用了2次CAS操做,一次直接更新basecount,若更新失败则更新CELLVALUE,若还更新失败会调用fullAddCount()方法,这个方法会死循环一直将因为多线程竞争致使CAS失败的容量变化值存到CounterCell数组中,为size()方法作准备
对于ConcurrentHashMap来讲,这个table里到底装了多少东西实际上是个不肯定的数量,由于不可能在调用size()方法的时候像GC的“stop the world”同样让其余线程都停下来让你去统计,所以只能说这个数量是个估计值。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
复制代码
为了统计元素个数,ConcurrentHashMap经过累加baseCount和CounterCell数组中的数量,元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,获得元素的总个数。
支持多线程扩容,没有加锁 ,其目的不只为了知足concurrent的要求,而是但愿利用并发处理去减小扩容带来的时间影响。
如下两个场景可能会触发扩容机制(与1.8HashMap相同):
①.当桶中链表长度达到阈值8,但整个ConcurrentHashMap节点数量小于64
②.新增节点后,整个ConcurrentHashMap节点数量超过阈值
private final void transfer(Node[] tab, Node[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 建立node数组,容量为当前的两倍
Node[] nt = (Node[])new Node[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 若扩容时出现OOM异常,则将阈值设为最大,代表不支持扩容
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 建立ForwardingNode节点,做为标记位,代表当前位置桶已作过处理
ForwardingNode fwd = new ForwardingNode(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//经过CAS设置transferIndex属性值,并初始化i和bound值
//i指当前处理的槽位序号,bound指须要处理的槽位边界
//先处理最后一个桶的节点;
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 将原数组中节点复制到新数组中去
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//若是全部的节点都已经完成复制工做 就把nextTable赋值给table 清空临时对象nextTable
if (finishing) {
nextTable = null;
table = nextTab;
//设置新扩容阈值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//利用CAS方法更新扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操做
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//锁住i位置上桶的节点
synchronized (f) {
//确保f是i位置上桶的节点
if (tabAt(tab, i) == f) {
Node ln, hn;
//当前桶是链式结构
if (fh >= 0) {
//构造两个链表
int runBit = fh & n;
Node lastRun = f;
//相似于1.8HashMap,只须要看新增的1bit是0仍是1进行分类
for (Node p = f.next; p != null; p = p.next) {
//n是就数组长度,不是长度-1
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node(ph, pk, pv, ln);
else
hn = new Node(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另外一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就能够执行i--操做
advance = true;
}
//当前桶是红黑树结构,操做和上面的相似
else if (f instanceof TreeBin) {
TreeBin t = (TreeBin)f;
TreeNode lo = null, loTail = null;
TreeNode hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode p = new TreeNode
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//若是扩容后已经再也不须要tree的结构 反向转换为链表结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
复制代码
如图所示(蓝色:hash值第X位为0,灰色:hash值第X位为1)
看过JDK8 HashMap扩容方法应该会感受有点像,ConcurrentHashMap对链表结构的处理相似HashMap的resize()方法最后部分,对红黑树结构处理相似HashMap里TreeNode内部类的split()方法,TreeBin的构造方法相似于TreeNode内部类的treeify()方法,可是因为ConcurrentHashMap须要保证线程安全其操做仍是复杂得多
问题:为何要反序构建链表?
参考了下1.7HashMap头插节点思想,由于后插入的数据被使用的频次更高,没法随机访问只能从头开始遍历查询,而ConcurrentHashMap利用内置锁synchronized锁住桶头节点,由于后插入的数据被使用的频次更高,倒序使锁持有时间更短,等待时间也就短了(仅本人见解)
给定一个key来肯定value的时候,必须知足两个条件 key相同 hash值相同,对于节点可能在链表或树上的状况,须要分别去查找。
public V get(Object key) {
Node[] tab; Node e, p; int n, eh; K ek;
//计算hash值
int h = spread(key.hashCode());
////根据hash值肯定节点位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//桶首节点的key与查找的key相同,则直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//树节点场景
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//链表场景
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
复制代码
不考虑细节处理,JDK 7中ConcurrentHashMap最主要采用segment,多线程竞争会先锁住segment,在其put操做中会先定位segment位置,再定位segment中具体桶位置,而JDK 8直接操做Node数组中的桶,锁住桶首节点,锁的粒度变小了。另外segment继承ReentrantLock,而ReentrantLock相对于synchronized关键字多了等待可中断、公平性、绑定多个条件,但针对于性能而言,因为synchronized内置锁不断在优化,其性能不会差。