Java并发6:阻塞队列,Fork/Join框架

阻塞队列

阻塞队列是一个支持两个附加操做的队列。这两个附加的操做支持阻塞的插入和移除方法:java

  • 支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满
  • 支持阻塞的移除方法:队列空时,获取元素的线程会等待队列变为非空

阻塞队列经常使用于生产者消费者的场景。其中生产者是向队列添加元素的线程,消费者是从队列取出元素的线程,阻塞队列是存放和获取元素的容器。git

阻塞队列的4种处理方式:github

  1. 抛出异常:
    • add(e) 当队列满,再插入元素,抛出异常
    • remove() 当队列空,再删除元素,抛出异常
    • element() 获取元素
  2. 返回特殊值:
    • offer(e) 插入元素时,插入成功返回true
    • poll() 移除元素,成功返回该值,不然返回null
    • peek()
  3. 一直阻塞:
    • put(e) 当阻塞队列满时,再插入时会阻塞生产者线程,直到队列可用或中断退出
    • take() 当队列空,再移除元素会阻塞消费者线程,直到队列不空
  4. 超时退出
    • offer(e, time, unit) 队列满时再插入元素,阻塞,超时退出
    • poll(time, unit) 队列空时移除元素,阻塞,超时退出

Java中几种阻塞队列

  • ArrayBlockingQueue: 数组结构构成的有界 FIFO 阻塞队列
  • LinkedBolckingQueue: 链表结构构成的有界 FIFO 阻塞队列
  • PriorityBlockingQueue: 支持优先级排序的无界阻塞队列
  • DelayQueue: 支持延时获取元素,使用优先级队列实现的无界阻塞队列
  • SynchronousQueue: 不存储元素的阻塞队列,不为队列元素维护存储空间
  • LinkedTransferQueue: 链表结构构成的无界阻塞队列
  • LinkedBlockingDeque: 链表构成的双向阻塞队列

ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的有界的,按照 FIFO 原则对元素排序的阻塞队列。它还支持对等待的生产者和消费者线程进行排序时的可选公平策略,默认状况下不保证线程公平的访问,在构造时能够选择公平策略。公平性会下降吞吐量,可是减小了可变性和避免了“不平衡性”。算法

LinkedBlockingQueue

这是一个用链表实现的有界阻塞队列,默认长度和最大长度都是 Integer.MAX_VALUE 。该队列也是按照 FIFO 原则对元素排序,肯定线程执行的前后顺序。编程

PriorityBlockingQueue

这是一个支持优先级的无界祖苏队列,默认状况下采起天然顺序升序排序,也能够经过构造函数指定 Comparator 来对元素进行排序。可是它不能保证相同优先级元素的顺序。数组

底层是采用二叉最大堆来实现优先级排序的。缓存

DelayQueue

这是一个支持延时获取元素的无界阻塞队列,其队列使用优先队列 PriorityQueue 实现。队列中的元素必须实现 Delayed 接口,建立元素时能够指定多久以后才能从队列中获取该元素,只有在元素到期时才能获取。并发

主要用于缓存,如清除缓冲中超时的数据。还用于定时任务的调度。框架

元素建立时,要实现 Delayed 接口,首先进行初始化;而后实现 getDelay(Timeunit unit)方法,返回的值是当前元素还须要延时多长时间;最后实现compareTo(Delayed other)方法,用来指定元素的顺序。ide

当消费者从队列中获取元素时,若是元素尚未到延时时间,就阻塞当前线程。此外,设置了 leader 变量表示等待获取队列头部元素的线程。若是 leader 不为空,表示有现成等待获取队列头部元素,使用 await() 方法让当前线程等待信号。若是 leader 为空,则把当前线程设置为 leader,使用 awaitNanos() 方法让当前线程等待接收信号或等待 delay 时间。

SynchronousQueue

与其余阻塞队列不一样,这是一个不存储元素的阻塞队列,每个 put 操做必需要等待一个take操做,不然不能继续添加元素,反之亦然。分为公平和不公平访问队列,默认状况采用非公平性策略访问队列。

该种队列自己不存储任何元素,适合传递性场景,把生产者线程处理的数据直接传递给消费者线程,其吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。

LinkedTransferQueue

这是一个由链表结构组成的 FIFO 的无界阻塞 TransferQueue 队列。它采起一种预占模式,也就是有就直接拿走,没有就占着这个位置直到拿到、超时或中断。相对于其余阻塞队列,多了 tryTransfer 方法和 transfer 方法。

  • transfer(e,[timeout,unit]) 方法: 若是当前有消费者正等待接收元素,该方法能够把生产者传入的元素马上传输给消费者。若是没有消费者等待,该方法将元素存放在队列的 tail 节点,等到该元素被消费者消费了才返回。
  • tryTransfer(e,[timeout,unit])方法: 试探生产者传入的元素是否能直接传给消费者。若是没有消费者等待接收元素,返回false。该方法不管消费者是否接收都当即返回,而 transfer 方法必须等消费了才返回。

LinkedBlockingDeque

是一个由链表组成的双向阻塞队列。能够从队列两端插入和移除元素。

Fork/Join框架

该框架主要应用在并行计算中,把一个大人物分割成若干个小任务,最终汇总每一个小任务结果后获得大结果的框架。Fork 就是把一个大任务切分红若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最终获得这个大任务的结果。

工做窃取算法

工做窃取是指某个线程从其余队列里窃取任务来执行。一般使用双端队列,被窃取任务线程永远从双端队列头部拿任务执行,窃取任务的线程永远从双端队列尾部拿任务执行。

优势是充分利用线程进行并行计算,减小了线程间的竞争。缺点是在某些状况下存在竞争,好比队列只有一个任务时,会消耗更多的资源。

框架设计思路

首先,分割任务,将一个大任务分割成子任务,不停分割直到分割出的子任务足够小。

而后,执行任务并合并结果。分割的子任务分别放在双端队列,而后几个启动线程分别从双端队列获取任务执行。执行结果放在一个队列里,启动一个线程从队列拿数据,而后合并这些线程。

示例

public class ForkJoinCase extends RecursiveTask<Integer> {
    private final int threshold=5;
    private int first;
    private int last;

    public ForkJoinCase(int first,int last){
        this.first=first;
        this.last=last;
    }


    @Override
    protected Integer compute() {
        int ret=0;
        if(last-first<=threshold){//任务足够小,执行
            for(int i=first;i<=last;i++){
                ret+=i;
            }
        }else{//分解任务
            int mid=first+(last-first)/2;
            ForkJoinCase leftTask=new ForkJoinCase(first,mid);
            ForkJoinCase rightTask=new ForkJoinCase(mid+1,last);
            //执行子任务
            leftTask.fork();
            rightTask.fork();
            //合并子任务结果
            ret=leftTask.join()+rightTask.join();
        }
        return ret;
    }
}
复制代码

参考资料

相关文章
相关标签/搜索