第六章 Java并发容器和框架

ConcurrentHashMap的实现原理与使用

ConcurrentHashMap是线程安全且高效的hashmap。本节让咱们一块儿研究一下该容器是如何在保证线程安全的同时又能保证高效的操做。node

为何要使用ConcurrentHashMap

在并发编程中使用HashMap可能致使程序死循环。而使用线程安全的HashTable效率又很是低下,基于以上两个缘由,便有了ConcurrentHashMap的登场机会。算法

(1)线程不安全的HashMap编程

在多线程环境下,使用HashMap进行put操做会引发死循环,致使CPU利用率接近100%,因此在并发状况下不能使用HashMap。例如一下代码数组

    final HashMap<String, String> map = new HashMap<>(2);
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        map.put(UUID.randomUUID().toString(), "");
                    }
                }, "ftf" + i).start();
            }
        }
    }, "ftf");
    thread.start;
    thread.join;

HashMap在并发执行put操做时会引发死循环,是由于多线程会致使HashMap的Entry链表造成环形数据结构,一旦造成环形数据结构,Entry的next节点永远不会为空,就会产生死循环获取Entry.缓存

(2)效率低下的HashTable安全

HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的状况下HashTable的效率很是低下。由于当一个线程访问HashTable的同步方法,其余线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。数据结构

(3)ConcurrentHashMap的锁分段技术可有效提高并发访问率多线程

HashTable容器在竞争激烈的并发环境下表现出效率低下的缘由是全部访问HashTable的线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器的一部分数据,那么当多线程访问容器里不一样数据段的数据时,线程间就不会存在锁竞争,从而能够有效提升并发访问率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分红一段一段地存储,而后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其余数据段也能被其余线程访问。并发

 

ConcurrentHashMap的结构

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于储存键值对数据。一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashEntry相似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每一个HashEntry是一个链表结构的元素,每一个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先得到与它对于的Segment锁。app

 

ConcurrentHashMap的初始化

ConcurrentHashMap初始化方法是经过initialCapacity,loadFactor和concurrencyLevel等几个参数来初始化Segment数组、段偏移量segmentShift、段掩码segmentMask和每一个segment里的HashEntry数组来实现的。

初始化segments数组:让咱们来看一下初始化segments数组的源代码

if(concurrencyLevel > MAX_SEGMENTS)  concurrencyLevel = MAX_SEGMENTS 
 int sshift = 0;
        int ssize = 1;
        while (ssize < DEFAULT_CONCURRENCY_LEVEL) {
            ++sshift;
            ssize <<= 1;
        }
        int segmentShift = 32 - sshift;
        int segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);

由上面代码可知,segments数组的长度ssize是经过concurrencyLevel计算得出的。为了能经过按位与的散列算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方,因此必须计算出一个大于或等于concurrencyLevel的最小的2的N次方值来做为segments数组的长度。concurrencyLevel的最大值是65535,这意味着segments数组的长度最大为65536,对应的二进制是16位。

初始化segmentShift和segmentMask:这两个全局变量须要在定位segment时的散列算法里使用,sshift等于ssize从1向左移位的次数,在默认状况下concurrencyLevel等于16,1须要向左移位移动4次,因此sshift等于4。segmentShift用于定位参与散列运算的位数,segmentShift等于32减去sshift,因此等于28,这里之因此用32是由于ConcurrentHashMap里的hash()方法输出的最大数是32位的。segmentMask是散列运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1.由于ssize的最大长度是65536,因此segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位,每一个位都是1.

初始化每一个segment:输入参数initalCapacity是ConcurrentHashMap的初始化容量,loadfactor是每一个segment的负载因子,在构造方法里须要经过两个参数来初始化数组中的每一个segment。

 

定位segment

既然ConcurrentHashMap使用分段锁Segment来保护不一样段的数据,那么在插入和获取元素的时候,必须先经过散列算法定位到segment。ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列。之因此进行再散列,目的是减小散列冲突,使元素可以均匀地分布在不一样的Segment上,从而提升容器的存取效率。假如散列的质量差到极点,那么全部的元素都在一个Segment中,不只存取元素缓慢,分段锁也会失去意义。

