Java并发编程系列-(5) Java并发容器

5 并发容器

5.1 Hashtable、HashMap、TreeMap、HashSet、LinkedHashMap

在介绍并发容器以前,先分析下普通的容器,以及相应的实现,方便后续的对比。html

Hashtable、HashMap、TreeMap 都是最多见的一些 Map 实现,是以键值对的形式存储和操做数据的容器类型。java

Hashtable 是早期 Java 类库提供的一个哈希表实现,自己是同步的,不支持 null 键和值,因为同步致使的性能开销,因此已经不多被推荐使用。node

HashMap 是应用更加普遍的哈希表实现,行为上大体上与 HashTable 一致,主要区别在于 HashMap 不是同步的,支持 null 键和值等。一般状况下,HashMap 进行 put 或者 get 操做,能够达到常数时间的性能,因此它是绝大部分利用键值对存取场景的首选,好比,实现一个用户 ID 和用户信息对应的运行时存储结构。git

HashMap 明确声明不是线程安全的数据结构,若是忽略这一点,简单用在多线程场景里,不免会出现问题,如 HashMap 在并发环境可能出现无限循环占用 CPU、size 不许确等诡异的问题。github

TreeMap 则是基于红黑树的一种提供顺序访问的 Map,和 HashMap 不一样,它的 get、put、remove 之类操做都是 O(log(n))的时间复杂度,具体顺序能够由指定的 Comparator 来决定,或者根据键的天然顺序来判断。面试

Hashtable

Hashtable是经过"拉链法"实现的哈希表,结构以下图所示:算法

5a688b8c0001129805220459.jpg

1. 定义后端

public class Hashtable<K,V>  
    extends Dictionary<K,V>  
    implements Map<K,V>, Cloneable, java.io.Serializable{}

Hashtable 继承于 Dictionary 类,实现了 Map, Cloneable, java.io.Serializable接口。数组

2. 构造方法缓存

Hashtable 一共提供了 4 个构造方法:

public Hashtable(int initialCapacity, float loadFactor): 用指定初始容量和指定负载因子构造一个新的空哈希表。
public Hashtable(int initialCapacity):用指定初始容量和默认的负载因子 (0.75) 构造一个新的空哈希表。
public Hashtable():默认构造函数,容量为 11,负载因子为 0.75。
- public Hashtable(Map<? extends K, ? extends V> t):构造一个与给定的 Map 具备相同映射关系的新哈希表。

它包括几个重要的成员变量:table, count, threshold, loadFactor, modCount。

  • table 是一个 Entry[] 数组类型,而 Entry实际上就是如上图所示的一个单向链表。Hashtable的键值对都是存储在Entry数组中的。
  • count 是 Hashtable 的大小,它是 Hashtable 保存的键值对的数量。
  • threshold 是 Hashtable 的阈值,用于判断是否须要调整 Hashtable 的容量。threshold 的值="容量 x 负载因子"。
  • loadFactor 就是负载因子。
  • modCount 记录hashTable被修改的次数,在对HashTable的操做中,不管add、remove、clear方法只要是涉及了改变Table数组元素的个数的方法都会致使modCount的改变。这主要用来实现“快速失败”也就是fail-fast,它是Java集合的一种错误检测机制。

fail-fast机制举例:有两个线程(线程A,线程B),其中线程A负责遍历list、线程B修改list。线程A在遍历list过程的某个时候(此时expectedModCount = modCount=N),线程启动,同时线程B增长一个元素,这是modCount的值发生改变(modCount + 1 = N + 1)。线程A继续遍历执行next方法时,通告checkForComodification方法发现expectedModCount  = N  ,而modCount = N + 1,二者不等,这时就抛出ConcurrentModificationException 异常,从而产生fail-fast机制。

3. PUT操做

put 方法的整个流程为:

  • 判断 value 是否为空,为空则抛出异常;
  • 计算 key 的 hash 值,并根据 hash 值得到 key 在 table 数组中的位置 index,若是 table[index] 元素不为空,则进行迭代,若是遇到相同的 key,则直接替换,并返回旧 value;
  • 不然,咱们能够将其插入到 table[index] 位置。
public synchronized V put(K key, V value) {
        // Make sure the value is not null确保value不为null
        if (value == null) {
            throw new NullPointerException();
        }

        // Makes sure the key is not already in the hashtable.
        //确保key不在hashtable中
        //首先,经过hash方法计算key的哈希值,并计算得出index值,肯定其在table[]中的位置
        //其次,迭代index索引位置的链表,若是该位置处的链表存在相同的key,则替换value,返回旧的value
        Entry tab[] = table;
        int hash = hash(key);
        int index = (hash & 0x7FFFFFFF) % tab.length;
        for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
            if ((e.hash == hash) && e.key.equals(key)) {
                V old = e.value;
                e.value = value;
                return old;
            }
        }

        modCount++;
        if (count >= threshold) {
            // Rehash the table if the threshold is exceeded
            //若是超过阀值,就进行rehash操做
            rehash();

            tab = table;
            hash = hash(key);
            index = (hash & 0x7FFFFFFF) % tab.length;
        }

        // Creates the new entry.
        //将值插入,返回的为null
        Entry<K,V> e = tab[index];
        // 建立新的Entry节点,并将新的Entry插入Hashtable的index位置,并设置e为新的Entry的下一个元素
        tab[index] = new Entry<>(hash, key, value, e);
        count++;
        return null;
    }

4. Get操做

首先经过 hash()方法求得 key 的哈希值,而后根据 hash 值获得 index 索引。而后迭代链表,返回匹配的 key 的对应的 value;找不到则返回 null。

public synchronized V get(Object key) {
        Entry tab[] = table;
        int hash = hash(key);
        int index = (hash & 0x7FFFFFFF) % tab.length;
        for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
            if ((e.hash == hash) && e.key.equals(key)) {
                return e.value;
            }
        }
        return null;
    }

5. rehash扩容

  • 数组长度增长一倍(若是超过上限,则设置成上限值)。
  • 更新哈希表的扩容门限值。
  • 遍历旧表中的节点,计算在新表中的index,插入到对应位置链表的头节点。
