阻塞队列和生产者-消费者模式、DelayQueue

1.ArrayDeque, (数组双端队列) 
2.PriorityQueue, (优先级队列) 
3.ConcurrentLinkedQueue, (基于链表的并发队列)算法

4.DelayQueue,                                         (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 
5.ArrayBlockingQueue,           (基于数组的并发阻塞队列) 
6.LinkedBlockingQueue,        (基于链表的FIFO阻塞队列) 
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列) 
8.PriorityBlockingQueue,        (带优先级的无界阻塞队列) 
9.SynchronousQueue                       (并发同步阻塞队列) 
—————————————————–设计模式

阻塞队列和生产者-消费者模式数组

阻塞队列(Blocking queue)提供了可阻塞的put和take方法,它们与可定时的offer和poll是等价的。若是Queue已经满了,put方法会被阻塞直到有空间可用;若是Queue是空的,那么take方法会被阻塞,直到有元素可用。Queue的长度能够有限,也能够无限;无限的Queue永远不会充满,因此它的put方法永远不会阻塞。并发

阻塞队列支持生产者-消费者设计模式。一个生产者-消费者设计分离了“生产产品”和“消费产品”。该模式不会发现一个工做便当即处理,而是把工做置于一个任务(“to do”)清单中,以备后期处理。生产者-消费者模式简化了开发,由于它解除了生产者和消费者之间相互依赖的代码。生产者和消费者以不一样的或者变化的速度生产和消费数据,生产者-消费者模式将这些活动解耦,于是简化了工做负荷的管理。工具

生产者-消费者设计是围绕阻塞队列展开的,生产者把数据放入队列,并使数据可用,当消费者为适当的行为作准备时会从队列中获取数据。生产者不须要知道消费者的省份或者数量,甚至根本没有消费者—它们只负责把数据放入队列。相似地,消费者也不须要知道生产者是谁,以及是谁给它们安排的工做。BlockingQueue可使用任意数量的生产者和消费者,从而简化了生产者-消费者设计的实现。最多见的生产者-消费者设计是将线程池与工做队列相结合。性能

阻塞队列简化了消费者的编码,由于take会保持阻塞直到可用数据出现。若是生产者不能足够快地产生工做,让消费者忙碌起来,那么消费者只能一直等待,直到有工做可作。同时,put方法的阻塞特性也大大地简化了生产者的编码;若是使用一个有界队列,那么当队列充满的时候,生产者就会阻塞,暂不能生成更多的工做,从而给消费者时间来赶进进度。this

有界队列是强大的资源管理工具,用来创建可靠的应用程序:它们遏制那些能够产生过多工做量、具备威胁的活动,从而让你的程序在面对超负荷工做时更加健壮。编码

虽然生产者-消费者模式能够把生产者和消费者的代码相互解耦合,可是它们的行为仍是间接地经过共享队列耦合在一块儿了。spa

类库中包含一些BlockingQueue的实现,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,与 LinkedList和ArrayList类似,可是却拥有比同步List更好的并发性能。PriorityBlockingQueue是一个按优先级顺序排序的队列,当你不但愿按照FIFO的属性处理元素时,这个PriorityBolckingQueue是很是有用的。正如其余排序的容器同样,PriorityBlockingQueue能够比较元素自己的天然顺序(若是它们实现了Comparable),也可使用一个 Comparator进行排序。线程

最后一个BlockingQueue的实现是SynchronousQueue,它根本上不是一个真正的队列,由于它不会为队列元素维护任何存储空间。不过,它维护一个排队的线程清单,这些线程等待把元素加入(enqueue)队列或者移出(dequeue)队列。由于SynchronousQueue没有存储能力,因此除非另外一个线程已经准备好参与移交工做,不然put和take会一直阻止。SynchronousQueue这类队列只有在消费者充足的时候比较合适,它们总能为下一个任务做好准备。

生产者-消费者模式一样带来了一些性能方面的提升。生产者和消费者能够并发地执行,若是一个受限于I/O,另外一个受限于CPU,那么并发执行的所有产出会高于顺序执行的产出。