默认状况下segmentShift为28,segmentMask为15,再散列后的数量最大是32位二进制数据,向右无符号移动28位,意思是让高4位参与到散列运算中,(hash>>>segmentShit)&segmentMask的运算结果分别是4,15,7和8,能够看到散列值没有发生冲突。

 

ConcurrentHashMap的操做

get操做:Segment的get操做实现很是简单和高效。先通过一次再散列,而后使用这个散列值经过散列运算定位到Segment,再经过散列算法定位到元素,代码以下

public V get(Object key){
 int hash = hash(key.hashCode());
 return segmentFor(hash).get(key,hahsh)   
}

get操做的高效之处在于整个get过程不须要加锁,除非读到的值是空才会加锁重读。它的get方法里将要使用的共享变量都定义成volatile类型,如用于统计当前Segment大小的count字段和用于存储值的HashEntry的value,定义成volatile的变量,可以在线程之间保持可见性,可以被多线程同时读,而且保证不会读到过时的值,可是只能被单线程写(有一种状况能够被多线程写,就是写入的值不依赖与原值),在get操做里只须要读不须要写共享变量count和value,因此能够不用加锁。之因此不会读到过时的值,是由于根据Java内存模型的happen before原则,对volatile字段的写入操做先于读操做,即便两个线程同时修改和获取volatile变量,get操做也能拿到最新的值,这是用volatiel替换锁的经典应用场景。

transient volatile int count;
volatile V value;

在定位元素的代码里咱们能够发现,定位HashEntry和定位Segment的散列算法虽然同样,都与数组的长度减去1再相“与”,可是想“与”的值不同,定位segment使用的是元素的hashcode经过再散列后获得的值的高位,而定位HashEntry直接使用的是再散列后的值。其目的是避免两次散列后的值同样,虽然元素在Segment里散列开了,可是却没有再HashEntry里散列开。

put操做:因为put方法里须要对共享变量进行写入操做,因此为了线程安全,在操做共享变量时必须加锁。put方法首先定位到Segment,而后再Segment里进行插入操做。插入操做须要经历两个步骤,第一步判断是否须要对Segment里的HashEntry数组进行扩容,第二步添加元素的位置,而后将其放在HashEntry里。

(1)是否须要扩容:在插入元素前会先判断Segment里的HashEntry数组是否超过容量,若是超过阙值,则对数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,由于HashMap是在插入元素后判断元素是否已经到达容量的,若是达到了就进行扩容,可是颇有可能扩容以后没有新元素插入,这时HashMap就进行了一次无效的扩容,

(2)如何扩容:在扩容的时候,首先会建立一个容量是原来容量两倍的数组,而后将原数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。

(3)size操做:若是要统计整个ConcurrentHashMap里元素的大小,就必须统计全部Segment里元素的大小后求和。Segment里的全局变量count是一个colatile变量,那么在多线程场景下,是否是直接把全部的Segment的count相加就能够获得整个ConcurrentHashMap大小呢?不是的,虽然相加时能够获取每一个Segment的count最新值,可是可能累加前使用count发生了变化,那么统计结果就不许了。因此,最安全的作法是在统计size的时候把全部Segment的put,remone和clean方法所有锁住,可是这种作法显然很是抵消。由于在累加count操做过程当中,以前累加过的count发生变化的概率很是小,因此ConcurrentHashMap的作法是先尝试2次经过不锁住Segment的方式来统计各个Segment大小,若是统计的过程当中,容器的count发生了变化,则再采用加锁的方式来统计全部Segment大小。ConcurrentHashMap如何判断统计时候发生了变化呢?使用modCount变量,在put,remove和clean方法里操做元素前都会将变成modCount进行加1,那么在统计size先后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

 

ConcurrentLinkedQueue

在并发编程中,有时候须要使用线程安全的队列。若是要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另外一种是使用非阻塞算法。使用阻塞算法的队列能够用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用于不一样的锁)等方式实现。非阻塞的实现方式则可使用循环CAS的方式实现。

ConcurrentLinkedQueue是一个基于连接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当咱们添加一个元素的时候,它会添加到队列的尾部;当咱们获取一个元素时候,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现,概算在Michael&Scoft算法上进行了一些修改。

ConcurrentLinkedQueue的结构

