Java阻塞队列(BlockingQueue)实现 生产者/消费者 示例html
本文由 TonySpark 翻译自 Javarevisited。转载请参见文章末尾的要求。java
Java.util.concurrent.BlockingQueue 是一个队列实现类,支持这样的操做:当从队列中获取或者移除元素时,若是队列为空,须要等待,直到队列不为空;同时若是向队列中添加元素时,此时若是队列无可用空间,也须要等待。安全
BlockingQueue 类不接收Null值,若是你试图向队列中存入Null值将抛出NullPointerException.架构
BlockingQueue的实现是线程安全的。全部队列方法自己都是原子操做,使用并发控制的内部锁或者其它形式。并发
BlockingQueue这个接口是Java集合架构的一部分,它主要用于解决生产者/消费者问题。在BlockingQueue中,咱们不用担忧生产者操做时是否有可用空间或者消费者操做时是否有可用的对像而等待这样的问题,这些都会在它的实现类中进行处理。ide
Java中提供了几个对BlockingQueue的实现类,如: ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 等this
在处理生产者/消费者问题上 咱们将会使用ArrayBlockingQueue来实现,以下是咱们需知道的重要方法:spa
如今我们看看用BlockingQueue来解决生产者/消费者问题。线程
Message翻译
Producer产生的普通Java对象,并添加到队列中。
Message.java
1 package com.journaldev.concurrency; 2 3 public class Message { 4 private String msg; 5 6 public Message(String str){ 7 this.msg=str; 8 } 9 10 public String getMsg() { 11 return msg; 12 } 13 14 }
Producer
Producer这个类会产生消息并将其放入队列中。
Producer.java
package com.journaldev.concurrency; import java.util.concurrent.BlockingQueue; public class Producer implements Runnable { private BlockingQueue<Message> queue; public Producer(BlockingQueue<Message> q){ this.queue=q; } @Override public void run() { //生产消息 for(int i=0; i<100; i++){ Message msg = new Message(""+i); try { Thread.sleep(i); queue.put(msg); System.out.println("Produced "+msg.getMsg()); } catch (InterruptedException e) { e.printStackTrace(); } } //添加退出消息 Message msg = new Message("exit"); try { queue.put(msg); } catch (InterruptedException e) { e.printStackTrace(); } } }
Consumer
Consumer类会从队列获取消息进行处理。若是获取的是退出消息则结束。
Consumer.java
package com.journaldev.concurrency; import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable{ private BlockingQueue<Message> queue; public Consumer(BlockingQueue<Message> q){ this.queue=q; } @Override public void run() { try{ Message msg; //获取并处理消息直到接收到“exit”消息 while((msg = queue.take()).getMsg() !="exit"){ Thread.sleep(10); System.out.println("Consumed "+msg.getMsg()); } }catch(InterruptedException e) { e.printStackTrace(); } } }
生产者/消费者的服务类将会产生固定大小的BlockingQueue,生产者和消费者同时共享该BlockingQueue,该服务类会起启动生产者和消费者线程。
ProducerConsumerService.java
1 package com.journaldev.concurrency; 2 3 4 import java.util.concurrent.ArrayBlockingQueue; 5 import java.util.concurrent.BlockingQueue; 6 7 8 public class ProducerConsumerService { 9 10 public static void main(String[] args) { 11 //建立大小为10的 BlockingQueue 12 BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10); 13 Producer producer = new Producer(queue); 14 Consumer consumer = new Consumer(queue); 15 //开启 producer线程向队列中生产消息 16 new Thread(producer).start(); 17 //开启 consumer线程 中队列中消费消息 18 new Thread(consumer).start(); 19 System.out.println("Producer and Consumer has been started"); 20 } 21 22 }
上面程序的运行结果:
1 Producer and Consumer has been started 2 Produced 0 3 Produced 1 4 Produced 2 5 Produced 3 6 Produced 4 7 Consumed 0 8 Produced 5 9 Consumed 1 10 Produced 6 11 Produced 7 12 Consumed 2 13 Produced 8 14 ...
Thread sleep 使得生产者/消费者 生产、消费这此消息有必定的延迟。
原文连接: Javarevisited 翻译: TonySpark
译文连接: http://www.cnblogs.com/tonyspark/p/3722013.html
[ 转载请保留原文出处、译者和译文连接。]