———— 
Deque是一个双端队列,容许高效地在头和尾部分别进行插入和移除。实现有ArrayDeque和LinkedBlockingDeque。 
正如阻塞队列适用于生产者-消费者模式同样,双端队列使它们自身与一种叫作窃取工做(work stealing)的模式相关联。一个消费者生产者设计中,全部的消费者只共享一个工做队列;在窃取工做的设计中,每个消费者都有一个本身的双端队列。若是一个消费者完成了本身双端队列中的所有工做,它能够窃取其余消费者的双端队列中的末尾任务。由于工做者线程并不会竞争一个共享的任务队列,因此窃取工做模式比传统的生产者-消费者设计有更佳的可伸缩性;大多数时候它们访问本身的双端队列,减小竞争。当一个工做者必需要访问另外一个队列时,它会从尾部截取,而不是从头部,从而进一步下降对双端队列的争夺。 
窃取工做刚好适合用于解决消费者与生产者同体的问题—-当运行到一个任务的某单元时,可能会识别出更多的任务。好比垃圾回收时对堆作了记号,能够并行使用窃取工做。当一个线程发现了一个新的任务单元时,它会把它放在本身队列的末尾;当双端队列为空时,它会去其余队列的队尾寻找新的任务,这样能确保每个线程都保持忙碌状态。

———————— 
非阻塞算法

基于锁的算法会带来一些活跃度失败的风险。若是线程在持有锁的时候由于阻塞I/O,页面错误,或其余缘由发生延迟,极可能全部的线程都不能前进了。 
一个线程的失败或挂起不该该影响其余线程的失败或挂起,这样的算法成为非阻塞(nonblocking)算法;若是算法的每个步骤中都有一些线程可以继续执行,那么这样的算法称为锁自由(lock-free)算法。在线程间使用CAS进行协调,这样的算法若是能构建正确的话,它既是非阻塞的,又是锁自由的。非竞争的CAS老是可以成功,若是多个线程以一个CAS竞争,总会有一个胜出并前进。非阻塞算法堆死锁和优先级倒置有“免疫性”(但它们可能会出现饥饿和活锁,由于它们容许重进入)。

非阻塞算法经过使用低层次的并发原语,好比比较交换,取代了锁。原子变量类向用户提供了这些底层级原语,也可以当作“更佳的volatile变量”使用,同时提供了整数类和对象引用的原子化更新操做。