protected void rehash() {
        int oldCapacity = table.length;
        Entry<?,?>[] oldMap = table;

        // overflow-conscious code
        int newCapacity = (oldCapacity << 1) + 1;
        if (newCapacity - MAX_ARRAY_SIZE > 0) {
            if (oldCapacity == MAX_ARRAY_SIZE)
                // Keep running with MAX_ARRAY_SIZE buckets
                return;
            newCapacity = MAX_ARRAY_SIZE;
        }
        Entry<?,?>[] newMap = new Entry<?,?>[newCapacity];

        modCount++;
        threshold = (int)Math.min(newCapacity * loadFactor, MAX_ARRAY_SIZE + 1);
        table = newMap;

        for (int i = oldCapacity ; i-- > 0 ;) {
            for (Entry<K,V> old = (Entry<K,V>)oldMap[i] ; old != null ; ) {
                Entry<K,V> e = old;
                old = old.next;

                int index = (e.hash & 0x7FFFFFFF) % newCapacity;
                e.next = (Entry<K,V>)newMap[index];
                newMap[index] = e;
            }
        }
    }

6. Remove方法

remove方法主要逻辑以下:

  • 先获取synchronized锁。
  • 计算key的哈希值和index。
  • 遍历对应位置的链表,寻找待删除节点,若是存在,用e表示待删除节点,pre表示前驱节点。若是不存在,返回null。
  • 更新前驱节点的next,指向e的next。返回待删除节点的value值。

Hash值的不一样实现:JDK7 Vs JDK8

以上给出的代码均为jdk7中的实现,注意到在jdk7和8里面,关于元素hash值的计算方法是不同的。

  • 在JDK7中,hashtable专门实现了hash函数,在以上的例子中都有看到,具体的实现以下:
//利用异或,移位等运算,对key的hashcode进一步进行计算以及二进制位的调整等来保证最终获取的存储位置尽可能分布均匀
final int hash(Object k) {
        int h = hashSeed;
        if (0 != h && k instanceof String) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        h ^= (h >>> 20) ^ (h >>> 12);
        return h ^ (h >>> 7) ^ (h >>> 4);
    }

以上hash函数计算出的值,经过indexFor进一步处理来获取实际的存储位置

//返回数组下标
    static int indexFor(int h, int length) {
        return h & (length-1);
    }
  • 在jdk8里面,直接调用key.hashCode()来获取key的hash值,接着在保证hash值为正数的前提下,获得相应的下标,
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;

注意到都使用到了hashCode,这个方法是在Object方法中定义的,

@HotSpotIntrinsicCandidate
    public native int hashCode();

能够看到是Object里没有给出hashCode的实现,只是声明为一个native方法,说明Java会去调用本地C/C++对hashcode的具体实现。

在JDK8及之后,能够经过以下指令来获取到全部的hash算法,

java -XX:+PrintFlagsFinal | grep hashCode

具体大概有以下几种,第5个算法是默认使用的,用到了异或操做和一些偏移算法来生成hash值。

0 == Lehmer random number generator,
1 == "somehow" based on memory address
2 == always 1
3 == increment counter
4 == memory based again ("somehow")
5 == Marsaglia XOR-Shift algorithm, that has nothing to do with memory.

HashTable相对于HashMap的最大特色就是线程安全,全部的操做都是被synchronized锁保护的


参考:

  • https://www.imooc.com/article/23015
  • https://wiki.jikexueyuan.com/project/java-collection/hashtable.html
  • https://stackoverflow.com/questions/49172698/default-hashcode-implementation-for-java-objects

HashMap

HashMap是java中使用最为频繁的map类型,其读写效率较高,可是由于其是非同步的,即读写等操做都是没有锁保护的,因此在多线程场景下是不安全的,容易出现数据不一致的问题。

HashMap的结构和HashTable一致,都是使用是由数组和链表两种数据结构组合而成的,不一样的是在JDK8里面引入了红黑树,当链表长度大于8时,会将链表转换为红黑树。

Screen Shot 2019-12-07 at 9.46.25 PM.png

HashMap的成员变量和HashTable同样,在进行初始化的时候,都会设置一个容量值(capacity)和加载因子(loadFactor)。

  • 容量值指的并非表的真实长度,而是用户预估的一个值,真实的表长度,是不小于capacity的2的整数次幂。
  • 加载因子是为了计算哈希表的扩容门限,若是哈希表保存的节点数量达到了扩容门限,哈希表就会进行扩容的操做,扩容的数量为原表数量的2倍。默认状况下,capacity的值为16,loadFactor的值为0.75(综合考虑效率与空间后的折衷)

HashMap的核心构造函数以下,主要是设置负载因子,以及根据用户的设定容量,找到一个不小于该容量的阈值。

public HashMap(int initialCapacity, float loadFactor) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException("Illegal initial capacity: " +
                                               initialCapacity);
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        if (loadFactor <= 0 || Float.isNaN(loadFactor))
            throw new IllegalArgumentException("Illegal load factor: " +
                                               loadFactor);
        this.loadFactor = loadFactor;
        this.threshold = tableSizeFor(initialCapacity);
    }

因为HashMap和HashTable有实现上有诸多类似之处,这里会重点介绍hashMap在jdk7和8中的不一样实现。

Hash运算

无论增长、删除、查找键值对,定位到哈希桶数组的位置都是很关键的第一步。都须要用到hash算法,jdk7和8中的算法基本一致,具体实现以下:

static final int hash(Object key) {   //jdk1.8 & jdk1.7
     int h;
     // h = key.hashCode() 为第一步 取hashCode值
     // h ^ (h >>> 16)  为第二步 高位参与运算
     return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

而后利用获得的hash值与数组长度取模,获得相应的index。

如下图示实例,给出了计算过程,

45205ec2.png

Get操做

public V get(Object key) {
        Node<K,V> e;
        return (e = getNode(hash(key), key)) == null ? null : e.value;
    }

    /**
     * Implements Map.get and related methods
     *
     * @param hash hash for key
     * @param key the key
     * @return the node, or null if none
     */
    final Node<K,V> getNode(int hash, Object key) {
        Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (first = tab[(n - 1) & hash]) != null) {
            if (first.hash == hash && // always check first node
                ((k = first.key) == key || (key != null && key.equals(k))))
                return first;
            if ((e = first.next) != null) {
                if (first instanceof TreeNode)
                    return ((TreeNode<K,V>)first).getTreeNode(hash, key);
                do {
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        return e;
                } while ((e = e.next) != null);
            }
        }
        return null;
    }

