Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型

Queue是什么java

队列,是一种数据结构。除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的。不管使用哪一种排序方式,队列的头都是调用remove()或poll()移除元素的。在FIFO队列中,全部新元素都插入队列的末尾。node

 

Queue中的方法程序员

Queue中的方法不难理解,6个,每2对是一个也就是总共3对。看一下JDK API就知道了:数组

注意一点就好,Queue一般不容许插入Null,尽管某些实现(好比LinkedList)是容许的,可是也不建议。安全

 

BlockingQueue数据结构

一、BlockingQueue概述多线程

只讲BlockingQueue,由于BlockingQueue是Queue中的一个重点,而且经过BlockingQueue咱们再次加深对于生产者/消费者模型的理解。其余的Queue都不难,经过查看JDK API和简单阅读源码彻底能够理解他们的做用。spa

BlockingQueue,顾名思义,阻塞队列。BlockingQueue是在java.util.concurrent下的,所以不难理解,BlockingQueue是为了解决多线程中数据高效安全传输而提出的。线程

多线程中,不少场景均可以使用队列实现,好比经典的生产者/消费者模型,经过队列能够便利地实现二者之间数据的共享,定义一个生产者线程,定义一个消费者线程,经过队列共享数据就能够了。code

固然现实不可能都是理想的,好比消费者消费速度比生产者生产的速度要快,那么消费者消费到 必定程度上的时候,必需要暂停等待一下了(使消费者线程处于WAITING状态)。BlockingQueue的提出,就是为了解决这个问题的,他不用程序员去控制这些细节,同时还要兼顾效率和线程安全。

阻塞队列所谓的"阻塞",指的是某些状况下线程会挂起(即阻塞),一旦条件知足,被挂起的线程又会自动唤醒。使用BlockingQueue,不须要关心何时须要阻塞线程,何时须要唤醒线程,这些内容BlockingQueue都已经作好了

二、BlockingQueue中的方法

BlockingQueue既然是Queue的子接口,必然有Queue中的方法,上面已经列了。看一下BlockingQueue中特有的方法:

(1)void put(E e) throws InterruptedException

把e添加进BlockingQueue中,若是BlockingQueue中没有空间,则调用线程被阻塞,进入等待状态,直到BlockingQueue中有空间再继续

(2)void take() throws InterruptedException

取走BlockingQueue里面排在首位的对象,若是BlockingQueue为空,则调用线程被阻塞,进入等待状态,直到BlockingQueue有新的数据被加入

(3)int drainTo(Collection<? super E> c, int maxElements)

一次性取走BlockingQueue中的数据到c中,能够指定取的个数。经过该方法能够提高获取数据效率,不须要屡次分批加锁或释放锁

三、ArrayBlockingQueue

基于数组的阻塞队列,必须指定队列大小。比较简单。ArrayBlockingQueue中只有一个ReentrantLock对象,这意味着生产者和消费者没法并行运行(见下面的代码)。另外,建立ArrayBlockingQueue时,能够指定ReentrantLock是否为公平锁,默认采用非公平锁。

/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

四、LinkedBlockingQueue

基于链表的阻塞队列,和ArrayBlockingQueue差很少。不过LinkedBlockingQueue若是不指定队列容量大小,会默认一个相似无限大小的容量,之因此说是相似是由于这个无限大小是Integer.MAX_VALUE,这么说就好理解ArrayBlockingQueue为何必需要制定大小了,若是ArrayBlockingQueue不指定大小的话就用Integer.MAX_VALUE,那将形成大量的空间浪费,可是基于链表实现就不同的,一个一个节点连起来而已。另外,LinkedBlockingQueue生产者和消费者都有本身的锁(见下面的代码),这意味着生产者和消费者能够"同时"运行。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

五、SynchronousQueue

比较特殊,一种没有缓冲的等待队列。什么叫作没有缓冲区,ArrayBlocking中有:

/** The queued items  */
private final E[] items;

数组用以存储队列。LinkedBlockingQueue中有:

/**
 * Linked list node class
 */
static class Node<E> {
    /** The item, volatile to ensure barrier separating write and read */
    volatile E item;
    Node<E> next;
    Node(E x) { item = x; }
}

将队列以链表形式链接。

生产者/消费者操做数据实际上都是经过这两个"中介"来操做数据的,可是SynchronousQueue则是生产者直接把数据给消费者(消费者直接从生产者这里拿数据),好像又回到了没有生产者/消费者模型的老办法了。换句话说,每个插入操做必须等待一个线程对应的移除操做。SynchronousQueue又有两种模式:

一、公平模式

采用公平锁,并配合一个FIFO队列(Queue)来管理多余的生产者和消费者

二、非公平模式

采用非公平锁,并配合一个LIFO栈(Stack)来管理多余的生产者和消费者,这也是SynchronousQueue默认的模式

 

利用BlockingQueue实现生产者消费者模型

上一篇咱们写的生产者消费者模型有局限,局限体如今:

  • 缓冲区内只能存放一个数据,实际生产者/消费者模型中的缓冲区内能够存放大量生产者生产出来的数据
  • 生产者和消费者处理数据的速度几乎同样

OK,咱们就用BlockingQueue来简单写一个例子,而且让生产者、消费者处理数据速度不一样。子类选择的是ArrayBlockingQueue,大小定为10:

public static void main(String[] args)
{
    final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(10);
    Runnable producerRunnable = new Runnable()
    {
        int i = 0;
        public void run()
        {
            while (true)
            {
                try
                {
                    System.out.println("我生产了一个" + i++);
                    bq.put(i + "");
                    Thread.sleep(1000);
                } 
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    };
    Runnable customerRunnable = new Runnable()
    {
        public void run()
        {
            while (true)
            {
                try
                {
                    System.out.println("我消费了一个" + bq.take());
                    Thread.sleep(3000);
                } 
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    };
    Thread producerThread = new Thread(producerRunnable);
    Thread customerThread = new Thread(customerRunnable);
    producerThread.start();
    customerThread.start();
}

代码的作法是让生产者生产速度快于消费者消费速度的,看一下运行结果:

 1 我生产了一个0
 2 我消费了一个1
 3 我生产了一个1
 4 我生产了一个2
 5 我消费了一个2
 6 我生产了一个3
 7 我生产了一个4
 8 我生产了一个5
 9 我消费了一个3
10 我生产了一个6
11 我生产了一个7
12 我生产了一个8
13 我消费了一个4
14 我生产了一个9
15 我生产了一个10
16 我生产了一个11
17 我消费了一个5
18 我生产了一个12
19 我生产了一个13
20 我生产了一个14
21 我消费了一个6
22 我生产了一个15
23 我生产了一个16
24 我消费了一个7
25 我生产了一个17
26 我消费了一个8
27 我生产了一个18

分两部分来看输出结果:

一、第1行~第23行。这块BlockingQueue未满,因此生产者随便生产,消费者随便消费,基本上都是生产3个消费1个,消费者消费速度慢

二、第24行~第27行,从前面咱们能够看出,生产到16,消费到6,说明到了ArrayBlockingQueue的极限10了,这时候没办法,生产者生产一个ArrayBlockingQueue就满了,因此不能继续生产了,只有等到消费者消费完才能够继续生产。因此以后的打印内容必定是一个生产者、一个消费者

这就是前面一章开头说的"经过平衡生产者和消费者的处理能力来提升总体处理数据的速度",这给例子应该体现得很明显。另外,也不要担忧非单一辈子产者/消费者场景下的系统假死问题,缓冲区空、缓冲区满的场景BlockingQueue都是定义了不一样的Condition,因此不会唤醒本身的同类。

相关文章
相关标签/搜索