——————————- 
/** 
*DelayQueue(延时队列)是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象, 
*内部经过一个优先队列(PriorityQueue)的引用实现相关数据操做。 
*其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即对头对象的延迟到期的时间最长。 
*若是没有任何到期,那么就不会有任何头元素,而且poll将返回null(正由于这样,不能将null放入该队列中) 
*Delayed接口有一个名为getDelay()的方法,它用来告知延时到期有多长时间,或延迟在多长时间以前已经到期。 
*为了排序,Delayed接口还继承了Comparable接口,所以必须实现comparaTo(),使其产生合理比较。 
*/ 
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {

    private transient final ReentrantLock lock = new ReentrantLock();//锁 
    /** 
    *接口Condition :锁条件变量,其实例和特定的锁绑定。提供了: 
    * await()、awaitUninterruptibly()、awaitNanos(long)、await(long, TimeUnit)、awaitUntil(Date) 
    * signal()、signalAll() 方法,实现了对线程的“等待”和“唤醒”操做 
    */ 
    private transient final Condition available = lock.newCondition();//锁的条件变量,提供“等待”“唤醒”线程的操做 
    private final PriorityQueue<E> q = new PriorityQueue<E>();//内部有一个优先队列(无界的队列)的引用

    public DelayQueue() {}

    public DelayQueue(Collection<? extends E> c) { 
        this.addAll(c); 
    }

    /** 
     * 插入元素到延时队列 (调用offer实现) 
     */ 
    public boolean add(E e) { 
        return offer(e); 
    }

    /** 
     * 插入元素(加锁) 
     */ 
    public boolean offer(E e) { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            E first = q.peek(); 
            q.offer(e); //调用优先队列的实现 
            if (first == null || e.compareTo(first) < 0) 
                available.signalAll();//唤醒其余的线程 
            return true; 
        } finally { 
            lock.unlock(); 
        } 
    }

    /** 
     * 插入元素 
     */ 
    public void put(E e) { 
        offer(e); 
    } 
    //因为是无界的,不会对offer插入元素阻塞,参数unit无效 
    public boolean offer(E e, long timeout, TimeUnit unit) { 
        return offer(e); 
    }

    /** 
     * 头元素出对列(加锁) 
     */ 
    public E poll() { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            E first = q.peek(); 
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 
                return null; 
            else { 
                E x = q.poll(); 
                assert x != null; 
                if (q.size() != 0) 
                    available.signalAll(); 
                return x; 
            } 
        } finally { 
            lock.unlock(); 
        } 
    }

    /** 
     * 得到并移除头元素,在延时到期的状况下。 
     */ 
    public E take() throws InterruptedException { 
        final ReentrantLock lock = this.lock; 
        lock.lockInterruptibly(); 
        try { 
            for (;;) { 
                E first = q.peek(); 
                if (first == null) { 
                    available.await(); 
                } else { 
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS); 
                    if (delay > 0) { 
                        long tl = available.awaitNanos(delay); 
                    } else { 
                        E x = q.poll(); 
                        assert x != null; 
                        if (q.size() != 0) 
                            available.signalAll(); // wake up other takers 
                        return x;

                    } 
                } 
            } 
        } finally { 
            lock.unlock(); 
        } 
    }

    /** 
     * 得到并移除头元素,延迟到期或指定时间到期后。 
     */ 
    public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
        long nanos = unit.toNanos(timeout); 
        final ReentrantLock lock = this.lock; 
        lock.lockInterruptibly(); 
        try { 
            for (;;) { 
                E first = q.peek(); 
                if (first == null) { 
                    if (nanos <= 0) 
                        return null; 
                    else 
                        nanos = available.awaitNanos(nanos); 
                } else { 
                    long delay = first.getDelay(TimeUnit.NANOSECONDS); 
                    if (delay > 0) { 
                        if (nanos <= 0) 
                            return null; 
                        if (delay > nanos) 
                            delay = nanos; 
                        long timeLeft = available.awaitNanos(delay); 
                        nanos -= delay – timeLeft; 
                    } else { 
                        E x = q.poll(); 
                        assert x != null; 
                        if (q.size() != 0) 
                            available.signalAll(); 
                        return x; 
                    } 
                } 
            } 
        } finally { 
            lock.unlock(); 
        } 
    }

    /** 
     * 获取但不移除元素(调用优先队列的peek方法) 
     */ 
    public E peek() { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            return q.peek(); 
        } finally { 
            lock.unlock(); 
        } 
    }

    public int size() { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            return q.size(); 
        } finally { 
            lock.unlock(); 
        } 
    }

    //把队列在的元素“剪切到”集合c中 
    public int drainTo(Collection<? super E> c) { 
        if (c == null) 
            throw new NullPointerException(); 
        if (c == this) 
            throw new IllegalArgumentException(); 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            int n = 0; 
            for (;;) { 
                E first = q.peek(); 
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 
                    break; 
                c.add(q.poll()); 
                ++n; 
            } 
            if (n > 0) 
                available.signalAll(); 
            return n; 
        } finally { 
            lock.unlock(); 
        } 
    }

    //把maxElements个元素“剪切”到集合c中 
    public int drainTo(Collection<? super E> c, int maxElements) { 
        if (c == null) 
            throw new NullPointerException(); 
        if (c == this) 
            throw new IllegalArgumentException(); 
        if (maxElements <= 0) 
            return 0; 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            int n = 0; 
            while (n < maxElements) { 
                E first = q.peek(); 
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 
                    break; 
                c.add(q.poll()); 
                ++n; 
            } 
            if (n > 0) 
                available.signalAll(); 
            return n; 
        } finally { 
            lock.unlock(); 
        } 
    }

    public void clear() { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            q.clear(); 
        } finally { 
            lock.unlock(); 
        } 
    }

    public int remainingCapacity() { 
        return Integer.MAX_VALUE; 
    }

    public Object[] toArray() { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            return q.toArray(); 
        } finally { 
            lock.unlock(); 
        } 
    } 
    public <T> T[] toArray(T[] a) { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            return q.toArray(a); 
        } finally { 
            lock.unlock(); 
        } 
    }

    public boolean remove(Object o) { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            return q.remove(o); 
        } finally { 
            lock.unlock(); 
        } 
    }

……

}

相关文章
相关标签/搜索