ConcurrentLinkedQueue由head节点和tail节点组成,每一个节点(node)由节点元素(item)和指向下一个节点(next)的引用组成,节点和节点之间就是经过这个next关联起来,从而组成一张连接结构的队列。默认状况下head节点存储的元素为空,tail节点等于head节点。

private transient volatile Node<E> tail = head;

入队列

(1)入队列的过程

入队列就是将入队节点添加到队列的尾部。为了方便理解入队时队列的变化,以及head节点和tail几点的变化,这里以一个示例来展开介绍。假设咱们想在一个队列依次插入4个节点,每添加一个节点就作了一个队列的快照图,以下

  1. 添加元素1。队列更新head节点的next节点为元素1节点。又由于tail节点默认状况下等于head节点,因此它们的next节点都指向元素的1节点
  2. 添加元素2。队列首先设置元素1节点的next节点为元素2节点,而后更新tail节点指向元素2节点
  3. 添加元素3。设置tail节点的next节点为元素3节点
  4. 添加元素4。设置元素3的next几点为元素4节点,而后将tail节点指向元素4节点

经过调试入队过程并观察head节点和tail节点的变化,发现入队主要作两件事情:

第一是将入队节点设置成当前队列尾节点的下一个节点;

第二是更新tail节点,若是tail节点的next节点不为空,则将入队节点设置成tail节点,反之,则将入队及诶单设置成tail节点的next节点,因此节点不老是尾节点。

若是是多线程进入入队,咱们经过源码来详细分析它是如何使用CAS算法来入队的

 /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

从源代码角度来看,整个入队过程主要作两件事情:

第一是定位出尾节点;

第二是使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试

(2)定位尾节点

tail节点并不老是尾节点,因此每次入队都必须先经过tail节点来找到尾节点,尾节点多是tail节点,也多是tail节点的next节点,代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点的多是尾节点。获取tail节点的next节点须要注意的是p节点等于p的next节点状况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚刚初始化,正准备添加节点,因此须要返回head节点,获取p节点的next节点代码以下

   final Node<E> succ(Node<E> p) {
        Node<E> next = p.next;
        return (p == next) ? head : next;
    }

(3)设置入队节点为尾节点

p.casNext(null, n)方法用于将入队及诶单设置为当前队列尾尾节点的next节点,若是p是null,表示p是当前队列的尾节点,若是不为null,表示有其余线程更新了尾节点,则须要从新获取当前队列的尾节点

(4)hops的设计意图

上面分析过对于先进先出的队列入队所要作的事情是将入队节点设置成尾节点,dong lea写的代码和逻辑仍是稍微有点复杂,那么咱们用一下方式来实现是否可行?

    public boolean offer(E e){
        if(e == null) throw new NullPointerException();
        Node<E> n = new Node<E>(e);
        for(;;){
            Node<E> t = tail;
            if(t.casNext(null, n) && casTail(t, n)) return true;
        }
    }

让tail节点永远做为队列的尾节点,这样实现代码了很是少,并且逻辑清晰和易懂。可是,这么作有一个缺点,每次都须要使用循环CAS更新tail节点。若是能减小CAS更新tail节点的次数,就能提升入队的效率,全部dong lea使用hops变量来控制并减小tail节点的更新频率,并非每次节点入队后都将tail节点更新成尾节点,而是当tail节点和尾节点的距离大于等于常量HOPS的值(默认等于1)时才更新tail节点,tail和尾节点的距离越长,使用CAS更新tail节点的次数就会越少,可是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,由于循环体须要多循环一次来定位出尾节点,可是这样仍然能提升入队的效率,由于从本质上来看它经过增长对volatile变量的读操做来减小对volatile变量的写操做,而对volatile变量的写操做开销要远远大于读操做,因此入队效率会有所提高。

注意:入队方法永远返回true,因此不要经过返回值判断入队是否成功

出队列

出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。每一个节点出队的快照以下

由图可知,并非每次出队时都更新head节点,当head节点里有元素时,直弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操做才会更新head节点。这种作法也是经过hops变量来减小使用CAS更新head节点的消耗,从而提升出队效率。

 

Java中的阻塞队列

什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操做的队列,这两个附加的操做支持阻塞的插入和移除方法。

