java并发编程——并发容器

概述

java cocurrent包提供了不少并发容器,在提供并发控制的前提下,经过优化,提高性能。本文主要讨论常见的并发容器的实现机制和绝妙之处,但并不会对全部实现细节面面俱到。html

为何JUC须要提供并发容器?

java collection framework提供了丰富的容器,有map、list、set、queue、deque。可是其存在一个不足:多数容器类都是非线程安全的,即便部分容器是线程安全的,因为使用sychronized进行锁控制,致使读/写均需进行锁操做,性能很低。java

java collection framework能够经过如下两种方式实现容器对象读写的并发控制,可是都是基于sychronized锁控制机制,性能低:node

1. 使用sychronized方法进行并发控制,如HashTable 和 Vector。如下代码为Vector.add(e)的java8实现代码:算法

    public synchronized boolean add(E e) {
        modCount++;
        ensureCapacityHelper(elementCount + 1);
        elementData[elementCount++] = e;
        return true;
    }

2.使用工具类Collections将非线程安全容器包装成线程安全容器。如下代码是Collections.synchronizedMap(Map<K,V> m)将原始Map包装为线程安全的SynchronizedMap,可是实际上最终操做时,仍然是在被包装的原始m上进行,只是SynchronizedMap的全部方法都加上了synchronized锁控制。编程

    public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
        return new SynchronizedMap<>(m);   //将原始Map包装为线程安全的SynchronizedMap
    }
    private static class SynchronizedMap<K,V>
        implements Map<K,V>, Serializable {

        private final Map<K,V> m;       // Backing Map 原始的非线程安全的map对象
        final Object      mutex;        // Object on which to synchronize  加锁对象

        SynchronizedMap(Map<K,V> m) {
            this.m = Objects.requireNonNull(m);
            mutex = this;
        }

        public V get(Object key) {      
            synchronized (mutex) {return m.get(key);} //全部方法加上synchronized锁控制
        }

        public V put(K key, V value) {
            synchronized (mutex) {return m.put(key, value);} //全部方法加上synchronized锁控制
        }
......
}

为了提供高效地并发容器,java 5在java.util.cocurrent包中 引入了并发容器。api

JUC并发容器

本节对juc经常使用的几个并发容器进行代码分析,重点看下这些容器是如何高效地实现并发控制的。在进行具体的并发容器介绍以前,咱们提早搞清楚CAS理论是什么东西。由于在juc并发容器的不少地方都使用到了CAS,他比加锁处理更加高效。数组

CAS

CAS是一种无锁的非阻塞算法,全称为:Compare-and-swap(比较并交换),大体思路是:先比较目标对象现值是否和旧值一致,若是一致,则更新对象为新值;若是不一致,则代表对象已经被其余线程修改,直接返回。算法实现的伪码以下:安全

function cas(p : pointer to int, old : int, new : int) returns bool {
    if *p ≠ old {
        return false
    }
    *p ← new
    return true
}

参考自wiki:Compare-and-swap数据结构

ConcurrentHashMap

ConcurrentHashMap实现了HashTable的全部功能,线程安全,但却在检索元素时不须要锁定,所以效率更高。多线程

ConcurrentHashMap的key 和 value都不容许null出现。缘由在于ConcurrentHashMap不能区分出value是null仍是没有map上,相对的HashMap却能够容许null值,在于其使用在单线程环境下,可使用containKey(key)方法提早断定是否能map上,从而区分这两种状况,可是ConcurrentHashMap在多线程使用上下文中则不能这么断定。参考:关于ConcurrentHashMap为何不能put null

A hash table supporting full concurrency of retrievals and high expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access. This class is fully interoperable with Hashtable in programs that rely on its thread safety but not on its synchronization details.

ConcurrentHashMap个put和get方法,细节请看代码对应位置的注释。

