1.相关知识的了解java
阻塞队列:当队列为空时,去队列中取数据会被阻塞。当队列满时,往队列中放数据会被阻塞。数组
非阻塞队列:当队列为空时,去队列取数据会直接返回失败,队列满时,往队列中放数据会直接返回失败。并发
2.经常使用的阻塞队列ide
LinkedBlockingQueue:基于链表实现的FIFO的阻塞队列,建立是能够指定容量大小,不指定则是默认值Integer.MAX_VALUE。this
ArrayBlockingQueue:基于数组实现的FIFO的阻塞队列,再建立是必须指定大小atom
3.LinkedBlockingQueue阻塞队列模拟生产者-消费者模式spa
1 package com.test; 2 3 import java.util.concurrent.BlockingQueue; 4 import java.util.concurrent.LinkedBlockingQueue; 5 import java.util.concurrent.TimeUnit; 6 import java.util.concurrent.atomic.AtomicInteger; 7 8 /** 9 * 10 * @Title: Test01.java 11 * @Package com.test 12 * @Description: 使用并发包下 LinkedBlockingQueue 阻塞队列模拟生产者消费者问题 13 * @author Mr.Chen 14 * @date 2019年4月9日 15 * @version V1.0 16 * 版权声明:本文为博主原创文章,转载请附上博文连接 17 */ 18 public class Test01 { 19 public static void main(String[] args) { 20 BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3); 21 ProducerThread producerThread = new ProducerThread(blockingQueue); 22 ConsumerThread consumerThread = new ConsumerThread(blockingQueue); 23 Thread t1 = new Thread(producerThread); 24 Thread t2 = new Thread(consumerThread); 25 t1.start(); 26 t2.start(); 27 //10秒后 中止线程.. 28 try { 29 Thread.sleep(10*1000); 30 producerThread.stop(); 31 } catch (Exception e) { 32 // TODO: handle exception 33 } 34 35 } 36 } 37 38 /** 39 * 40 *生存者类 41 */ 42 class ProducerThread implements Runnable { 43 //定义变量接收LinkedBlockingQueue 44 BlockingQueue<String> queue = null; 45 46 //定义一个自增的变量,用来做为队列里面的消息 47 AtomicInteger data = new AtomicInteger(0); 48 49 //定义循环的结束条件 50 boolean flag = true; 51 52 public ProducerThread(BlockingQueue<String> queue) { 53 this.queue = queue; 54 } 55 56 @Override 57 public void run() { 58 try { 59 //循环往队列里面放值,若是放不进去,设置两秒的等待时间。每一个循环设置1秒的等待时间,以便打印的时候方便查看 60 System.out.println(Thread.currentThread().getName() + " 生产者启动-----"); 61 while (flag) { 62 //获取data自增的值 63 String message = data.incrementAndGet() + ""; 64 boolean offer = queue.offer(message, 2, TimeUnit.SECONDS); 65 if (offer) { 66 System.out.println(Thread.currentThread().getName() + " " + message + " 放入队列成功"); 67 } else { 68 System.out.println(Thread.currentThread().getName() + " " + message + " 放入队列失败"); 69 } 70 Thread.sleep(1000); 71 } 72 } catch (InterruptedException e) { 73 System.out.println(Thread.currentThread().getName() + " 生产者中止-----"); 74 } finally { 75 System.out.println(Thread.currentThread().getName() + " 生产者中止-----"); 76 } 77 } 78 79 80 public void stop() { 81 this.flag = false; 82 } 83 84 } 85 86 class ConsumerThread implements Runnable { 87 //定义变量接收LinkedBlockingQueue 88 BlockingQueue<String> queue = null; 89 90 //定义循环的结束条件 91 boolean flag = true; 92 93 public ConsumerThread(BlockingQueue<String> queue) { 94 this.queue = queue; 95 } 96 97 @Override 98 public void run() { 99 //使用queue 去取队列中的消息 100 System.out.println(Thread.currentThread().getName() + " 消费者启动-----"); 101 try { 102 while (flag) { 103 String poll = queue.poll(2, TimeUnit.SECONDS); 104 if (poll == null) { 105 flag = false; 106 System.out.println("消费者超过2秒时间未获取到消息."); 107 return; 108 } 109 System.out.println(Thread.currentThread().getName() + " 消费者拿到 " + poll ); 110 Thread.sleep(2000); 111 } 112 } catch (InterruptedException e) { 113 // TODO Auto-generated catch block 114 e.printStackTrace(); 115 } 116 117 } 118 119 }