(1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满

(2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空

阻塞队列经常使用语生产者和消费者的场景,生产者向队里里添加元素,消费者是从队列里取元素的线程。阻塞队列就是生产者存放元素,消费者获取元素的容器。

在阻塞队列不可用时,这两个附加操做提供了4种处理方式

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove() poll() take() poll(time, unit)
检查方法 element() peek() 不可用 不可用

抛出异常:当队列满时,若是再插入元素,会抛出IllegalStateException异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常

返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。若是是移除方法,则是从队列里取出一个元素,若是没有则返回null

一直阻塞:当阻塞队列满时,若是生产者往队列里put元素,队列会一直阻塞生产者,知道队里可用或者响应中断退出。当队列为空时,若是消费者从队列里take元素,队列会阻塞消费者线程,直到队列不为空

超时退出:当阻塞队列满时,若是生产者往队里里插入元素,队列会阻塞生产者线程一段时间,若是超过了指定的时间,生产者线程就会退出

注意:若是是无界阻塞队列,队里不可能会出现满的状况,因此使用put或offer方法永远不会被阻塞,并且使用offer方法时,该方法永远返回true

Java里的阻塞队列

JDK提供了7种阻塞队列:

  1. ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
  2. LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列
  3. PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
  4. DelayQueue:一个使用优先级队列实现的无界阻塞队列
  5. SynchronousQueue:一个不存储元素的阻塞队列
  6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
  7. LingkedBlockingDeque:一个由链表结构组成的双向阻塞队列

ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。默认状况下不保证线程公平的访问队列。

LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认状况下元素采起天然顺序升序排列。也能够自定义类实现cpmpareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序,须要注意的是不能保证同优先级元素的顺序

DelayQueue

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在建立元素时能够指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素

DelayQueue很是有用,能够运用在如下场景:

  • 缓存系统的设计:能够用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了
  • 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,好比TimeQueue就是使用DelayQueue实现的

 SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列。每个put操做必须等待一个take操做,不然不能继续添加元素。

它支持公平访问队列,默认状况下现场采用非公平性策略访问队列。使用如下构造方法能够建立公平性访问SynchronousQueue,若是设置成true,则等待的线程会采用先进先出的顺序访问队列。SynchronousQueue能够当作是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列自己并不存储任何元素,很是适合传递性建立。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue

LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其余阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法

(1)tryTransfer

tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。若是没有消费者等待接受元素,则返回false。和transfer方法的区别是tryTransfer方法不管消费者是否接受,方法当即返回,而transfer方法是必须等到消费者消费了才返回。

对于带有时间限制的tryTransfer(E e,long timeout, TimeUnit unit)方法,试图把生产者传入的元素直接传给消费者,可是若是没有消费者消费该元素则等待知道的时间再返回,若是超时尚未消费元素则返回false,反之返回true

(2)transfer

若是当前有消费者正在等待接受元素(消费者使用take()方法或带时间限制的poll()方法),transfer方法能够把生产者传入的元素马上transfer(传输)给消费者。若是没有消费者在等待接受元素,transfer方法将会存放在队列的tail节点,并等到该元素被消费者消费了才返回。

LingkedBlockingDeque

LingkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是能够从队列的两端插入和移出元素,双向队列由于多了一个操做队列的入口,在多线程同时入队时,也就减小了一半的竞争。相比其余阻塞队列,LingkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst和peekLast等方法,以First单词结尾的方法,表示插入。获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。可是take方法却等同于takeFirst,不知道是否是JDK的bug,使用时仍是用带有First和Last后缀的方法更清楚。

在初始化LingkedBlockingDeque时能够设置容量防止其过分膨胀。另外,双向阻塞队列能够运用在“工做窃取”模式中

 

阻塞队列的实现原理

若是队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?

使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。经过查看JDK源码发现ArrayBlockingQueue使用了Condition来实现

  /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }


    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }


    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

 

Fork/Join框架

什么是Fork/Join框架

Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每一个小任务结果后获得大任务结果的框架。

工做窃取算法

工做窃取算法(work-stealing)是指某个线程从其余队列里窃取任务来执行。

优势:充分利用线程进行并行计算,减小了线程间的竞争

