Disruptor.shutdown 方法阻塞至全部事件获得处理。
循环调用hasBacklog()断定当前 生产分配的游标: ringBuffer.getCursor() > 消费者序号: consumer的lastSequence,表示还未处理完结。java
public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException { final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout); while (hasBacklog()) { if (timeout >= 0 && System.currentTimeMillis() > timeOutAt) { throw TimeoutException.INSTANCE; } // Busy spin try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } halt(); } private boolean hasBacklog() { final long cursor = ringBuffer.getCursor(); for (final Sequence consumer : consumerRepository.getLastSequenceInChain(false)) { System.out.println(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + " " + "cursor=:"+ cursor +"consumer:"+ consumer.get()); if (cursor > consumer.get()) { return true; } } return false; }
准确的说:
hasBacklog只是比对瞬时状态下的是否处理完当前已入RingBuffer的事件。
高并发时大量事件阻塞置入RingBuffer时,当消费效率大于生产效率时,可能并非理想中下的全部事件获得处理,慎用。并发
测试用例:高并发