Get操做比较简单:

  • 先定位到数组中index位置,检查第一个节点是否知足要求 
  • 遍历对应该位置的链表,找到知足要求节点进行return

PUT操做

PUT操做的执行过程以下:

d669d29c.png

①.判断键值对数组table[i]是否为空或为null,不然执行resize()进行扩容;

②.根据键值key计算hash值获得插入的数组索引i,若是table[i]==null,直接新建节点添加,转向⑥,若是table[i]不为空,转向③;

③.判断table[i]的首个元素是否和key同样,若是相同直接覆盖value,不然转向④,这里的相同指的是hashCode以及equals;

④.判断table[i] 是否为treeNode,即table[i] 是不是红黑树,若是是红黑树,则直接在树中插入键值对,不然转向⑤;

⑤.遍历table[i],判断链表长度是否大于8,大于8的话把链表转换为红黑树,在红黑树中执行插入操做,不然进行链表的插入操做;遍历过程当中若发现key已经存在直接覆盖value便可;

⑥.插入成功后,判断实际存在的键值对数量size是否超多了最大容量threshold,若是超过,进行扩容。

1 public V put(K key, V value) {
 2     // 对key的hashCode()作hash
 3     return putVal(hash(key), key, value, false, true);
 4 }
 5 
 6 final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
 7                boolean evict) {
 8     Node<K,V>[] tab; Node<K,V> p; int n, i;
 9     // 步骤①:tab为空则建立
10     if ((tab = table) == null || (n = tab.length) == 0)
11         n = (tab = resize()).length;
12     // 步骤②:计算index,并对null作处理 
13     if ((p = tab[i = (n - 1) & hash]) == null) 
14         tab[i] = newNode(hash, key, value, null);
15     else {
16         Node<K,V> e; K k;
17         // 步骤③:节点key存在,直接覆盖value
18         if (p.hash == hash &&
19             ((k = p.key) == key || (key != null && key.equals(k))))
20             e = p;
21         // 步骤④:判断该链为红黑树
22         else if (p instanceof TreeNode)
23             e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
24         // 步骤⑤:该链为链表
25         else {
26             for (int binCount = 0; ; ++binCount) {
27                 if ((e = p.next) == null) {
28                     p.next = newNode(hash, key,value,null);
                        //链表长度大于8转换为红黑树进行处理
29                     if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st  
30                         treeifyBin(tab, hash);
31                     break;
32                 }
                    // key已经存在直接覆盖value
33                 if (e.hash == hash &&
34                     ((k = e.key) == key || (key != null && key.equals(k)))) 
35                          break;
36                 p = e;
37             }
38         }
39         
40         if (e != null) { // existing mapping for key
41             V oldValue = e.value;
42             if (!onlyIfAbsent || oldValue == null)
43                 e.value = value;
44             afterNodeAccess(e);
45             return oldValue;
46         }
47     }

48     ++modCount;
49     // 步骤⑥:超过最大容量 就扩容
50     if (++size > threshold)
51         resize();
52     afterNodeInsertion(evict);
53     return null;
54 }

Resize扩容操做

因为JDK8引入了红黑树,因此在实现上JDK7和8的resize过程不太一致。

首先是JDK7的实现,

1 void resize(int newCapacity) {   //传入新的容量
 2     Entry[] oldTable = table;    //引用扩容前的Entry数组
 3     int oldCapacity = oldTable.length;         
 4     if (oldCapacity == MAXIMUM_CAPACITY) {  //扩容前的数组大小若是已经达到最大(2^30)了
 5         threshold = Integer.MAX_VALUE; //修改阈值为int的最大值(2^31-1),这样之后就不会扩容了
 6         return;
 7     }
 8  
 9     Entry[] newTable = new Entry[newCapacity];  //初始化一个新的Entry数组
10     transfer(newTable);                         //!!将数据转移到新的Entry数组里
11     table = newTable;                           //HashMap的table属性引用新的Entry数组
12     threshold = (int)(newCapacity * loadFactor);//修改阈值
13 }

这里就是使用一个容量更大的数组来代替已有的容量小的数组,transfer()方法将原有Entry数组的元素拷贝到新的Entry数组里。

1 void transfer(Entry[] newTable) {
 2     Entry[] src = table;                   //src引用了旧的Entry数组
 3     int newCapacity = newTable.length;
 4     for (int j = 0; j < src.length; j++) { //遍历旧的Entry数组
 5         Entry<K,V> e = src[j];             //取得旧Entry数组的每一个元素
 6         if (e != null) {
 7             src[j] = null;//释放旧Entry数组的对象引用(for循环后,旧的Entry数组再也不引用任何对象)
 8             do {
 9                 Entry<K,V> next = e.next;
10                 int i = indexFor(e.hash, newCapacity); //!!从新计算每一个元素在数组中的位置
11                 e.next = newTable[i]; //标记[1]
12                 newTable[i] = e;      //将元素放在数组上
13                 e = next;             //访问下一个Entry链上的元素
14             } while (e != null);
15         }
16     }
17 }

newTable[i]的引用赋给了e.next,也就是使用了单链表的头插入方式,同一位置上新元素总会被放在链表的头部位置;这样先放在一个索引上的元素终会被放到Entry链的尾部(若是发生了hash冲突的话),这一点和Jdk1.8有区别。

具体举例以下图所示:

b2330062.png

接下来是JDK8中的实现,