缺点:在某些状况下仍是存在竞争,好比双端队列里只有一个任务时。而且该算法会消耗了更多的系统资源,好比建立多个线程和多个双端队列

 Fork/Join框架的设计

 步骤1:分割任务。首先咱们须要有一个fork类来把大任务分割成子任务,有可能子任务仍是很大,因此还须要不停地分割,直到分割出的子任务足够小

步骤2:执行任务并合并结果。分割的子任务分别放在双端队列里,而后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,而后合并这些数据

Fork/Join使用两个类来完成以上两个事情。

  1. ForkJoinTask:咱们要使用Fork/Join框架,必须首先建立一个Fork/Join任务。它提供在任务中执行fork()和join()操做的机制。一般状况下,咱们不须要直接继承ForkJoinTask类,只须要继承它的子类,Fork/Join框架提供了如下两个子类
    1. RecursiveAction:用于没有返回结果的任务
    2. RecursiveTask:用于有返回结果的任务
  2. ForkJoinPool:ForkJoinTask须要经过ForkJoinPool来执行

任务分割出的子任务会添加到当前工做线程所维护的双端队列中,进入队里的头部。当一个工做线程的队列里暂时没有任务时,它会随机从其余工做线程的队列的尾部获取一个任务。

 Fork/Join框架的异常处理

ForkJoinTask在执行的时候可能会抛出异常,可是咱们没办法在主线程里直接捕获异常,全部ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,而且能够经过ForkJoinTask的getException方法获取异常。getException方法返回Throwable对方,若是任务被取消了则返回CancellationException。若是任务没有完成或者没有抛出异常则返回null

 Fork/Join框架的实现原理

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务

(1)ForkJoinTask的fork方法实现原理

当咱们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,而后当即返回结果。pushTask方法把当前任务存放在ForkJoinTask数组队列里。而后再调用ForkJoinPool的signalWork()方法唤醒或建立一个工做线程来执行任务。

(2)ForkJoinTask的join方法实现原理

join方法的主要做用是阻塞当前线程并等待获取结果。首先它调用了doJoin()方法,经过doJoin()方法获得当前任务的状态来判断返回什么结果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)

  • 若是任务状态是已完成,则直接返回任务结果
  • 若是任务状态是被取消,则直接抛出CancellationException
  • 若是任务时抛出异常,则直接抛出对应异常

在doJoin()方法里,首先经过查看任务的状态,看任务是否已经执行完成,若是执行完成,则直接返回任务状态;若是没有执行完,则从任务数组里取出任务并执行。若是任务顺利执行完成,则设置任务状态为NORMAL,若是出现异常,则记录异常。并将任务状态设置为EXCEPTIONAL

使用 Fork/Join框架

 让咱们经过一个简单的需求来使用Fork/Join框架,需求是:计算1+2+3+4的结果

使用Fork/Join框架首先要考虑到时如何分割任务,若是但愿每一个子任务最多执行两个数相加,那么咱们设置分割的阙值是2,因为是4个数字相加,因此Fork/Join框架会把这个任务fork成两个子任务,子任务一负责计算1+2,子任务而负责3+4,而后join两个子任务的结果。由于是有结果的任务,因此必须继承ResursiveTask,实现代码以下:

public class CountTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 2;//阙值
    private int start;
    private int end;

    public CountTask(int start, int end){
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        //若是任务足够小就计算任务
        boolean canCompute = (end - start) <= THRESHOLD;
        if(canCompute){
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        }else {
            //若是任务大于阙值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            //执行子任务
            leftTask.fork();
            rightTask.fork();
            //等待子任务执行完,并获得其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            //合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //生成一个计算任务,负责计算1+2+3+4
        CountTask countTask = new CountTask(1, 4);
        //执行一个任务
        Future<Integer> result = forkJoinPool.submit(countTask);
        try {
            System.out.println(result.get());
        }catch (InterruptedException e){
            
        }catch (ExecutionException e){
            
        }
    }
}

 

经过这个例子,咱们进一步理解ForkJoinTask,ForkJoinTask与通常任务的主要区别在于它须要实现compute方法,在这个方法里,首先须要判断任务是否足够小,若是足够小就直接执行任务。反之就必须分割成两个子任务,每一个子任务在调用fork方法时,又会进入compute方法,看着当前子任务是否须要继续分割成子任务,若是不须要继承分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并获得其结果。

相关文章
相关标签/搜索