public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin,当前hash对应的bin(桶)还不存在时,使用cas写入; 写入失败,则再次尝试。
            }
            else if ((fh = f.hash) == MOVED) //若是tab[i]不为空而且hash值为MOVED,说明该链表正在进行transfer操做,返回扩容完成后的table
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {          // 加锁保证线程安全,但不是对整个table加锁,只对当前的Node加锁,避免其余线程对当前Node进行写操做。 if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) { //若是在链表中找到值为key的节点e,直接设置e.val = value便可
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e; //若是没有找到值为key的节点,直接新建Node并加入链表便可
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {  //若是首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操做
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD) //若是节点数大于阈值,则转换链表结构为红黑树结构
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount); //计数增长1,有可能触发transfer操做(扩容)
     return null; 
    }
  transient volatile Node<K,V>[] table; //元素所在的table是volatile类型,线程间可见


public V get(Object key) { //get无需更改size和count等公共属性,加上table是volatile类型,故而无需加锁。 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }

思考一个问题:为何当新加Node对应的‘桶’不存在时能够直接使用CAS操做新增该桶,并插入新节点,可是当新增Node对应的‘桶’存在时,则必须加锁处理?

参考资料:Java并发编程总结4——ConcurrentHashMap在jdk1.8中的改进

               java1.7 ConcurrentHashMap实现细节

附上HashMap jdk 1.8版本中的实现原理讲解,讲的很细也很通俗易懂:Jdk1.8中的HashMap实现原理

ConcurrentLinkedQueue

ConcurrentLinkedQueue使用链表做为数据结构,它采用无锁操做,能够任务是高并发环境下性能最好的队列。

ConcurrentLinkedQueue是非阻塞线程安全队列,无界,故不太适合作生产者消费者模式,而LinkedBlockingQueue是阻塞线程安全队列,能够作到有界,一般用于生产者消费者模式。

下面看下其offer()方法的源码,体会下:不使用锁,只是用CAS操做来保证线程安全。细节参考代码对应位置的注释。

    
/**
* 不断尝试:找到最新的tail节点,不断尝试想最新的tail节点后面添加新节点
     */
public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { //不断尝试:找到最新的tail节点,不断尝试想最新的tail节点后面添加新节点。 Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time //t引用有可能并非真实的tail节点的引用,多线程操做时,容许该状况出现,只要能保证每次新增元素是在真实的tail节点上添加的便可。 casTail(t, newNode); // Failure is OK. 即便失败,也不影响下次offer新的元素,反正后面会试图寻找到最新的真实tail元素 return true; } // Lost CAS race to another thread; re-read next CAS竞争失败,再次尝试 } else if (p == q) //遇到哨兵节点(next和item相同,空节点或者删除节点),从head节点从新遍历。确保找到最新的tail节点 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; //java中'!='运算符不是原子操做,故使用t != (t = tail)作一次断定,若是tail被其余线程更改,则直接使用最新的tail节点返回。 } }

CopyOnWriteArrayList

CopyOnWriteArrayList提供高效地读取操做,使用在读多写少的场景。CopyOnWriteArrayList读取操做不用加锁,且是安全的;写操做时,先copy一份原有数据数组,再对复制数据进行写入操做,最后将复制数据替换原有数据,从而保证写操做不影响读操做。

下面看下CopyOnWriteArrayList的核心代码,体会下CopyOnWrite的思想:

public class CopyOnWriteArrayList<E>    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;
    /** The lock protecting all mutators */
    final transient ReentrantLock lock = new ReentrantLock();
    /**
     * Sets the array.
     */
    final void setArray(Object[] a) {
        array = a;
    }

    /**
     * Gets the array.  Non-private so as to also be accessible
     * from CopyOnWriteArraySet class.
     */
    final Object[] getArray() {
        return array;
    }

    public E get(int index) {
        return get(getArray(), index);
    }

    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();   //写 互斥 读
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;  //对副本进行修改操做
            setArray(newElements); //将修改后的副本替换原有的数据
            return true;
        } finally {
            lock.unlock();
        }
    }

}

 ConcurrentSkipListMap

SkipList(跳表)是一种随机性的数据结构,用于替代红黑树,由于它在高并发的状况下,性能优于红黑树。跳表其实是以空间换取时间。跳表的基本模型示意图以下:

ConcurrentSkipListMap的实现就是实现了一个无锁版的跳表,主要是利用无锁的链表的实现来管理跳表底层,一样利用CAS来完成替换。

参考资料

从零单排 Java Concurrency, SkipList&ConcurrnetSkipListMap

相关文章
相关标签/搜索