/**
     * Initializes or doubles table size.  If null, allocates in
     * accord with initial capacity target held in field threshold.
     * Otherwise, because we are using power-of-two expansion, the
     * elements from each bin must either stay at same index, or move
     * with a power of two offset in the new table.
     *
     * @return the table
     */
    final Node<K,V>[] resize() {
        Node<K,V>[] oldTab = table;
        int oldCap = (oldTab == null) ? 0 : oldTab.length;
        int oldThr = threshold;
        int newCap, newThr = 0;
        if (oldCap > 0) {
            if (oldCap >= MAXIMUM_CAPACITY) {
                threshold = Integer.MAX_VALUE;
                return oldTab;
            }
            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                     oldCap >= DEFAULT_INITIAL_CAPACITY)
                newThr = oldThr << 1; // double threshold
        }
        else if (oldThr > 0) // initial capacity was placed in threshold
            newCap = oldThr;
        else {               // zero initial threshold signifies using defaults
            newCap = DEFAULT_INITIAL_CAPACITY;
            newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
        }
        if (newThr == 0) {
            float ft = (float)newCap * loadFactor;
            newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                      (int)ft : Integer.MAX_VALUE);
        }
        threshold = newThr;
        @SuppressWarnings({"rawtypes","unchecked"})
            Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
        table = newTab;
        if (oldTab != null) {
            for (int j = 0; j < oldCap; ++j) {
                Node<K,V> e;
                if ((e = oldTab[j]) != null) {
                    oldTab[j] = null;
                    if (e.next == null)
                        newTab[e.hash & (newCap - 1)] = e;
                    else if (e instanceof TreeNode)
                        ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                    else { // preserve order
                        Node<K,V> loHead = null, loTail = null;
                        Node<K,V> hiHead = null, hiTail = null;
                        Node<K,V> next;
                        do {
                            next = e.next;
                            if ((e.hash & oldCap) == 0) {
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            else {
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null);
                        if (loTail != null) {
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }
                }
            }
        }
        return newTab;
    }

因为Size会进行2次幂的扩展(指长度扩为原来2倍),因此,元素的位置要么是在原位置,要么是在原位置再移动2次幂的位置。经过下面的例子,能够清楚的看到,21和5在原来的数组中都处于相同的位置,可是在新的数组中,21到了新的位置,位置为原来的位置加上16,也就是旧的Capacity;可是5还在原来的位置。

d773f86e.png

假定咱们在Size变为2倍之后,从新计算hash,由于n变为2倍,相应的n-1的mask范围在高位多1bit(红色),也就是与上面示意图中红色部分对应的那一位,若是那位是1,则须要移动到新的位置,不然不变。

回到代码实现中,直接用旧的hash值与上oldCapacity,由于旧的capacity是2的倍数(二进制为00000...1000),并且获取旧index的时候采用hash&(oldCap-1),因此直接e.hash & oldCap就是判断新增长的高位是否为1,为1则须要移动,不然保持不变。

if ((e.hash & oldCap) == 0)

这种巧妙的方法,同时因为高位的1和0随机出现,保证了resize以后元素分布的离散性。

下图是这一过程的模拟,

3cc9813a.png

JDK8中的红黑树

引入红黑树主要是为了保证在hash分布极不均匀的状况下的性能,当一个链表太长(大于8)的时候,经过动态的将它替换成一个红黑树,这话的话会将时间复杂度从O(n)降为O(logn)。

为何HashMap的数组长度必定保持2的次幂?

  1. 从上面的分析JDK8 resize的过程能够可能到,数组长度保持2的次幂,当resize的时候,为了经过h&(length-1)计算新的元素位置,能够看到当扩容后只有一位差别,也就是多出了最左位的1,这样计算 h&(length-1)的时候,只要h对应的最左边的那一个差别位为0,就能保证获得的新的数组索引和老数组索引一致,不然index+OldCap。

1024555-20161115215812138-679881037.png

  1. 数组长度保持2的次幂,length-1的低位都为1,会使得得到的数组索引index更加均匀。hash函数采用各类位运算也是为了使得低位更加散列,若是低位所有为1,那么对于h低位部分来讲,任何一位的变化都会对结果产生影响,能够尽量的使元素分布比较均匀。

1024555-20161116001404732-625340289.png

HashMap Vs HashTable

  • HashMap容许将 null 做为一个 entry 的 key 或者 value,而 Hashtable 不容许。
  • HashTable 继承自 Dictionary 类,而 HashMap 是 Java1.2 引进的 Map interface 的一个实现。
  • HashTable 的方法是 Synchronized 的,而 HashMap 不是,在多个线程访问 Hashtable 时,不须要本身为它的方法实现同步,而 HashMap 就必须为之提供外同步。

参考:

  • https://tech.meituan.com/2016/06/24/java-hashmap.html
  • https://juejin.im/post/5aa5d8d26fb9a028d2079264
  • https://my.oschina.net/hosee/blog/618953
  • https://www.imooc.com/article/22943
  • https://www.cnblogs.com/chengxiao/p/6059914.html

TreeMap

TreeMap继承于AbstractMap,实现了Map, Cloneable, NavigableMap, Serializable接口。

Screen Shot 2019-12-08 at 3.56.55 PM.png

TreeMap 是一个有序的key-value集合,它是经过红黑树实现的。该映射根据其键的天然顺序进行排序,或者根据建立映射时提供的Comparator进行排序,具体取决于使用的构造方法。
TreeMap的基本操做 containsKey、get、put 和 remove 的时间复杂度是 log(n) 。

对于SortedMap来讲,该类是TreeMap体系中的父接口,也是区别于HashMap体系最关键的一个接口。SortedMap接口中定义的第一个方法Comparator<? super K> comparator();该方法决定了TreeMap体系的走向,有了比较器,就能够对插入的元素进行排序了。

TreeMap的查找、插入、更新元素等操做,主要是对红黑树的节点进行相应的更新,和数据结构中相似。

TreeSet

TreeSet基于TreeMap实现,底层也是红黑树。只是每次插入元素时,value为一个默认的dummy数据。

HashSet

HashSet的实现很简单,内部有一个HashMap的成员变量,全部的Set相关的操做都转换为了对HashMapde操做。

public class HashSet<E>
    extends AbstractSet<E>
    implements Set<E>, Cloneable, java.io.Serializable
{
    static final long serialVersionUID = -5024744406713321676L;

    private transient HashMap<E,Object> map;

    // Dummy value to associate with an Object in the backing Map
    private static final Object PRESENT = new Object();
    
    //其余操做省略
 }

从上面的code能够看到,内部还定义了一个PRESENT的dummy对象,当添加元素时,直接添加一对键值对,key为元素值,value为PRESENT。

/**
     * Adds the specified element to this set if it is not already present.
     * More formally, adds the specified element <tt>e</tt> to this set if
     * this set contains no element <tt>e2</tt> such that
     * <tt>(e==null&nbsp;?&nbsp;e2==null&nbsp;:&nbsp;e.equals(e2))</tt>.
     * If this set already contains the element, the call leaves the set
     * unchanged and returns <tt>false</tt>.
     *
     * @param e element to be added to this set
     * @return <tt>true</tt> if this set did not already contain the specified
     * element
     */
    public boolean add(E e) {
        return map.put(e, PRESENT)==null;
    }

其余的操做相似,就是把PRESENT当作value。

LinkedHashMap

首先是定义,

public class LinkedHashMap<K,V>
    extends HashMap<K,V>
    implements Map<K,V>
{
    ...
}

能够看到,LinkedHashMap是HashMap的子类,但和HashMap的无序性不同,LinkedHashMap经过维护一个运行于全部条目的双向链表,保证了元素迭代的顺序。该迭代顺序能够是插入顺序或者是访问顺序,这个能够在初始化的时候肯定,默认采用插入顺序来维持取出键值对的次序。

在成员变量上,与HashMap不一样的是,引入了before和after两个变量来记录先后的元素。

一、K key

二、V value

三、Entry<K, V> next

四、int hash

五、Entry<K, V> before

六、Entry<K, V> after

1-4是从HashMap.Entry中继承过来的;5-6是LinkedHashMap独有的。注意next是用于维护HashMap指定table位置上链接的Entry的顺序的,before、After是用于维护Entry插入的前后顺序的。

能够把LinkedHashMap的结构当作以下图所示:

test.png

接下来主要介绍LinkedHashMap的排序操做,

在构造函数中,须要指定accessOrder,有两种状况:

  • false,全部的Entry按照插入的顺序排列
  • true,全部的Entry按照访问的顺序排列
public LinkedHashMap(int initialCapacity,
         float loadFactor, boolean accessOrder) {
    super(initialCapacity, loadFactor);
    this.accessOrder = accessOrder;
}

第二种状况,也就是accessOrder为true时,每次经过get/put方法访问时,都把访问的那个数据移到双向队列的尾部去,也就是说,双向队列最头的那个数据就是最不常访问的那个数据。具体实现以下,afterNodeAccess这个方法在HashMap中没有实现,LinkedHashMap进行了实现,将元素进行排序。

void afterNodeAccess(Node<K,V> e) { // move node to last
        LinkedHashMap.Entry<K,V> last;
        if (accessOrder && (last = tail) != e) {
            LinkedHashMap.Entry<K,V> p =
                (LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
            p.after = null;
            if (b == null)
                head = a;
            else
                b.after = a;
            if (a != null)
                a.before = b;
            else
                last = b;
            if (last == null)
                head = p;
            else {
                p.before = last;
                last.after = p;
            }
            tail = p;
            ++modCount;
        }
    }

利用LinkedHashMap实现LRU缓存

LRU即Least Recently Used,最近最少使用,也就是说,当缓存满了,会优先淘汰那些最近最不常访问的数据。LinkedHashMap正好知足这个特性,当咱们开启accessOrder为true时,最新访问(get或者put(更新操做))的数据会被丢到队列的尾巴处,那么双向队列的头就是最不常用的数据了。

此外,LinkedHashMap还提供了一个方法,这个方法就是为了咱们实现LRU缓存而提供的,removeEldestEntry(Map.Entry<K,V> eldest) 方法。该方法能够提供在每次添加新条目时移除最旧条目的实现程序,默认返回 false。

下面是一个最简单的LRU缓存的实现,当size超过maxElement时,每次新增一个元素时,就会移除最久远的元素。

public class LRUCache extends LinkedHashMap
{
    public LRUCache(int maxSize)
    {
        super(maxSize, 0.75F, true);
        maxElements = maxSize;
    }

    protected boolean removeEldestEntry(java.util.Map.Entry eldest)
    {
        //逻辑很简单,当大小超出了Map的容量,就移除掉双向队列头部的元素,给其余元素腾出点地来。
        return size() > maxElements;
    }

    private static final long serialVersionUID = 1L;
    protected int maxElements;
}

参考:

  • https://juejin.im/post/5a4b433b6fb9a0451705916f
  • https://www.cnblogs.com/xiaoxi/p/6170590.html

5.2 ConcurrentHashMap

这节开始介绍并发容器,首先是ConcurrentHashMap,实现了线程安全的HashMap。以前也提到了HashMap在多线程环境下的问题,这小节先详细分析为何HashMap多线程下不安全。

HashMap多线程环境下的问题分析

首先说结论,为何HashMap不是线程安全的?在多线程下,会致使HashMap的Entry链表造成环形数据结构,一旦造成环形,Entry的next节点永远不为空,不管是进行resize仍是get/size等操做时,就会产生死循环。

首先针对JDK7进行分析:

下面是resize部分的代码,这段代码将原HashMap中的元素依次移动到扩容后的HashMap中,

1:  // Transfer method in java.util.HashMap -
2:  // called to resize the hashmap
3:  // 依次移动每一个bucket中的元素到新的buckets中
4:  for (int j = 0; j < src.length; j++) {
5:    Entry e = src[j];
6:    if (e != null) {
7:      src[j] = null;
8:      do {
            // Next指向下一个须要移动的元素
9:          Entry next = e.next; 
            // 计算新Map中的位置
10:         int i = indexFor(e.hash, newCapacity);
            // 插入到bucket中第一个位置
11:         e.next = newTable[i];
12:         newTable[i] = e;
            // 指向原bucket中下一个位置的元素
13:         e = next;
14:     } while (e != null);
15:   }
16: }

在正常单线程的状况下,若是有以下的HashMap的结构,为了方便这里只有2个bucket(java.util.HashMap中默认是 16)。

Screen Shot 2019-12-08 at 10.45.35 PM.png

按照上面的resize流程,e和next分别指向A和B,A是第一次迭代将会被移动的元素,B是下一个。

  • 第一次迭代后,A被移动到新的Map中,Map的容量已经增大了一倍。A的位置以下图所示

Screen Shot 2019-12-08 at 10.46.40 PM.png

  • 第二次迭代后,B被移动到了新的位置,以下图所示,C为下一个待移动的元素。

Screen Shot 2019-12-08 at 10.47.42 PM.png

  • 第三次迭代以后,C被移动到了新的位置,因为C以后没有其余元素,所以整个resize过程完成,最后新的Map以下:

Screen Shot 2019-12-08 at 10.48.16 PM.png

在resize完成以后,每一个bucket的深度变小了,达到了resize的目的。整个过程在单线程下没有任何问题,可是考虑到多线程的状况,就会可能会出现竞争。

如今有两个线程Thread1,Thread2同时进行resize的操做,假设Thread1在运行到第9行后,Thread2获取了CPU而且也开始执行resize的操做。

1:  // Transfer method in java.util.HashMap -
2:  // called to resize the hashmap
3:  
4:  for (int j = 0; j < src.length; j++) {
5:    Entry e = src[j];
6:    if (e != null) {
7:      src[j] = null;
8:      do {
9:      Entry next = e.next; 
     // Thread1 STOPS RIGHT HERE
10:     int i = indexFor(e.hash, newCapacity);
11:     e.next = newTable[i];
12:     newTable[i] = e;
13:     e = next;
14:   } while (e != null);
15:   }
16: }

Thread1运行后,对应的e1和next1别指向A和B,可是Thread1并无移动元素。

Screen Shot 2019-12-08 at 10.49.53 PM.png

假设Thread2在获取CPU后完整的运行了整个resize,新的Map结构将会以下图所示:

Screen Shot 2019-12-08 at 10.50.42 PM.png

注意到e1next1仍是指向A和B,可是A和B的位置关系已经变了,按照resize的算法进行两轮迭代以后,变成以下的结构,

Screen Shot 2019-12-08 at 10.53.01 PM.png

Screen Shot 2019-12-08 at 10.53.29 PM.png

注意此时enext的指向,在下一次的迭代中,将把A放在第3个bucket的一个位置,可是B仍然是指向A的,因此出现了下面的相似于双向链表的结构,

Screen Shot 2019-12-08 at 10.54.19 PM.png

接着Thread1就会进入到无限循环中,此时若是有get操做的话,也会出现无限循环的状况。这就是HashMap在多线程状况下容易出现的问题。

接着针对JDK8进行分析:

前面已经提到,JDK8和7在Resize的不一样之处就是8保留了链表中元素的前后位置,这样基本能够确保在resize过程当中不出现循环的问题,可是仍是可能出现数据丢失的问题。如下是resize的核心实现,

Node<K,V> loHead = null, loTail = null;
                        Node<K,V> hiHead = null, hiTail = null;
                        Node<K,V> next;
                        do {
                            next = e.next;
                            if ((e.hash & oldCap) == 0) {
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            else {
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null);
                        if (loTail != null) {
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }

在实现中会使用两个临时链表,分别存储新地址和旧地址的链表,最后将这两个链表放到对应的位置。

假定出现以下的状况,有ABC三个元素须要移动,首先线程1指向A,next即为B,此后线程2一样进行resize,并把high/low两个链表的更新完成,这时返回线程1继续运行。

Image 3.jpeg

可是线程1仍然按照正常的流程继续,A会被放到High链表,B会被放到Low链表,这以后因为B后面没有元素,更新完成,所以C就漏掉了。

其实无论是JDK7仍是8,因为链表的不少操做都没有加锁,每一个操做也不是原子操做,致使可能出现不少意想不到的结果,也是为何须要引入专门的ConcurrentHashMap。

ConcurrentHashMap介绍

为何不使用HashTable?

以前介绍的HashTable也能保证线程安全,可是HashTable使用synchronized来保证线程安全,但在线程竞争激烈的状况下HashTable的效率很是低下。由于当一个线程访问HashTable的同步方法,其余线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,因此竞争越激烈效率越低。正由于如此,须要引入更加高效的多线程解决方案。

ConcurrentHashMap的结构在JDk1.7和1.8中有较大的不一样,下面将会分别进行介绍。

JDK1.7中的实现

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment实际继承自可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,每一个Segment里包含一个HashEntry数组,咱们称之为table,每一个HashEntry是一个链表结构的元素。

Segment实际继承自可重入锁(ReentrantLock),这是与普通HashMap的最大区别。

Picture1.png

面试点:ConcurrentHashMap实现原理是怎么样的或者ConcurrentHashMap如何在保证高并发下线程安全的同时实现了性能提高?

ConcurrentHashMap容许多个修改操做并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不一样部分进行的修改。内部使用段(Segment)来表示这些不一样的部分,每一个段其实就是一个小的hash table,只要多个修改操做发生在不一样的段上,它们就能够并发进行。

1.1 初始化过程

初始化有三个参数:

  • initialCapacity:初始容量大小 ,默认16。
  • loadFactor, 扩容因子或者叫负载因子,默认0.75,当一个Segment存储的元素数量大于initialCapacity* loadFactor时,该Segment会进行一次扩容。
  • concurrencyLevel 并发度:默认16。并发度能够理解为程序运行时可以同时操做ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度。若是并发度设置的太小,会带来严重的锁竞争问题;若是并发度设置的过大,本来位于同一个Segment内的访问会扩散到不一样的Segment中,CPU cache命中率会降低,从而引发程序性能降低。

如下是对初始化函数的分析:

Screen Shot 2019-12-09 at 10.47.06 PM.png

1.2 Hash值计算

对某个元素进行Put/Get操做以前,都须要定位该元素在哪一个segment元素的某个table元素中的,定位的过程,取得key的hashcode值进行一次再散列(经过Wang/Jenkins算法),拿到再散列值后,以再散列值的高位进行取模获得当前元素在哪一个segment上。

Screen Shot 2019-12-09 at 11.10.10 PM.png

具体的Hash实现以下:

Screen Shot 2019-12-09 at 11.12.01 PM.png

1.3 Get方法

定位segment和定位table后,依次扫描这个table元素下的的链表,要么找到元素,要么返回null。

在高并发下的状况下如何保证取得的元素是最新的?

用于存储键值对数据的HashEntry,在设计上它的成员变量value等都是volatile类型的,这样就保证别的线程对value值的修改,get方法能够立刻看到。

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
    }

1.4 Put方法

一、首先定位segment,当这个segment在map初始化后,还为null,由ensureSegment方法负责填充这个segment。

二、对Segment加锁,虽然value是volatile的,只能保证可见性,不能保证原子性。这里put操做不是原子操做,所以须要加锁。

Picture1.png

三、定位所在的table元素,并扫描table下的链表,找到时:

Picture1.png

注意到默认onlyIfAbsent为false,也就是若是有相同key的元素,会覆盖旧的值。不管是否覆盖,都是返回旧值。

没有找到时:

Picture1.png

1.5 扩容操做

扩容操做不会扩容Segment,只会扩容对应的table数组,每次都是将数组翻倍。

Picture1.png

以前也提到过,因为数组长度为2次幂,因此每次扩容以后,元素要么在原处,要么在原处加上偏移量为旧的size的新位置。

1.6 Size方法

size的时候进行两次不加锁的统计,两次一致直接返回结果,不一致,从新加锁再次统计,

Screen Shot 2019-12-10 at 11.49.53 AM.png

ConcurrentHashMap的弱一致性

get方法和containsKey方法都是经过对链表遍历判断是否存在key相同的节点以及得到该节点的value。但因为遍历过程当中其余线程可能对链表结构作了调整,所以get和containsKey返回的多是过期的数据,这一点是ConcurrentHashMap在弱一致性上的体现。

JDK1.8中的实现

相比JDK1.7的重要变化:

一、取消了segment数组,引入了Node结构,直接用Node数组来保存数据,锁的粒度更小,减小并发冲突的几率。
二、存储数据时采用了链表+红黑树的形式,纯链表的形式时间复杂度为O(n),红黑树则为O(logn),性能提高很大。何时链表转红黑树?当key值相等的元素造成的链表中元素个数超过8个的时候。

2.1 数据结构

  • Node:存放实际的key和value值。
  • sizeCtl:负数:表示进行初始化或者扩容,-1表示正在初始化,-N,表示有N-1个线程正在进行扩容
    正数:0 表示尚未被初始化,>0的数,初始化或者是下一次进行扩容的阈值。
  • TreeNode:用在红黑树,表示树的节点, TreeBin是实际放在table数组中的,表明了这个红黑树的根。

Picture1.png

ConcurrentHashMap在初始化时,只是给成员变量赋值,put时进行实际数组的填充。

2.2 Hash计算

先计算key的hash值,而后将高位加入计算来进行再散列。

Picture1.png

Picture1.png

2.3 Get方法

首先计算hash值,肯定在table中的位置。

  • 是否恰好在table中某个首元素,找到返回;
  • 在树中查找
  • 在链表中查找

Picture1.png

注意到在初始化TreeBin,也就是设置红黑树所在的Node的第一个节点时,会设置对应的hash值,这些hash值定义以下。因此上面的代码中,能够经过判断首节点的hash值<0来肯定该节点为树。

static final int MOVED     = -1; // hash for forwarding nodes
    static final int TREEBIN   = -2; // hash for roots of trees
    static final int RESERVED  = -3; // hash for transient reservations

2.4 Put方法

PUT方法中会实际初始化数组,

Screen Shot 2019-12-10 at 8.56.23 PM.png
Screen Shot 2019-12-10 at 9.17.20 PM.png

2.5 扩容操做

线程执行put操做,发现容量已经达到扩容阈值,须要进行扩容操做。ConcurrentHashMap支持并发扩容,实现方式是,将表拆分,让每一个线程处理本身的区间。以下图:

Screen Shot 2019-12-10 at 9.23.42 PM.png

迁移完毕的hash桶,会被设置成ForwardingNode节点,以此告知访问此桶的其余线程,此节点已经迁移完毕。此时线程2访问到了ForwardingNode节点,若是线程2执行的put或remove等写操做,那么就会先帮其扩容。若是线程2执行的是get等读方法,则会调用ForwardingNode的find方法,去nextTable里面查找相关元素。

2.6 Size

Put操做时,addCount 方法用于 CAS 更新 baseCount,但颇有可能在高并发的状况下,更新失败,那么这些节点虽然已经被添加到哈希表中了,可是数量却没有被统计。

当更新 baseCount 失败的时候,会调用 fullAddCount 将这些失败的结点包装成一个 CounterCell 对象,保存在 CounterCell 数组中。

整张表实际的 size 实际上是 baseCount 加上 CounterCell 数组中元素的个数。

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);
}

具体的计算count方法,

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

和JDK1.7同样,这样获得的size也只是大概数字,也具备弱一致性。

5.3 ConcurrentSkipListMap

ConcurrentSkipListMap是一个并发安全, 基于skiplist实现有序存储的Map。能够当作是TreeMap的并发版本。

ConcurrentHashMap采用空间换取时间, 但它有着ConcurrentHashMap不能比拟的优势: 有序数据存储.

SkipList的结构以下图所示:

30222128-045c88b7e992443395a540ba2eb740f3.jpg

从图中能够得出ConcurrentSkipListMap的几个特色:

  1. ConcurrentSkipListMap 的节点主要由 Node, Index, HeadIndex 构成;
  2. ConcurrentSkipListMap 的数据结构横向纵向都是链表
  3. 最下面那层链表是Node层(数据节点层), 上面几层都是Index层(索引)
  4. 从纵向链表来看, 最左边的是 HeadIndex 层, 右边的都是Index 层, 且每层的最底端都是对应Node, 纵向上的索引都是指向最底端的Node。

5.4 ConcurrentSkipListSet

ConcurrentSkipListSet基于ConcurrentSkipListMap实现,相似于TreeSet基于TreeMap实现。

5.5 ConcurrentLinkedQueue

ConcurrentLinkedQueue实现了一个高并发的队列,底层使用链表做为其数据结构。从性能角度看,能够算是高并发环境下性能最好的队列了。

ConcurrentLinkedQueue类中,核心节点Node的定义以下,item表示目标元素,next表示当前Node的下一个元素。

private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

add,offer将元素插入到尾部,其中add实现上直接调用了offer。peek方法拿头部的数据,可是不移除和poll拿头部的数据,可是同时移除。

5.6 CopyOnWriteArrayList

CopyOnWrite(写时复制)的容器。通俗的理解是当咱们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,而后新的容器里添加元素,添加完元素以后,再用新的容器替换旧的容器。

好处是咱们能够对容器进行并发的读,而不须要加锁,由于当前容器不会添加任何元素。因此写时复制容器也是一种读写分离的思想,读和写不一样的容器。若是读的时候有多个线程正在向容器添加数据,读仍是会读到旧的数据,由于写的时候不会锁住旧的,只能保证最终一致性。

下面介绍一下写的过程,

/**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

首先,写入操做使用锁,主要是为了控制写写的状况。接着进行新数组的复制,将新的元素加入newElements,最后使用新的数组替换老的数组,修改就完成了。整个过程不会影响读取,而且修改完成之后,读取线程能够“觉察”到这个修改,由于array是volatile类型,保证了可见性。

/** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;

容器的适用场景:适用读多写少的并发场景,常见应用:白名单/黑名单,商品类目的访问和更新场景。可是因为会复制旧的数组,全部可能存在内存占用问题。

5.7 CopyOnWriteArraySet

CopyOnWriteArraySet基于CopyOnWriteArrayList实现,为了保证数据的惟一性,在往其中加入数据时,会check当前数组中是否存在该元素,若是不存在,则加入到当前数组。

/**
     * Appends the element, if not present.
     *
     * @param e element to be added to this list, if absent
     * @return {@code true} if the element was added
     */
    public boolean addIfAbsent(E e) {
        Object[] snapshot = getArray();
        return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
            addIfAbsent(e, snapshot);
    }

5.8 阻塞队列

定义与经常使用操做

阻塞队列(BlockingQueue)是一个支持两个附加操做的队列。这两个附加的操做是:

  • 在队列为空时,获取元素的线程会等待队列变为非空。
  • 当队列满时,存储元素的线程会等待队列可用。

阻塞队列经常使用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
  • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出 IllegalStateException("Queue full") 异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常 。
  • 返回特殊值:插入方法会返回是否成功,成功则返回 true。移除方法,则是从队列里拿出一个元素,若是没有则返回 null
  • 一直阻塞:当阻塞队列满时,若是生产者线程往队列里 put 元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里 take 元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,若是超过必定的时间,生产者线程就会退出。

Java里的阻塞队列

JDK7 提供了 7 个阻塞队列。分别是

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

1. ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认状况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的全部生产者线程或消费者线程,当队列可用时,能够按照阻塞的前后顺序访问队列,即先阻塞的生产者线程,能够先往队列里插入元素,先阻塞的消费者线程,能够先从队列里获取元素。一般状况下为了保证公平性会下降吞吐量。咱们可使用如下代码建立一个公平的阻塞队列:

2. LinkedBlockingQueue

一个用链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

3. PriorityBlockingQueue

一个支持优先级的无界队列。默认状况下元素采起天然顺序排列,也能够经过比较器 comparator 来指定元素的排序规则。元素按照升序排列。

4. DelayQueue

一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在建立元素时能够指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。咱们能够将 DelayQueue 运用在如下应用场景:

  • 缓存系统的设计:能够用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。

  • 定时任务调度。使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,从好比 TimerQueue 就是使用 DelayQueue 实现的。

队列中的 Delayed 必须实现 compareTo 来指定元素的顺序。好比让延时时间最长的放在队列的末尾。

5. SynchronousQueue

SynchronousQueue 是一个不存储元素的阻塞队列。每个 put 操做必须等待一个 take 操做,不然不能继续添加元素。SynchronousQueue 能够当作是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列自己并不存储任何元素,很是适合于传递性场景, 好比在一个线程中使用的数据,传递给另一个线程使用,SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。

6. LinkedTransferQueue

是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其余阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

  • transfer 方法。若是当前有消费者正在等待接收元素(消费者使用 take() 方法或带时间限制的 poll() 方法时),transfer 方法能够把生产者传入的元素马上 transfer(传输)给消费者。若是没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回。transfer 方法的关键代码以下:
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行代码是试图把存放当前元素的 s 节点做为 tail 节点。第二行代码是让 CPU 自旋等待消费者消费元素。由于自旋会消耗 CPU,因此自旋必定的次数后使用 Thread.yield() 方法来暂停当前正在执行的线程,并执行其余线程。

  • tryTransfer 方法。则是用来试探下生产者传入的元素是否能直接传给消费者。若是没有消费者等待接收元素,则返回 false。和 transfer 方法的区别是 tryTransfer 方法不管消费者是否接收,方法当即返回。而 transfer 方法是必须等到消费者消费了才返回。

对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit) 方法,则是试图把生产者传入的元素直接传给消费者,可是若是没有消费者消费该元素则等待指定的时间再返回,若是超时还没消费元素,则返回 false,若是在超时时间内消费了元素,则返回 true。

7. LinkedBlockingDeque

一个由链表结构组成的双向阻塞队列。所谓双向队列指的你能够从队列的两端插入和移出元素。双端队列由于多了一个操做队列的入口,在多线程同时入队时,也就减小了一半的竞争。相比其余的阻塞队列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。可是 take 方法却等同于 takeFirst,不知道是否是 Jdk 的 bug,使用时仍是用带有 First 和 Last 后缀的方法更清楚。

在初始化 LinkedBlockingDeque 时能够设置容量防止其过渡膨胀。另外双向阻塞队列能够运用在“工做窃取”模式中。

阻塞队列的实现原理

在介绍阻塞队列的实现以前,先介绍一下生产者与消费者模式:

生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。

生产者和消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是经过阻塞队列来进行通讯,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。

1)当队列满的时候,插入元素的线程被阻塞,直达队列不满。
2)队列为空的时候,获取元素的线程被阻塞,直到队列不空。

JDK是如何让生产者和消费者可以高效率的进行通信呢?

答案是使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。

以ArrayBlockingQueue为例:

private final Condition notFull;
private final Condition notEmpty;

public ArrayBlockingQueue(int capacity, boolean fair) {
        // 省略其余代码 
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
}

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
  } finally {
            lock.unlock();
        }
}

private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }
    
private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

从上述代码能够看到,当队列为空,notEmpty进行等待;插入元素后,唤醒等待的线程。当队列满时,notFull进行等待;删除元素后,唤醒等待的线程。


参考:

  • https://www.infoq.cn/article/java-blocking-queue

本文由『后端精进之路』原创,首发于博客 http://teckee.github.io/ , 转载请注明出处

搜索『后端精进之路』关注公众号,马上获取最新文章和价值2000元的BATJ精品面试课程

后端精进之路.png

相关文章
相关标签/搜索