老样子,咱们仍是从一些例子开始慢慢熟悉各类并发队列。以看小说看故事的心态来学习不会显得那么枯燥并且更容易记忆深入。java
阻塞队列最适合作的事情就是作为生产消费者的中间存储,以抵抗生产者消费者速率不匹配的问题,不可是在速率不匹配的时候可以有地方暂存任务,并且能在队列满或空的时候让线程进行阻塞,让出CPU的时间。这里对于阻塞两字加粗,是由于其实Java的线程在这个时候是等待(WAITING)状态而不是阻塞(BLOCKED),这个容易引发歧义。git
下面咱们来写一个程序比较一下阻塞和等待:github
@Slf4j public class BlockVsWait { Object locker = new Object(); ArrayBlockingQueue<Integer> arrayBlockingQueue1 = new ArrayBlockingQueue<>(1); ArrayBlockingQueue<Integer> arrayBlockingQueue2 = new ArrayBlockingQueue<>(1); @Test public void test() throws InterruptedException { arrayBlockingQueue1.put(1); Thread waitOnTake = new Thread(() -> { synchronized (locker) { try { arrayBlockingQueue2.take(); } catch (InterruptedException e) { e.printStackTrace(); } } }); waitOnTake.setName("waitOnTake"); waitOnTake.start(); Thread waitOnPut = new Thread(() -> { try { arrayBlockingQueue1.put(2); } catch (InterruptedException e) { e.printStackTrace(); } }); waitOnPut.setName("waitOnPut"); waitOnPut.start(); Thread block = new Thread(() -> { synchronized (locker) { log.info("OK"); } }); block.setName("block"); block.start(); block.join(); } }
在上面的代码里,咱们开启了三个线程:数组
运行程序以后,咱们看一下线程的状态,能够看到:安全
咱们来查看一下线程这两种状态的定义:微信
通俗一点说,BLOCKED就是线程本身想作事情,可是很无奈只能等别人先把事情干完,因此说是被阻塞,被动的,WAITING就是线程本身主动愿意放弃CPU时间进行等待,等别人在合适的时候通知本身来继续干活,因此说是等待中,主动的。Blocking Queue实际上是让线程Waiting而不是Block。数据结构
如今,咱们使用阻塞队列尝试实现生产者消费者的功能。多线程
首先,实现一个基类,经过一个开关来控制生产者消费者的执行:并发
@Slf4j public abstract class Worker implements Runnable { protected volatile boolean enable = true; protected String name; protected BlockingQueue<Integer> queue; public Worker(String name, BlockingQueue<Integer> queue) { this.name = name; this.queue = queue; } public void stop() { this.enable = false; log.info("Stop:{}", name); } }
而后实现生产者:异步
@Slf4j public class Producer extends Worker { private static AtomicInteger atomicInteger = new AtomicInteger(0); public Producer(String name, BlockingQueue<Integer> queue) { super(name, queue); } @Override public void run() { while (enable) { try { int value = atomicInteger.incrementAndGet(); queue.put(value); log.info("size:{}, put:{}, enable:{}", queue.size(), value, enable); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { } } log.info("{} quit", name); } }
只要开关开启,生产者会无限进行数据生产,把数据加入队列,生产者每100ms生产一个数据,这里有一个计数器来提供要生产的数据。
下面实现消费者:
@Slf4j public class Consumer extends Worker { private static AtomicInteger totalConsumedAfterShutdown = new AtomicInteger(); public Consumer(String name, BlockingQueue<Integer> queue) { super(name, queue); } public static int totalConsumedAfterShutdown() { return totalConsumedAfterShutdown.get(); } @Override public void run() { while (enable || queue.size() > 0) { try { Integer item = queue.take(); log.info("size:{}, got:{}, enable:{}", queue.size(), item, enable); if (!enable) { totalConsumedAfterShutdown.incrementAndGet(); } TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { } } log.info("{} quit", name); } }
一样,消费者也是在开关开启或队列中有数据的时候,会不断进行数据消费。这里咱们有一个计数器用来统计开关关闭以后,消费者还能消费多少数据。消费者消费速度是200ms消费一次,明显比生产者慢一半。经过这个配置咱们能够想到,若是使用有界阻塞队列的话,由于消费速度比生产速度慢,因此队列会慢慢堆积一直到队列满,而后生产者线程被阻塞,咱们来写一个测试程序看看是否是这样:
@Slf4j public class ArrayBlockingQueueTest { @Test public void test() throws InterruptedException { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(50, false); List<Worker> workers = new ArrayList<>(); List<Thread> threads = new ArrayList<>(); for (int i = 0; i < 10; i++) { String name = "Producer" + i; Producer worker = new Producer(name, queue); workers.add(worker); Thread thread = new Thread(worker); thread.setName(name); threads.add(thread); thread.start(); } for (int i = 0; i < 4; i++) { String name = "Consumer" + i; Consumer worker = new Consumer(name, queue); workers.add(worker); Thread thread = new Thread(worker); thread.setName(name); threads.add(thread); thread.start(); } Executors.newSingleThreadScheduledExecutor().schedule(() -> { for (Worker worker : workers) { worker.stop(); } }, 2, TimeUnit.SECONDS); for (Thread thread : threads) { thread.join(); } log.info("totalConsumedAfterShutdown:{}", Consumer.totalConsumedAfterShutdown()); } }
在这段代码里:
部分运行结果以下:
12:59:34.609 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:33, put:40, enable:true 12:59:34.609 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:36, put:37, enable:true 12:59:34.609 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:35, put:36, enable:true 12:59:34.609 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:36, put:38, enable:true 12:59:34.609 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:34, put:39, enable:true 12:59:34.683 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:33, got:7, enable:true 12:59:34.683 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:34, got:6, enable:true 12:59:34.683 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:33, got:5, enable:true 12:59:34.687 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:32, got:8, enable:true 12:59:34.701 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:33, put:41, enable:true 12:59:34.701 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:35, put:42, enable:true 12:59:34.701 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:35, put:44, enable:true 12:59:34.701 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:36, put:43, enable:true 12:59:34.711 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:37, put:45, enable:true 12:59:34.714 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:41, put:46, enable:true 12:59:34.714 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:39, put:48, enable:true 12:59:34.714 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:40, put:50, enable:true 12:59:34.714 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:42, put:49, enable:true 12:59:34.714 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:38, put:47, enable:true 12:59:34.805 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:45, put:53, enable:true 12:59:34.805 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:43, put:51, enable:true 12:59:34.805 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:44, put:52, enable:true 12:59:34.805 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:46, put:54, enable:true 12:59:34.814 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:47, put:55, enable:true 12:59:34.818 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:58, enable:true 12:59:34.818 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:57, enable:true 12:59:34.818 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:56, enable:true 12:59:34.888 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:12, enable:true 12:59:34.888 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:60, enable:true 12:59:34.888 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:59, enable:true 12:59:34.887 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:9, enable:true 12:59:34.887 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:10, enable:true 12:59:34.892 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:11, enable:true 12:59:34.909 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:62, enable:true 12:59:34.909 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:61, enable:true 12:59:35.093 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:64, enable:true 12:59:35.093 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:13, enable:true 12:59:35.094 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:16, enable:true 12:59:35.094 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:17, enable:true 12:59:35.094 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:65, enable:true 12:59:35.094 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:18, enable:true 12:59:35.094 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:66, enable:true 12:59:35.094 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:63, enable:true 12:59:35.297 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:19, enable:true 12:59:35.298 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:69, enable:true 12:59:35.298 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:47, put:68, enable:true 12:59:35.298 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:67, enable:true 12:59:35.298 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:46, got:20, enable:true 12:59:35.298 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:70, enable:true 12:59:35.298 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:15, enable:true 12:59:35.298 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:14, enable:true 12:59:35.502 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:74, enable:true 12:59:35.502 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:24, enable:true 12:59:35.502 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:22, enable:true 12:59:35.502 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:23, enable:true 12:59:35.502 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:73, enable:true 12:59:35.502 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:72, enable:true 12:59:35.502 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:21, enable:true 12:59:35.502 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:71, enable:true 12:59:35.704 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:77, enable:true 12:59:35.704 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:30, enable:true 12:59:35.704 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:28, enable:true 12:59:35.704 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:75, enable:true 12:59:35.704 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:80, enable:true 12:59:35.704 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:27, enable:true 12:59:35.704 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:76, enable:true 12:59:35.704 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:29, enable:true 12:59:35.909 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:32, enable:true 12:59:35.909 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:84, enable:true 12:59:35.909 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:25, enable:true 12:59:35.909 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:79, enable:true 12:59:35.909 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:78, enable:true 12:59:35.909 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:33, enable:true 12:59:35.909 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:83, enable:true 12:59:35.909 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:26, enable:true 12:59:36.113 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:34, enable:true 12:59:36.113 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:35, enable:true 12:59:36.113 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:31, enable:true 12:59:36.113 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:38, enable:true 12:59:36.113 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:81, enable:true 12:59:36.113 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:82, enable:true 12:59:36.114 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:87, enable:true 12:59:36.114 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:85, enable:true 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer0 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer1 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer2 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer3 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer4 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer5 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer6 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer7 12:59:36.313 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer8 12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Producer9 12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Consumer0 12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Consumer1 12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Consumer2 12:59:36.314 [pool-1-thread-1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Worker - Stop:Consumer3 12:59:36.317 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:39, enable:false 12:59:36.317 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:36, enable:false 12:59:36.317 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:46, got:37, enable:false 12:59:36.317 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:40, enable:false 12:59:36.317 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:47, put:86, enable:false 12:59:36.317 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:88, enable:false 12:59:36.317 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:92, enable:false 12:59:36.317 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:91, enable:false 12:59:36.420 [Producer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer1 quit 12:59:36.420 [Producer6] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer6 quit 12:59:36.420 [Producer7] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer7 quit 12:59:36.420 [Producer4] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer4 quit 12:59:36.522 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:41, enable:false 12:59:36.522 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:44, enable:false 12:59:36.522 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:43, enable:false 12:59:36.522 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:96, enable:false 12:59:36.522 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:42, enable:false 12:59:36.522 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:49, put:90, enable:false 12:59:36.522 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:89, enable:false 12:59:36.522 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:50, put:93, enable:false 12:59:36.626 [Producer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer2 quit 12:59:36.626 [Producer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer0 quit 12:59:36.626 [Producer8] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer8 quit 12:59:36.626 [Producer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer3 quit 12:59:36.725 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:49, got:45, enable:false 12:59:36.726 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:95, enable:false 12:59:36.726 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - size:48, put:94, enable:false 12:59:36.726 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:50, enable:false 12:59:36.725 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:48, got:47, enable:false 12:59:36.726 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:47, got:48, enable:false 12:59:36.829 [Producer5] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer5 quit 12:59:36.829 [Producer9] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Producer - Producer9 quit 12:59:36.930 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:46, got:49, enable:false 12:59:36.930 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:44, got:46, enable:false 12:59:36.930 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:45, got:51, enable:false 12:59:36.930 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:44, got:52, enable:false 12:59:37.133 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:42, got:54, enable:false 12:59:37.133 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:40, got:57, enable:false 12:59:37.133 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:40, got:53, enable:false 12:59:37.133 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:41, got:55, enable:false 12:59:37.334 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:37, got:59, enable:false 12:59:37.334 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:36, got:56, enable:false 12:59:37.334 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:36, got:60, enable:false 12:59:37.334 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:37, got:58, enable:false 12:59:37.538 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:34, got:61, enable:false 12:59:37.538 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:32, got:63, enable:false 12:59:37.538 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:33, got:64, enable:false 12:59:37.539 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:32, got:62, enable:false 12:59:37.742 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:29, got:68, enable:false 12:59:37.742 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:30, got:65, enable:false 12:59:37.742 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:30, got:66, enable:false 12:59:37.742 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:28, got:67, enable:false 12:59:37.948 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:26, got:70, enable:false 12:59:37.948 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:24, got:69, enable:false 12:59:37.948 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:24, got:72, enable:false 12:59:37.948 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:25, got:71, enable:false 12:59:38.149 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:21, got:75, enable:false 12:59:38.149 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:20, got:76, enable:false 12:59:38.149 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:22, got:74, enable:false 12:59:38.149 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:20, got:73, enable:false 12:59:38.350 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:18, got:80, enable:false 12:59:38.350 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:18, got:77, enable:false 12:59:38.350 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:16, got:79, enable:false 12:59:38.350 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:17, got:78, enable:false 12:59:38.553 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:12, got:83, enable:false 12:59:38.553 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:14, got:84, enable:false 12:59:38.553 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:13, got:82, enable:false 12:59:38.553 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:12, got:81, enable:false 12:59:38.759 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:8, got:87, enable:false 12:59:38.759 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:8, got:88, enable:false 12:59:38.759 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:8, got:86, enable:false 12:59:38.759 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:8, got:85, enable:false 12:59:38.960 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:7, got:92, enable:false 12:59:38.963 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:5, got:89, enable:false 12:59:38.963 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:4, got:90, enable:false 12:59:38.963 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:5, got:91, enable:false 12:59:39.161 [Consumer0] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:3, got:96, enable:false 12:59:39.168 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:1, got:93, enable:false 12:59:39.168 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:0, got:94, enable:false 12:59:39.168 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - size:1, got:95, enable:false 12:59:39.168 [Consumer2] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - Consumer2 quit 12:59:39.168 [Consumer1] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - Consumer1 quit 12:59:39.168 [Consumer3] INFO me.josephzhu.javaconcurrenttest.concurrent.queues.Consumer - Consumer3 quit
从结果看到几个结论:
固然这个状态不那么容易碰巧遇到,我运行了20+次代码才遇到一次,你也能够把sleep移到前面去这样更容易出现这样的问题。
细细品味一下为何有一个消费者卡住了,咱们不是判断了队列中有数据才继续执行take()的吗?问题就出在这里,在判断的时候队列中的确有数据,看看Consumer0最后输出了3,可是在这以后的瞬间,还有3条数据都被其它线程消费完了,等到执行下一行代码的时候就卡住了。在编写多线程程序的时候,咱们很容易去假设:
这个Bug是很容易忽略的,咱们能够改一下消费者代码,利用有超时等待的poll()来解决这个问题:
@Override public void run() { while (enable || queue.size() > 0) { try { Integer item = queue.poll(1, TimeUnit.SECONDS); log.info("size:{}, got:{}, enable:{}", queue.size(), item, enable); if (!enable && item != null) { totalConsumedAfterShutdown.incrementAndGet(); } TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { } } log.info("{} quit", name); }
修改主程序后能够获得下面的结果:
值得注意几点:
前面咱们也看到了,队列消费的操做能够take()能够poll(),各类操做的区别以下:
这些操做之间的性能是否有区别呢,咱们写一个简单的程序测试一下
@Slf4j public class QueueBenchmark { int taskCount = 20000000; int threadCount = 10; @Test public void test() throws InterruptedException { List<Queue<Integer>> queues = getQueues(); benchmark("add", queues, taskCount, threadCount); benchmark("poll", queues, taskCount, threadCount); benchmark("offer", queues, taskCount, threadCount); benchmark("size", queues, taskCount, threadCount); benchmark("remove", queues, taskCount, threadCount); } private List<Queue<Integer>> getQueues() { return Arrays.asList(new ConcurrentLinkedQueue<>(), new LinkedBlockingQueue<>(), new ArrayBlockingQueue<>(taskCount, false), new LinkedTransferQueue<>(), new PriorityBlockingQueue<>(), new LinkedList<>()); } private void benchmark(String operation, List<Queue<Integer>> queues, int taskCount, int threadCount) throws InterruptedException { StopWatch stopWatch = new StopWatch(); queues.forEach(queue -> { stopWatch.start(queue.getClass().getSimpleName() + "-" + operation); try { tasks(queue, taskCount, threadCount, operation); } catch (InterruptedException e) { e.printStackTrace(); } stopWatch.stop(); log.info("queue:{}, operation:{}, size:{}, qps:{}", queue.getClass().getSimpleName(), operation, queue.size(), (long) taskCount * 1000 / stopWatch.getLastTaskTimeMillis()); }); log.info(stopWatch.prettyPrint()); } private void tasks(Queue<Integer> queue, int taskCount, int threadCount, String operation) throws InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount); forkJoinPool.execute(() -> IntStream.rangeClosed(1, taskCount).parallel().forEach(i -> { IntConsumer opt = task(queue, operation); if (queue instanceof LinkedList) { synchronized (queue) { opt.accept(i); } } else { opt.accept(i); } } )); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); } private IntConsumer task(Queue<Integer> queue, String name) { if (name.equals("add")) return queue::add; if (name.equals("offer")) return queue::offer; if (name.equals("poll")) return i -> queue.poll(); if (name.equals("remove")) return i -> queue.remove(); if (name.equals("size")) return i -> queue.size(); return i -> { }; } }
在代码里,咱们测试10个线程下,对各类队列的各类方法执行N次操做的耗时。
结论以下,表格中数据的单位毫秒,也就是耗时,数字越小性能越好:
有几个地方值得注意:
下面咱们稍微改下代码测试一下BlockingQueue的put()和take():
@Slf4j public class BlockingQueueBenchmark { int taskCount = 20000000; int threadCount = 10; @Test public void test() throws InterruptedException { List<BlockingQueue<Integer>> queues = getQueues(); benchmark("put", queues, taskCount, threadCount); benchmark("take", queues, taskCount, threadCount); } private List<BlockingQueue<Integer>> getQueues() { return Arrays.asList( new LinkedBlockingQueue<>(), new LinkedTransferQueue<>(), new ArrayBlockingQueue<>(taskCount, false), new PriorityBlockingQueue<>()); } private void benchmark(String operation, List<BlockingQueue<Integer>> queues, int taskCount, int threadCount) throws InterruptedException { StopWatch stopWatch = new StopWatch(); queues.forEach(queue -> { stopWatch.start(queue.getClass().getSimpleName() + "-" + operation); try { tasks(queue, taskCount, threadCount, operation); } catch (InterruptedException e) { e.printStackTrace(); } stopWatch.stop(); log.info("queue:{}, operation:{}, size:{}", queue.getClass().getSimpleName(), operation, queue.size()); }); log.info(stopWatch.prettyPrint()); } private void tasks(BlockingQueue<Integer> queue, int taskCount, int threadCount, String operation) throws InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount); forkJoinPool.execute(() -> IntStream.rangeClosed(1, taskCount).parallel().forEach(task(queue, operation))); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); } private IntConsumer task(BlockingQueue<Integer> queue, String name) { if (name.equals("put")) return i -> { try { queue.put(i); } catch (InterruptedException e) { e.printStackTrace(); } }; if (name.equals("take")) return i -> { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } }; return i -> { }; } }
把结果一块儿完善到前面表格中:
能够看到,阻塞的方法和非阻塞的性能差很少,也是根据须要选择便可。看代码实现的话也能够看到不少队列对于各类存取方法逻辑基本是一致的。
各个队列之间的性能貌似区别不大,我感受这个测试写的不是很好,可能和线程池的调度也有关系,咱们接下去再从新换一种测试方式来测试下各类队列的吞吐。
在此次的测试中,咱们模拟一下场景:
@Data @AllArgsConstructor @NoArgsConstructor class TestCase { private int elementCount; private Mode mode; private int producerCount; private int consumerCount; }
模拟一下不一样的消费者生产者线程数量配比的状况下,各类队列完成必定数量元素的存取操做总共的耗时。咱们定义三种模式:
enum Mode { ProducerAndConsumerShareThread, ProducerAndThenConsumer, ConcurrentProducerAndConsumer }
咱们定义的全部测试场景以下:
List<TestCase> testCases = new ArrayList<>(); testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 1, 1)); testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 10, 10)); testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 100, 100)); testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 1000, 1000)); testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors())); testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 1, 100)); testCases.add(new TestCase(element_count, Mode.ConcurrentProducerAndConsumer, 100, 1)); testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, 1, 0)); testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, 10, 0)); testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, 100, 0)); testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, 1000, 0)); testCases.add(new TestCase(element_count, Mode.ProducerAndConsumerShareThread, Runtime.getRuntime().availableProcessors(), 0)); testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, 1, 1)); testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, 10, 10)); testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, 100, 100)); testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, 1000, 1000)); testCases.add(new TestCase(element_count, Mode.ProducerAndThenConsumer, Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors()));
十几种测试,覆盖这些场景:
主要测试三种队列,每一种队列测试之间GC一次尽可能排除干扰:
LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(); for (TestCase testCase : testCases) { System.gc(); benchmark(linkedBlockingQueue, testCase); } linkedBlockingQueue = null; LinkedTransferQueue<String> linkedTransferQueue = new LinkedTransferQueue<>(); for (TestCase testCase : testCases) { System.gc(); benchmark(linkedTransferQueue, testCase); } linkedTransferQueue = null; ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(element_count); for (TestCase testCase : testCases) { System.gc(); benchmark(arrayBlockingQueue, testCase); } arrayBlockingQueue = null;
生产者:
class ProducerTask implements Runnable { private String name; private BlockingQueue<String> queue; private TestCase testCase; private CountDownLatch startCountDownLatch; private CountDownLatch finishCountDownLatch; public ProducerTask(CountDownLatch startCountDownLatch, CountDownLatch finishCountDownLatch, String name, BlockingQueue<String> queue, TestCase testCase) { this.startCountDownLatch = startCountDownLatch; this.finishCountDownLatch = finishCountDownLatch; this.name = name; this.queue = queue; this.testCase = testCase; } @Override public void run() { try { startCountDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } int count = testCase.elementCount / testCase.getProducerCount(); if (testCase.mode == Mode.ProducerAndConsumerShareThread) { for (int i = 0; i < count; i++) { try { queue.put(name + i); queue.take(); } catch (Exception e) { e.printStackTrace(); } } } else { for (int i = 0; i < count; i++) { try { queue.put(name + i); } catch (Exception e) { e.printStackTrace(); } } } finishCountDownLatch.countDown(); } }
此次的测试,咱们预先根据线程数量算好执行次数,而不是像以前的测试同样全部的任务统一由线程池调度,这样更容易测试出队列自己的性能,排除干扰。这里能够看到若是是存取共享模式的话,生产者直接作存取操做,其它模式的话,生产者仅仅作存的操做。
消费者:
class ConsumerTask implements Runnable { private BlockingQueue<String> queue; private TestCase testCase; private CountDownLatch startCountDownLatch; private CountDownLatch finishCountDownLatch; public ConsumerTask(CountDownLatch startCountDownLatch, CountDownLatch finishCountDownLatch, BlockingQueue<String> queue, TestCase testCase) { this.startCountDownLatch = startCountDownLatch; this.finishCountDownLatch = finishCountDownLatch; this.queue = queue; this.testCase = testCase; } @Override public void run() { try { startCountDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } int count = testCase.elementCount / testCase.getConsumerCount(); if (testCase.mode != Mode.ProducerAndConsumerShareThread) { for (int i = 0; i < count; i++) { try { queue.take(); } catch (Exception e) { e.printStackTrace(); } } } finishCountDownLatch.countDown(); } }
生产者和消费者咱们都用了两个CountDownLatch来作拦截,一个startCountDownLatch用来在全部线程都启动后由主线程通知一会儿放开全部的线程,一个finishCountDownLatch用来让主线程等待线程的执行完毕。
主要的测试代码以下:
private void benchmark(BlockingQueue<String> queue, TestCase testCase) throws InterruptedException { long begin = System.currentTimeMillis(); log.info("\r\n==========================\r\nBegin benchmark Queue:[{}], case:{}", queue.getClass().getSimpleName(), testCase.toString()); CountDownLatch startCountDownLatch = new CountDownLatch(1); if (testCase.mode == Mode.ProducerAndConsumerShareThread) { CountDownLatch finishCountDownLatch = new CountDownLatch(testCase.getProducerCount()); for (int i = 0; i < testCase.getProducerCount(); i++) { new Thread(new ProducerTask( startCountDownLatch, finishCountDownLatch, String.format("Thread_%d_", i), queue, testCase)).start(); } startCountDownLatch.countDown(); finishCountDownLatch.await(); } else if (testCase.mode == Mode.ConcurrentProducerAndConsumer) { CountDownLatch finishCountDownLatch = new CountDownLatch(testCase.getProducerCount() + testCase.getConsumerCount()); for (int i = 0; i < testCase.getProducerCount(); i++) { new Thread(new ProducerTask( startCountDownLatch, finishCountDownLatch, String.format("Thread_%d_", i), queue, testCase)).start(); } for (int i = 0; i < testCase.getConsumerCount(); i++) { new Thread(new ConsumerTask( startCountDownLatch, finishCountDownLatch, queue, testCase)).start(); } startCountDownLatch.countDown(); finishCountDownLatch.await(); } else if (testCase.mode == Mode.ProducerAndThenConsumer) { CountDownLatch finishCountDownLatch = new CountDownLatch(testCase.getProducerCount()); for (int i = 0; i < testCase.getProducerCount(); i++) { new Thread(new ProducerTask( startCountDownLatch, finishCountDownLatch, String.format("Thread_%d_", i), queue, testCase)).start(); } startCountDownLatch.countDown(); finishCountDownLatch.await(); startCountDownLatch = new CountDownLatch(1); finishCountDownLatch = new CountDownLatch(testCase.getConsumerCount()); for (int i = 0; i < testCase.getConsumerCount(); i++) { new Thread(new ConsumerTask( startCountDownLatch, finishCountDownLatch, queue, testCase)).start(); } startCountDownLatch.countDown(); finishCountDownLatch.await(); } long finish = System.currentTimeMillis(); log.info("Finish benchmark Queue:[{}], case:{}, QPS:{}\r\n==========================\n", queue.getClass().getSimpleName(), testCase.toString(), (long) element_count * 1000 / (finish - begin)); }
能够看到三种模式的处理不一样:
整个测试结果汇总以下(这个测试是在12核阿里云跑出来的,元素数1000万):
说实话这个测试的结果不是我想象的那样,我想象的是随着并发的增多队列性能会急剧降低,并且各类队列之间有显著的性能差别,这个结果是这样这也能够说明这些队列性能都是很不错的,没有明显的短板。
能够大概得出几个结论:
通常而言,阻塞队列中,无界队列能够选择LinkedBlockingQueue,有界队列能够选择ArrayBlockingQueue,后者还有公平参数能够开启公平特性,有关这个特性下面咱们也会来观察。
SynchronousQueue是没有容量的阻塞队列,只有等另外一个线程移出元素后才能插入元素成功。这里咱们写一段代码来测试,沿用以前的消费者和生产者类,只是修改了2秒后关闭队列的地方,这里咱们加上了interrupt()操做,不然生产者是没法退出的:
@Slf4j public class SynchronousQueueTest { @Test public void test() throws InterruptedException { SynchronousQueue<Integer> queue = new SynchronousQueue<>(false); List<Worker> workers = new ArrayList<>(); List<Thread> threads = new ArrayList<>(); for (int i = 0; i < 10; i++) { String name = "Producer" + i; Producer worker = new Producer(name, queue); workers.add(worker); Thread thread = new Thread(worker); thread.setName(name); threads.add(thread); thread.start(); } for (int i = 0; i < 4; i++) { String name = "Consumer" + i; Consumer worker = new Consumer(name, queue); workers.add(worker); Thread thread = new Thread(worker); thread.setName(name); threads.add(thread); thread.start(); } Executors.newSingleThreadScheduledExecutor().schedule(() -> { for (Worker worker : workers) { worker.stop(); } for (Thread thread : threads) { thread.interrupt(); } }, 2, TimeUnit.SECONDS); for (Thread thread : threads) { thread.join(); } } }
咱们先把公平参数设置为false看看输出:
搜索日志能够发现找不到Producer0~Producer5这6个生产者的踪影,由于没有消费者来拉取它们的数据,它们都卡住了,这些生产者都饿死了,日志中最小的put也是从7开始的。改成公平模式试试:
此次能够找到全部生产者的日志,公平模式也就是全部等待的线程FIFO次序来访问队列:
这里给出一个延迟队列的例子,咱们往队列提交10次延迟消息,每次提交2条同样的消息,消息的绝对延迟时间从1到10秒。
@Slf4j public class DelayQueueTest { @Test public void test() throws InterruptedException { DelayQueue<Message> delayQueue = new DelayQueue<>(); IntStream.rangeClosed(1, 10).forEach(i -> { for (int __ = 0; __ < 2; __++) delayQueue.add(new Message(i * 1000)); }); Executors.newFixedThreadPool(1).submit(() -> { while (true) { Message message = delayQueue.take(); log.debug("Got:{}", message); } }); TimeUnit.SECONDS.sleep(20); } @ToString class Message implements Delayed { private final long delay; private final long expire; public Message(long delay) { this.delay = delay; expire = System.currentTimeMillis() + delay; } @Override public long getDelay(TimeUnit unit) { //log.debug("getDelay called : {}", unit); return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } } }
输出以下:
17:14:43.957 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=1000, expire=1563354883947) 17:14:44.007 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=1000, expire=1563354883947) 17:14:44.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=2000, expire=1563354884949) 17:14:44.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=2000, expire=1563354884949) 17:14:45.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=3000, expire=1563354885949) 17:14:45.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=3000, expire=1563354885949) 17:14:46.956 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=4000, expire=1563354886949) 17:14:46.956 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=4000, expire=1563354886949) 17:14:47.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=5000, expire=1563354887949) 17:14:47.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=5000, expire=1563354887949) 17:14:48.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=6000, expire=1563354888949) 17:14:48.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=6000, expire=1563354888949) 17:14:49.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=7000, expire=1563354889949) 17:14:49.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=7000, expire=1563354889949) 17:14:50.954 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=8000, expire=1563354890949) 17:14:50.955 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=8000, expire=1563354890949) 17:14:51.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=9000, expire=1563354891949) 17:14:51.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=9000, expire=1563354891949) 17:14:52.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=10000, expire=1563354892949) 17:14:52.953 [pool-1-thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.queues.DelayQueueTest - Got:DelayQueueTest.Message(delay=10000, expire=1563354892949)
能够看到每过1秒输出2条日志,符合预期。
以前生产上遇到过一个OOM的问题,排查下来是队列使用不当,这里咱们就来看下这个问题,代码逻辑是:
比较特殊的是,使用了transfer()方法,开发的小伙伴可能以为LinkedTransferQueue比较酷炫,因此选择了这个队列,而且认为transfer()能够直接把任务交给消费者性能较高,因此使用了这个方法。
代码以下:
@Slf4j public class BlockingQueueMisuse { LinkedTransferQueue<String> linkedTransferQueue = new LinkedTransferQueue<>(); @Test public void test() throws InterruptedException { int taskCount = 4000; StopWatch stopWatch = new StopWatch(); stopWatch.start("misuse"); ExecutorService threadPool = Executors.newFixedThreadPool(10); //ExecutorService threadPool = Executors.newCachedThreadPool(); IntStream.rangeClosed(1, taskCount).forEach(i -> threadPool.submit(() -> { try { linkedTransferQueue.transfer("message" + i); } catch (InterruptedException e) { e.printStackTrace(); } })); IntStream.rangeClosed(1, taskCount).forEach(i -> threadPool.submit(() -> { try { log.debug("Got:{}", linkedTransferQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } })); threadPool.shutdown(); threadPool.awaitTermination(1, TimeUnit.HOURS); stopWatch.stop(); log.info(stopWatch.prettyPrint()); } }
运行程序后发现没有任何输出,其实这是由于只有10个线程,生产者须要存的元素数量是4000大大超过了10,全部线程都在等待:
因而,他没多想把线程池修改成了newCachedThreadPool,程序能够正常执行了,看看运行结果:
这个代码是很吓人的,运行过程当中开启了几千个线程。咱们想一下缘由,其实newCachedThreadPool使用的是SynchronousQueue,在没有可用线程的状况下就会新建线程,而这个特性赶上了transfer()的特性,就会致使线程池建立几千个线程。
即便咱们把代码修改成使用LinkedBlockingQueue,配合newCachedThreadPool也会建立几十个线程(若是元素数量足够多,几百个几千个也有可能)。由于一旦阻塞,newCachedThreadPool就会绝不犹豫建立新线程。
对于生产者消费者这种任务,仍是建议直接使用线程来实现,生产者消费者的阻塞不相互干扰,并且线程池也是使用队列来管理任务的,用了线程池至关于两次队列,没有必要。
咱们来看一下此次实验涉及到的一些阻塞队列:
DelayQueue、SynchronousQueue和PriorityBlockingQueue是特种队列,有特殊用途根据须要选择。
LinkedTransferQueue也算是特种队列,它能够实现相似背压的效果,在特殊场景下使用。
ArrayBlockingQueue和LinkedBlockingQueue背后的数据结构不一样,它们多是咱们最经常使用的队列了,区别以下:
非阻塞队列ConcurrentLinkedQueue比较特殊,首先它不是阻塞队列,其次它不使用锁,而是使用CAS,在超高并发的场景下,显然它能够到达更好的性能。
这里利用以前的代码最后作了一次对比测试,这里咱们没有测试并发存取模式,由于消费者不知道什么时候消费完毕,在消费不到数据的时候进行死循环意义不大:
因此在特殊的场景下,好比生产者生产好了数据扔到队列中,有N多个消费者须要并发消费这个时或许能够发挥ConcurrentLinkedQueue的威力(可是,以前也说过了,它的size()比较坑爹),常年处于空的队列不太适合,这个时候使用阻塞队列更合适。
好吧,看来90%的时候仍是用ArrayBlockingQueue和LinkedBlockingQueue太平,有界用前者,须要无界用后者,可是认真考虑下,你真的须要无界吗。经过咱们的测试能够发现这些队列在高并发下都有着百万以上的QPS性能,通常而言用哪一个都不会出现瓶颈,反而是咱们更应该注意由于阻塞致使的线程数量增多和队列的容量占用的内存。
本文中,咱们还花式使用了各类方式来测试队列:
这里想说的是,对于生产消费这样的任务最好仍是使用阻塞队列配置独立的消费线程,生产者能够直接是业务线程,而不是去使用线程池,没有这个必要。
一样,代码见个人Github,欢迎clone后本身把玩,欢迎点赞。
欢迎关注个人微信公众号:随缘主人的园子