在介绍并发容器以前,先分析下普通的容器,以及相应的实现,方便后续的对比。html
Hashtable、HashMap、TreeMap 都是最多见的一些 Map 实现,是以键值对的形式存储和操做数据的容器类型。java
Hashtable 是早期 Java 类库提供的一个哈希表实现,自己是同步的,不支持 null 键和值,因为同步致使的性能开销,因此已经不多被推荐使用。node
HashMap 是应用更加普遍的哈希表实现,行为上大体上与 HashTable 一致,主要区别在于 HashMap 不是同步的,支持 null 键和值等。一般状况下,HashMap 进行 put 或者 get 操做,能够达到常数时间的性能,因此它是绝大部分利用键值对存取场景的首选,好比,实现一个用户 ID 和用户信息对应的运行时存储结构。git
HashMap 明确声明不是线程安全的数据结构,若是忽略这一点,简单用在多线程场景里,不免会出现问题,如 HashMap 在并发环境可能出现无限循环占用 CPU、size 不许确等诡异的问题。github
TreeMap 则是基于红黑树的一种提供顺序访问的 Map,和 HashMap 不一样,它的 get、put、remove 之类操做都是 O(log(n))的时间复杂度,具体顺序能够由指定的 Comparator 来决定,或者根据键的天然顺序来判断。面试
Hashtable是经过"拉链法"实现的哈希表,结构以下图所示:算法
1. 定义后端
public class Hashtable<K,V> extends Dictionary<K,V> implements Map<K,V>, Cloneable, java.io.Serializable{}
Hashtable 继承于 Dictionary 类,实现了 Map, Cloneable, java.io.Serializable接口。数组
2. 构造方法缓存
Hashtable 一共提供了 4 个构造方法:
public Hashtable(int initialCapacity, float loadFactor): 用指定初始容量和指定负载因子构造一个新的空哈希表。 public Hashtable(int initialCapacity):用指定初始容量和默认的负载因子 (0.75) 构造一个新的空哈希表。 public Hashtable():默认构造函数,容量为 11,负载因子为 0.75。 - public Hashtable(Map<? extends K, ? extends V> t):构造一个与给定的 Map 具备相同映射关系的新哈希表。
它包括几个重要的成员变量:table, count, threshold, loadFactor, modCount。
fail-fast机制举例:有两个线程(线程A,线程B),其中线程A负责遍历list、线程B修改list。线程A在遍历list过程的某个时候(此时expectedModCount = modCount=N),线程启动,同时线程B增长一个元素,这是modCount的值发生改变(modCount + 1 = N + 1)。线程A继续遍历执行next方法时,通告checkForComodification方法发现expectedModCount = N ,而modCount = N + 1,二者不等,这时就抛出ConcurrentModificationException 异常,从而产生fail-fast机制。
3. PUT操做
put 方法的整个流程为:
public synchronized V put(K key, V value) { // Make sure the value is not null确保value不为null if (value == null) { throw new NullPointerException(); } // Makes sure the key is not already in the hashtable. //确保key不在hashtable中 //首先,经过hash方法计算key的哈希值,并计算得出index值,肯定其在table[]中的位置 //其次,迭代index索引位置的链表,若是该位置处的链表存在相同的key,则替换value,返回旧的value Entry tab[] = table; int hash = hash(key); int index = (hash & 0x7FFFFFFF) % tab.length; for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) { if ((e.hash == hash) && e.key.equals(key)) { V old = e.value; e.value = value; return old; } } modCount++; if (count >= threshold) { // Rehash the table if the threshold is exceeded //若是超过阀值,就进行rehash操做 rehash(); tab = table; hash = hash(key); index = (hash & 0x7FFFFFFF) % tab.length; } // Creates the new entry. //将值插入,返回的为null Entry<K,V> e = tab[index]; // 建立新的Entry节点,并将新的Entry插入Hashtable的index位置,并设置e为新的Entry的下一个元素 tab[index] = new Entry<>(hash, key, value, e); count++; return null; }
4. Get操做
首先经过 hash()方法求得 key 的哈希值,而后根据 hash 值获得 index 索引。而后迭代链表,返回匹配的 key 的对应的 value;找不到则返回 null。
public synchronized V get(Object key) { Entry tab[] = table; int hash = hash(key); int index = (hash & 0x7FFFFFFF) % tab.length; for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) { if ((e.hash == hash) && e.key.equals(key)) { return e.value; } } return null; }
5. rehash扩容
protected void rehash() { int oldCapacity = table.length; Entry<?,?>[] oldMap = table; // overflow-conscious code int newCapacity = (oldCapacity << 1) + 1; if (newCapacity - MAX_ARRAY_SIZE > 0) { if (oldCapacity == MAX_ARRAY_SIZE) // Keep running with MAX_ARRAY_SIZE buckets return; newCapacity = MAX_ARRAY_SIZE; } Entry<?,?>[] newMap = new Entry<?,?>[newCapacity]; modCount++; threshold = (int)Math.min(newCapacity * loadFactor, MAX_ARRAY_SIZE + 1); table = newMap; for (int i = oldCapacity ; i-- > 0 ;) { for (Entry<K,V> old = (Entry<K,V>)oldMap[i] ; old != null ; ) { Entry<K,V> e = old; old = old.next; int index = (e.hash & 0x7FFFFFFF) % newCapacity; e.next = (Entry<K,V>)newMap[index]; newMap[index] = e; } } }
6. Remove方法
remove方法主要逻辑以下:
Hash值的不一样实现:JDK7 Vs JDK8
以上给出的代码均为jdk7中的实现,注意到在jdk7和8里面,关于元素hash值的计算方法是不同的。
//利用异或,移位等运算,对key的hashcode进一步进行计算以及二进制位的调整等来保证最终获取的存储位置尽可能分布均匀 final int hash(Object k) { int h = hashSeed; if (0 != h && k instanceof String) { return sun.misc.Hashing.stringHash32((String) k); } h ^= k.hashCode(); h ^= (h >>> 20) ^ (h >>> 12); return h ^ (h >>> 7) ^ (h >>> 4); }
以上hash函数计算出的值,经过indexFor进一步处理来获取实际的存储位置
//返回数组下标 static int indexFor(int h, int length) { return h & (length-1); }
int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length;
注意到都使用到了hashCode,这个方法是在Object方法中定义的,
@HotSpotIntrinsicCandidate public native int hashCode();
能够看到是Object里没有给出hashCode的实现,只是声明为一个native方法,说明Java会去调用本地C/C++对hashcode的具体实现。
在JDK8及之后,能够经过以下指令来获取到全部的hash算法,
java -XX:+PrintFlagsFinal | grep hashCode
具体大概有以下几种,第5个算法是默认使用的,用到了异或操做和一些偏移算法来生成hash值。
0 == Lehmer random number generator,
1 == "somehow" based on memory address
2 == always 1
3 == increment counter
4 == memory based again ("somehow")
5 == Marsaglia XOR-Shift algorithm, that has nothing to do with memory.
HashTable相对于HashMap的最大特色就是线程安全,全部的操做都是被synchronized锁保护的
参考:
HashMap是java中使用最为频繁的map类型,其读写效率较高,可是由于其是非同步的,即读写等操做都是没有锁保护的,因此在多线程场景下是不安全的,容易出现数据不一致的问题。
HashMap的结构和HashTable一致,都是使用是由数组和链表两种数据结构组合而成的,不一样的是在JDK8里面引入了红黑树,当链表长度大于8时,会将链表转换为红黑树。
HashMap的成员变量和HashTable同样,在进行初始化的时候,都会设置一个容量值(capacity)和加载因子(loadFactor)。
HashMap的核心构造函数以下,主要是设置负载因子,以及根据用户的设定容量,找到一个不小于该容量的阈值。
public HashMap(int initialCapacity, float loadFactor) { if (initialCapacity < 0) throw new IllegalArgumentException("Illegal initial capacity: " + initialCapacity); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; if (loadFactor <= 0 || Float.isNaN(loadFactor)) throw new IllegalArgumentException("Illegal load factor: " + loadFactor); this.loadFactor = loadFactor; this.threshold = tableSizeFor(initialCapacity); }
因为HashMap和HashTable有实现上有诸多类似之处,这里会重点介绍hashMap在jdk7和8中的不一样实现。
Hash运算
无论增长、删除、查找键值对,定位到哈希桶数组的位置都是很关键的第一步。都须要用到hash算法,jdk7和8中的算法基本一致,具体实现以下:
static final int hash(Object key) { //jdk1.8 & jdk1.7 int h; // h = key.hashCode() 为第一步 取hashCode值 // h ^ (h >>> 16) 为第二步 高位参与运算 return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); }
而后利用获得的hash值与数组长度取模,获得相应的index。
如下图示实例,给出了计算过程,
Get操做
public V get(Object key) { Node<K,V> e; return (e = getNode(hash(key), key)) == null ? null : e.value; } /** * Implements Map.get and related methods * * @param hash hash for key * @param key the key * @return the node, or null if none */ final Node<K,V> getNode(int hash, Object key) { Node<K,V>[] tab; Node<K,V> first, e; int n; K k; if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) { if (first.hash == hash && // always check first node ((k = first.key) == key || (key != null && key.equals(k)))) return first; if ((e = first.next) != null) { if (first instanceof TreeNode) return ((TreeNode<K,V>)first).getTreeNode(hash, key); do { if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } while ((e = e.next) != null); } } return null; }
Get操做比较简单:
PUT操做
PUT操做的执行过程以下:
①.判断键值对数组table[i]是否为空或为null,不然执行resize()进行扩容;
②.根据键值key计算hash值获得插入的数组索引i,若是table[i]==null,直接新建节点添加,转向⑥,若是table[i]不为空,转向③;
③.判断table[i]的首个元素是否和key同样,若是相同直接覆盖value,不然转向④,这里的相同指的是hashCode以及equals;
④.判断table[i] 是否为treeNode,即table[i] 是不是红黑树,若是是红黑树,则直接在树中插入键值对,不然转向⑤;
⑤.遍历table[i],判断链表长度是否大于8,大于8的话把链表转换为红黑树,在红黑树中执行插入操做,不然进行链表的插入操做;遍历过程当中若发现key已经存在直接覆盖value便可;
⑥.插入成功后,判断实际存在的键值对数量size是否超多了最大容量threshold,若是超过,进行扩容。
1 public V put(K key, V value) { 2 // 对key的hashCode()作hash 3 return putVal(hash(key), key, value, false, true); 4 } 5 6 final V putVal(int hash, K key, V value, boolean onlyIfAbsent, 7 boolean evict) { 8 Node<K,V>[] tab; Node<K,V> p; int n, i; 9 // 步骤①:tab为空则建立 10 if ((tab = table) == null || (n = tab.length) == 0) 11 n = (tab = resize()).length; 12 // 步骤②:计算index,并对null作处理 13 if ((p = tab[i = (n - 1) & hash]) == null) 14 tab[i] = newNode(hash, key, value, null); 15 else { 16 Node<K,V> e; K k; 17 // 步骤③:节点key存在,直接覆盖value 18 if (p.hash == hash && 19 ((k = p.key) == key || (key != null && key.equals(k)))) 20 e = p; 21 // 步骤④:判断该链为红黑树 22 else if (p instanceof TreeNode) 23 e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); 24 // 步骤⑤:该链为链表 25 else { 26 for (int binCount = 0; ; ++binCount) { 27 if ((e = p.next) == null) { 28 p.next = newNode(hash, key,value,null); //链表长度大于8转换为红黑树进行处理 29 if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st 30 treeifyBin(tab, hash); 31 break; 32 } // key已经存在直接覆盖value 33 if (e.hash == hash && 34 ((k = e.key) == key || (key != null && key.equals(k)))) 35 break; 36 p = e; 37 } 38 } 39 40 if (e != null) { // existing mapping for key 41 V oldValue = e.value; 42 if (!onlyIfAbsent || oldValue == null) 43 e.value = value; 44 afterNodeAccess(e); 45 return oldValue; 46 } 47 } 48 ++modCount; 49 // 步骤⑥:超过最大容量 就扩容 50 if (++size > threshold) 51 resize(); 52 afterNodeInsertion(evict); 53 return null; 54 }
Resize扩容操做
因为JDK8引入了红黑树,因此在实现上JDK7和8的resize过程不太一致。
首先是JDK7的实现,
1 void resize(int newCapacity) { //传入新的容量 2 Entry[] oldTable = table; //引用扩容前的Entry数组 3 int oldCapacity = oldTable.length; 4 if (oldCapacity == MAXIMUM_CAPACITY) { //扩容前的数组大小若是已经达到最大(2^30)了 5 threshold = Integer.MAX_VALUE; //修改阈值为int的最大值(2^31-1),这样之后就不会扩容了 6 return; 7 } 8 9 Entry[] newTable = new Entry[newCapacity]; //初始化一个新的Entry数组 10 transfer(newTable); //!!将数据转移到新的Entry数组里 11 table = newTable; //HashMap的table属性引用新的Entry数组 12 threshold = (int)(newCapacity * loadFactor);//修改阈值 13 }
这里就是使用一个容量更大的数组来代替已有的容量小的数组,transfer()方法将原有Entry数组的元素拷贝到新的Entry数组里。
1 void transfer(Entry[] newTable) { 2 Entry[] src = table; //src引用了旧的Entry数组 3 int newCapacity = newTable.length; 4 for (int j = 0; j < src.length; j++) { //遍历旧的Entry数组 5 Entry<K,V> e = src[j]; //取得旧Entry数组的每一个元素 6 if (e != null) { 7 src[j] = null;//释放旧Entry数组的对象引用(for循环后,旧的Entry数组再也不引用任何对象) 8 do { 9 Entry<K,V> next = e.next; 10 int i = indexFor(e.hash, newCapacity); //!!从新计算每一个元素在数组中的位置 11 e.next = newTable[i]; //标记[1] 12 newTable[i] = e; //将元素放在数组上 13 e = next; //访问下一个Entry链上的元素 14 } while (e != null); 15 } 16 } 17 }
newTable[i]的引用赋给了e.next,也就是使用了单链表的头插入方式,同一位置上新元素总会被放在链表的头部位置;这样先放在一个索引上的元素终会被放到Entry链的尾部(若是发生了hash冲突的话),这一点和Jdk1.8有区别。
具体举例以下图所示:
接下来是JDK8中的实现,
/** * Initializes or doubles table size. If null, allocates in * accord with initial capacity target held in field threshold. * Otherwise, because we are using power-of-two expansion, the * elements from each bin must either stay at same index, or move * with a power of two offset in the new table. * * @return the table */ final Node<K,V>[] resize() { Node<K,V>[] oldTab = table; int oldCap = (oldTab == null) ? 0 : oldTab.length; int oldThr = threshold; int newCap, newThr = 0; if (oldCap > 0) { if (oldCap >= MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return oldTab; } else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && oldCap >= DEFAULT_INITIAL_CAPACITY) newThr = oldThr << 1; // double threshold } else if (oldThr > 0) // initial capacity was placed in threshold newCap = oldThr; else { // zero initial threshold signifies using defaults newCap = DEFAULT_INITIAL_CAPACITY; newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); } if (newThr == 0) { float ft = (float)newCap * loadFactor; newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? (int)ft : Integer.MAX_VALUE); } threshold = newThr; @SuppressWarnings({"rawtypes","unchecked"}) Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; table = newTab; if (oldTab != null) { for (int j = 0; j < oldCap; ++j) { Node<K,V> e; if ((e = oldTab[j]) != null) { oldTab[j] = null; if (e.next == null) newTab[e.hash & (newCap - 1)] = e; else if (e instanceof TreeNode) ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); else { // preserve order Node<K,V> loHead = null, loTail = null; Node<K,V> hiHead = null, hiTail = null; Node<K,V> next; do { next = e.next; if ((e.hash & oldCap) == 0) { if (loTail == null) loHead = e; else loTail.next = e; loTail = e; } else { if (hiTail == null) hiHead = e; else hiTail.next = e; hiTail = e; } } while ((e = next) != null); if (loTail != null) { loTail.next = null; newTab[j] = loHead; } if (hiTail != null) { hiTail.next = null; newTab[j + oldCap] = hiHead; } } } } } return newTab; }
因为Size会进行2次幂的扩展(指长度扩为原来2倍),因此,元素的位置要么是在原位置,要么是在原位置再移动2次幂的位置。经过下面的例子,能够清楚的看到,21和5在原来的数组中都处于相同的位置,可是在新的数组中,21到了新的位置,位置为原来的位置加上16,也就是旧的Capacity;可是5还在原来的位置。
假定咱们在Size变为2倍之后,从新计算hash,由于n变为2倍,相应的n-1的mask范围在高位多1bit(红色),也就是与上面示意图中红色部分对应的那一位,若是那位是1,则须要移动到新的位置,不然不变。
回到代码实现中,直接用旧的hash值与上oldCapacity,由于旧的capacity是2的倍数(二进制为00000...1000),并且获取旧index的时候采用hash&(oldCap-1),因此直接e.hash & oldCap
就是判断新增长的高位是否为1,为1则须要移动,不然保持不变。
if ((e.hash & oldCap) == 0)
这种巧妙的方法,同时因为高位的1和0随机出现,保证了resize以后元素分布的离散性。
下图是这一过程的模拟,
JDK8中的红黑树
引入红黑树主要是为了保证在hash分布极不均匀的状况下的性能,当一个链表太长(大于8)的时候,经过动态的将它替换成一个红黑树,这话的话会将时间复杂度从O(n)降为O(logn)。
为何HashMap的数组长度必定保持2的次幂?
HashMap Vs HashTable
参考:
TreeMap继承于AbstractMap,实现了Map, Cloneable, NavigableMap, Serializable接口。
TreeMap 是一个有序的key-value集合,它是经过红黑树实现的。该映射根据其键的天然顺序进行排序,或者根据建立映射时提供的Comparator进行排序,具体取决于使用的构造方法。
TreeMap的基本操做 containsKey、get、put 和 remove 的时间复杂度是 log(n) 。
对于SortedMap来讲,该类是TreeMap体系中的父接口,也是区别于HashMap体系最关键的一个接口。SortedMap接口中定义的第一个方法Comparator<? super K> comparator();
该方法决定了TreeMap体系的走向,有了比较器,就能够对插入的元素进行排序了。
TreeMap的查找、插入、更新元素等操做,主要是对红黑树的节点进行相应的更新,和数据结构中相似。
TreeSet基于TreeMap实现,底层也是红黑树。只是每次插入元素时,value为一个默认的dummy数据。
HashSet的实现很简单,内部有一个HashMap的成员变量,全部的Set相关的操做都转换为了对HashMapde操做。
public class HashSet<E> extends AbstractSet<E> implements Set<E>, Cloneable, java.io.Serializable { static final long serialVersionUID = -5024744406713321676L; private transient HashMap<E,Object> map; // Dummy value to associate with an Object in the backing Map private static final Object PRESENT = new Object(); //其余操做省略 }
从上面的code能够看到,内部还定义了一个PRESENT的dummy对象,当添加元素时,直接添加一对键值对,key为元素值,value为PRESENT。
/** * Adds the specified element to this set if it is not already present. * More formally, adds the specified element <tt>e</tt> to this set if * this set contains no element <tt>e2</tt> such that * <tt>(e==null ? e2==null : e.equals(e2))</tt>. * If this set already contains the element, the call leaves the set * unchanged and returns <tt>false</tt>. * * @param e element to be added to this set * @return <tt>true</tt> if this set did not already contain the specified * element */ public boolean add(E e) { return map.put(e, PRESENT)==null; }
其余的操做相似,就是把PRESENT当作value。
首先是定义,
public class LinkedHashMap<K,V> extends HashMap<K,V> implements Map<K,V> { ... }
能够看到,LinkedHashMap是HashMap的子类,但和HashMap的无序性不同,LinkedHashMap经过维护一个运行于全部条目的双向链表,保证了元素迭代的顺序。该迭代顺序能够是插入顺序或者是访问顺序,这个能够在初始化的时候肯定,默认采用插入顺序来维持取出键值对的次序。
在成员变量上,与HashMap不一样的是,引入了before和after两个变量来记录先后的元素。
一、K key 二、V value 三、Entry<K, V> next 四、int hash 五、Entry<K, V> before 六、Entry<K, V> after
1-4是从HashMap.Entry中继承过来的;5-6是LinkedHashMap独有的。注意next是用于维护HashMap指定table位置上链接的Entry的顺序的,before、After是用于维护Entry插入的前后顺序的。
能够把LinkedHashMap的结构当作以下图所示:
接下来主要介绍LinkedHashMap的排序操做,
在构造函数中,须要指定accessOrder,有两种状况:
public LinkedHashMap(int initialCapacity, float loadFactor, boolean accessOrder) { super(initialCapacity, loadFactor); this.accessOrder = accessOrder; }
第二种状况,也就是accessOrder为true时,每次经过get/put方法访问时,都把访问的那个数据移到双向队列的尾部去,也就是说,双向队列最头的那个数据就是最不常访问的那个数据。具体实现以下,afterNodeAccess这个方法在HashMap中没有实现,LinkedHashMap进行了实现,将元素进行排序。
void afterNodeAccess(Node<K,V> e) { // move node to last LinkedHashMap.Entry<K,V> last; if (accessOrder && (last = tail) != e) { LinkedHashMap.Entry<K,V> p = (LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after; p.after = null; if (b == null) head = a; else b.after = a; if (a != null) a.before = b; else last = b; if (last == null) head = p; else { p.before = last; last.after = p; } tail = p; ++modCount; } }
利用LinkedHashMap实现LRU缓存
LRU即Least Recently Used,最近最少使用,也就是说,当缓存满了,会优先淘汰那些最近最不常访问的数据。LinkedHashMap正好知足这个特性,当咱们开启accessOrder为true时,最新访问(get或者put(更新操做))的数据会被丢到队列的尾巴处,那么双向队列的头就是最不常用的数据了。
此外,LinkedHashMap还提供了一个方法,这个方法就是为了咱们实现LRU缓存而提供的,removeEldestEntry(Map.Entry<K,V> eldest) 方法。该方法能够提供在每次添加新条目时移除最旧条目的实现程序,默认返回 false。
下面是一个最简单的LRU缓存的实现,当size超过maxElement时,每次新增一个元素时,就会移除最久远的元素。
public class LRUCache extends LinkedHashMap { public LRUCache(int maxSize) { super(maxSize, 0.75F, true); maxElements = maxSize; } protected boolean removeEldestEntry(java.util.Map.Entry eldest) { //逻辑很简单,当大小超出了Map的容量,就移除掉双向队列头部的元素,给其余元素腾出点地来。 return size() > maxElements; } private static final long serialVersionUID = 1L; protected int maxElements; }
参考:
这节开始介绍并发容器,首先是ConcurrentHashMap,实现了线程安全的HashMap。以前也提到了HashMap在多线程环境下的问题,这小节先详细分析为何HashMap多线程下不安全。
首先说结论,为何HashMap不是线程安全的?在多线程下,会致使HashMap的Entry链表造成环形数据结构,一旦造成环形,Entry的next节点永远不为空,不管是进行resize仍是get/size等操做时,就会产生死循环。
首先针对JDK7进行分析:
下面是resize部分的代码,这段代码将原HashMap中的元素依次移动到扩容后的HashMap中,
1: // Transfer method in java.util.HashMap - 2: // called to resize the hashmap 3: // 依次移动每一个bucket中的元素到新的buckets中 4: for (int j = 0; j < src.length; j++) { 5: Entry e = src[j]; 6: if (e != null) { 7: src[j] = null; 8: do { // Next指向下一个须要移动的元素 9: Entry next = e.next; // 计算新Map中的位置 10: int i = indexFor(e.hash, newCapacity); // 插入到bucket中第一个位置 11: e.next = newTable[i]; 12: newTable[i] = e; // 指向原bucket中下一个位置的元素 13: e = next; 14: } while (e != null); 15: } 16: }
在正常单线程的状况下,若是有以下的HashMap的结构,为了方便这里只有2个bucket(java.util.HashMap中默认是 16)。
按照上面的resize流程,e和next分别指向A和B,A是第一次迭代将会被移动的元素,B是下一个。
在resize完成以后,每一个bucket的深度变小了,达到了resize的目的。整个过程在单线程下没有任何问题,可是考虑到多线程的状况,就会可能会出现竞争。
如今有两个线程Thread1,Thread2同时进行resize的操做,假设Thread1在运行到第9行后,Thread2获取了CPU而且也开始执行resize的操做。
1: // Transfer method in java.util.HashMap - 2: // called to resize the hashmap 3: 4: for (int j = 0; j < src.length; j++) { 5: Entry e = src[j]; 6: if (e != null) { 7: src[j] = null; 8: do { 9: Entry next = e.next; // Thread1 STOPS RIGHT HERE 10: int i = indexFor(e.hash, newCapacity); 11: e.next = newTable[i]; 12: newTable[i] = e; 13: e = next; 14: } while (e != null); 15: } 16: }
Thread1运行后,对应的e1和next1别指向A和B,可是Thread1并无移动元素。
假设Thread2在获取CPU后完整的运行了整个resize,新的Map结构将会以下图所示:
注意到e1
和next1
仍是指向A和B,可是A和B的位置关系已经变了,按照resize的算法进行两轮迭代以后,变成以下的结构,
注意此时e
和next
的指向,在下一次的迭代中,将把A放在第3个bucket的一个位置,可是B仍然是指向A的,因此出现了下面的相似于双向链表的结构,
接着Thread1就会进入到无限循环中,此时若是有get操做的话,也会出现无限循环的状况。这就是HashMap在多线程状况下容易出现的问题。
接着针对JDK8进行分析:
前面已经提到,JDK8和7在Resize的不一样之处就是8保留了链表中元素的前后位置,这样基本能够确保在resize过程当中不出现循环的问题,可是仍是可能出现数据丢失的问题。如下是resize的核心实现,
Node<K,V> loHead = null, loTail = null; Node<K,V> hiHead = null, hiTail = null; Node<K,V> next; do { next = e.next; if ((e.hash & oldCap) == 0) { if (loTail == null) loHead = e; else loTail.next = e; loTail = e; } else { if (hiTail == null) hiHead = e; else hiTail.next = e; hiTail = e; } } while ((e = next) != null); if (loTail != null) { loTail.next = null; newTab[j] = loHead; } if (hiTail != null) { hiTail.next = null; newTab[j + oldCap] = hiHead; } }
在实现中会使用两个临时链表,分别存储新地址和旧地址的链表,最后将这两个链表放到对应的位置。
假定出现以下的状况,有ABC三个元素须要移动,首先线程1指向A,next即为B,此后线程2一样进行resize,并把high/low两个链表的更新完成,这时返回线程1继续运行。
可是线程1仍然按照正常的流程继续,A会被放到High链表,B会被放到Low链表,这以后因为B后面没有元素,更新完成,所以C就漏掉了。
其实无论是JDK7仍是8,因为链表的不少操做都没有加锁,每一个操做也不是原子操做,致使可能出现不少意想不到的结果,也是为何须要引入专门的ConcurrentHashMap。
为何不使用HashTable?
以前介绍的HashTable也能保证线程安全,可是HashTable使用synchronized来保证线程安全,但在线程竞争激烈的状况下HashTable的效率很是低下。由于当一个线程访问HashTable的同步方法,其余线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,因此竞争越激烈效率越低。正由于如此,须要引入更加高效的多线程解决方案。
ConcurrentHashMap的结构在JDk1.7和1.8中有较大的不一样,下面将会分别进行介绍。
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment实际继承自可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,每一个Segment里包含一个HashEntry数组,咱们称之为table,每一个HashEntry是一个链表结构的元素。
Segment实际继承自可重入锁(ReentrantLock),这是与普通HashMap的最大区别。
面试点:ConcurrentHashMap实现原理是怎么样的或者ConcurrentHashMap如何在保证高并发下线程安全的同时实现了性能提高?
ConcurrentHashMap容许多个修改操做并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不一样部分进行的修改。内部使用段(Segment)来表示这些不一样的部分,每一个段其实就是一个小的hash table,只要多个修改操做发生在不一样的段上,它们就能够并发进行。
1.1 初始化过程
初始化有三个参数:
如下是对初始化函数的分析:
1.2 Hash值计算
对某个元素进行Put/Get操做以前,都须要定位该元素在哪一个segment元素的某个table元素中的,定位的过程,取得key的hashcode值进行一次再散列(经过Wang/Jenkins算法),拿到再散列值后,以再散列值的高位进行取模获得当前元素在哪一个segment上。
具体的Hash实现以下:
1.3 Get方法
定位segment和定位table后,依次扫描这个table元素下的的链表,要么找到元素,要么返回null。
在高并发下的状况下如何保证取得的元素是最新的?
用于存储键值对数据的HashEntry,在设计上它的成员变量value等都是volatile类型的,这样就保证别的线程对value值的修改,get方法能够立刻看到。
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; }
1.4 Put方法
一、首先定位segment,当这个segment在map初始化后,还为null,由ensureSegment方法负责填充这个segment。
二、对Segment加锁,虽然value是volatile的,只能保证可见性,不能保证原子性。这里put操做不是原子操做,所以须要加锁。
三、定位所在的table元素,并扫描table下的链表,找到时:
注意到默认onlyIfAbsent为false,也就是若是有相同key的元素,会覆盖旧的值。不管是否覆盖,都是返回旧值。
没有找到时:
1.5 扩容操做
扩容操做不会扩容Segment,只会扩容对应的table数组,每次都是将数组翻倍。
以前也提到过,因为数组长度为2次幂,因此每次扩容以后,元素要么在原处,要么在原处加上偏移量为旧的size的新位置。
1.6 Size方法
size的时候进行两次不加锁的统计,两次一致直接返回结果,不一致,从新加锁再次统计,
ConcurrentHashMap的弱一致性
get方法和containsKey方法都是经过对链表遍历判断是否存在key相同的节点以及得到该节点的value。但因为遍历过程当中其余线程可能对链表结构作了调整,所以get和containsKey返回的多是过期的数据,这一点是ConcurrentHashMap在弱一致性上的体现。
相比JDK1.7的重要变化:
一、取消了segment数组,引入了Node结构,直接用Node数组来保存数据,锁的粒度更小,减小并发冲突的几率。
二、存储数据时采用了链表+红黑树的形式,纯链表的形式时间复杂度为O(n),红黑树则为O(logn),性能提高很大。何时链表转红黑树?当key值相等的元素造成的链表中元素个数超过8个的时候。
2.1 数据结构
ConcurrentHashMap在初始化时,只是给成员变量赋值,put时进行实际数组的填充。
2.2 Hash计算
先计算key的hash值,而后将高位加入计算来进行再散列。
2.3 Get方法
首先计算hash值,肯定在table中的位置。
注意到在初始化TreeBin,也就是设置红黑树所在的Node的第一个节点时,会设置对应的hash值,这些hash值定义以下。因此上面的代码中,能够经过判断首节点的hash值<0来肯定该节点为树。
static final int MOVED = -1; // hash for forwarding nodes static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations
2.4 Put方法
PUT方法中会实际初始化数组,
2.5 扩容操做
线程执行put操做,发现容量已经达到扩容阈值,须要进行扩容操做。ConcurrentHashMap支持并发扩容,实现方式是,将表拆分,让每一个线程处理本身的区间。以下图:
迁移完毕的hash桶,会被设置成ForwardingNode节点,以此告知访问此桶的其余线程,此节点已经迁移完毕。此时线程2访问到了ForwardingNode节点,若是线程2执行的put或remove等写操做,那么就会先帮其扩容。若是线程2执行的是get等读方法,则会调用ForwardingNode的find方法,去nextTable里面查找相关元素。
2.6 Size
Put操做时,addCount 方法用于 CAS 更新 baseCount,但颇有可能在高并发的状况下,更新失败,那么这些节点虽然已经被添加到哈希表中了,可是数量却没有被统计。
当更新 baseCount 失败的时候,会调用 fullAddCount 将这些失败的结点包装成一个 CounterCell 对象,保存在 CounterCell 数组中。
整张表实际的 size 实际上是 baseCount 加上 CounterCell 数组中元素的个数。
public int size() { long n = sumCount(); return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n); }
具体的计算count方法,
final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
和JDK1.7同样,这样获得的size也只是大概数字,也具备弱一致性。
ConcurrentSkipListMap是一个并发安全, 基于skiplist实现有序存储的Map。能够当作是TreeMap的并发版本。
ConcurrentHashMap采用空间换取时间, 但它有着ConcurrentHashMap不能比拟的优势: 有序数据存储.
SkipList的结构以下图所示:
从图中能够得出ConcurrentSkipListMap的几个特色:
ConcurrentSkipListSet基于ConcurrentSkipListMap实现,相似于TreeSet基于TreeMap实现。
ConcurrentLinkedQueue实现了一个高并发的队列,底层使用链表做为其数据结构。从性能角度看,能够算是高并发环境下性能最好的队列了。
ConcurrentLinkedQueue类中,核心节点Node的定义以下,item表示目标元素,next表示当前Node的下一个元素。
private static class Node<E> { volatile E item; volatile Node<E> next;
add,offer将元素插入到尾部,其中add实现上直接调用了offer。peek方法拿头部的数据,可是不移除和poll拿头部的数据,可是同时移除。
CopyOnWrite(写时复制)的容器。通俗的理解是当咱们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,而后新的容器里添加元素,添加完元素以后,再用新的容器替换旧的容器。
好处是咱们能够对容器进行并发的读,而不须要加锁,由于当前容器不会添加任何元素。因此写时复制容器也是一种读写分离的思想,读和写不一样的容器。若是读的时候有多个线程正在向容器添加数据,读仍是会读到旧的数据,由于写的时候不会锁住旧的,只能保证最终一致性。
下面介绍一下写的过程,
/** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return {@code true} (as specified by {@link Collection#add}) */ public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }
首先,写入操做使用锁,主要是为了控制写写的状况。接着进行新数组的复制,将新的元素加入newElements,最后使用新的数组替换老的数组,修改就完成了。整个过程不会影响读取,而且修改完成之后,读取线程能够“觉察”到这个修改,由于array是volatile类型,保证了可见性。
/** The array, accessed only via getArray/setArray. */ private transient volatile Object[] array;
容器的适用场景:适用读多写少的并发场景,常见应用:白名单/黑名单,商品类目的访问和更新场景。可是因为会复制旧的数组,全部可能存在内存占用问题。
CopyOnWriteArraySet基于CopyOnWriteArrayList实现,为了保证数据的惟一性,在往其中加入数据时,会check当前数组中是否存在该元素,若是不存在,则加入到当前数组。
/** * Appends the element, if not present. * * @param e element to be added to this list, if absent * @return {@code true} if the element was added */ public boolean addIfAbsent(E e) { Object[] snapshot = getArray(); return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false : addIfAbsent(e, snapshot); }
阻塞队列(BlockingQueue)是一个支持两个附加操做的队列。这两个附加的操做是:
阻塞队列经常使用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
JDK7 提供了 7 个阻塞队列。分别是
1. ArrayBlockingQueue
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认状况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的全部生产者线程或消费者线程,当队列可用时,能够按照阻塞的前后顺序访问队列,即先阻塞的生产者线程,能够先往队列里插入元素,先阻塞的消费者线程,能够先从队列里获取元素。一般状况下为了保证公平性会下降吞吐量。咱们可使用如下代码建立一个公平的阻塞队列:
2. LinkedBlockingQueue
一个用链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
3. PriorityBlockingQueue
一个支持优先级的无界队列。默认状况下元素采起天然顺序排列,也能够经过比较器 comparator 来指定元素的排序规则。元素按照升序排列。
4. DelayQueue
一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在建立元素时能够指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。咱们能够将 DelayQueue 运用在如下应用场景:
缓存系统的设计:能够用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
定时任务调度。使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,从好比 TimerQueue 就是使用 DelayQueue 实现的。
队列中的 Delayed 必须实现 compareTo 来指定元素的顺序。好比让延时时间最长的放在队列的末尾。
5. SynchronousQueue
SynchronousQueue 是一个不存储元素的阻塞队列。每个 put 操做必须等待一个 take 操做,不然不能继续添加元素。SynchronousQueue 能够当作是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列自己并不存储任何元素,很是适合于传递性场景, 好比在一个线程中使用的数据,传递给另一个线程使用,SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。
6. LinkedTransferQueue
是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其余阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
Node pred = tryAppend(s, haveData); return awaitMatch(s, pred, e, (how == TIMED), nanos);
第一行代码是试图把存放当前元素的 s 节点做为 tail 节点。第二行代码是让 CPU 自旋等待消费者消费元素。由于自旋会消耗 CPU,因此自旋必定的次数后使用 Thread.yield() 方法来暂停当前正在执行的线程,并执行其余线程。
对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit) 方法,则是试图把生产者传入的元素直接传给消费者,可是若是没有消费者消费该元素则等待指定的时间再返回,若是超时还没消费元素,则返回 false,若是在超时时间内消费了元素,则返回 true。
7. LinkedBlockingDeque
一个由链表结构组成的双向阻塞队列。所谓双向队列指的你能够从队列的两端插入和移出元素。双端队列由于多了一个操做队列的入口,在多线程同时入队时,也就减小了一半的竞争。相比其余的阻塞队列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。可是 take 方法却等同于 takeFirst,不知道是否是 Jdk 的 bug,使用时仍是用带有 First 和 Last 后缀的方法更清楚。
在初始化 LinkedBlockingDeque 时能够设置容量防止其过渡膨胀。另外双向阻塞队列能够运用在“工做窃取”模式中。
在介绍阻塞队列的实现以前,先介绍一下生产者与消费者模式:
生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。
生产者和消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是经过阻塞队列来进行通讯,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。
1)当队列满的时候,插入元素的线程被阻塞,直达队列不满。
2)队列为空的时候,获取元素的线程被阻塞,直到队列不空。
JDK是如何让生产者和消费者可以高效率的进行通信呢?
答案是使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。
以ArrayBlockingQueue为例:
private final Condition notFull; private final Condition notEmpty; public ArrayBlockingQueue(int capacity, boolean fair) { // 省略其余代码 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
从上述代码能够看到,当队列为空,notEmpty进行等待;插入元素后,唤醒等待的线程。当队列满时,notFull进行等待;删除元素后,唤醒等待的线程。
参考:
本文由『后端精进之路』原创,首发于博客 http://teckee.github.io/ , 转载请注明出处
搜索『后端精进之路』关注公众号,马上获取最新文章和价值2000元的BATJ精品面试课程。