Disruptor 的大名从好久之前就据说了,可是一直没有时间;看完之后才发现其内部的思想异常清晰,很容易就能前移到其余的项目,因此仔细了解一下仍是颇有必要的这。篇博客将主要从源码角度分析,Disruptor 为何那么快,在此以前能够先查看 Disruptor 详解 一 ,可以对 Disruptor 的使用有一个大体的了解;此外 Disruptor 一般会和 ArrayBlockingQueue 作对比,能够参考 JDK源码分析(11)之 BlockingQueue 相关 ;html
首先能够从下面两张图看到,Disruptor 的内部结构,只这里我偷了一下懒,图中的内容是老版本的,可能和新版本有点不同可是主要结构仍是同样的;java
具体使用示例代码我这里就不贴,你们能够看我上一篇博客;设计模式
初始化;首先在启动的时候,须要预先初始化 RingBuffer,因此须要传入 EventFactory;这里和 JUC 里面 Queue 很不同的地方地方是,RingBuffer 中的 Event 不会被取出,每次 publish 的时候都是覆盖以前的内容,因此 RingBuffer 这里是不会产生 GC 的;而生产者和消费者都持有一个 Sequence,指示当前的处理位置,当须要获取 Event 的时候,能够直接使用 sequence & ringBuffer.size - 1
除留余数法快速找到对应的数组位置;数组
private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } }
生产者;同时能够指定 Disruptor 是单生产者仍是多生产者:缓存
UNSAFE.compareAndSwapLong
;当没有空余位置的时候他们都是使用 LockSupport.parkNanos(1L);
来阻塞线程的,若是有须要你也能够改为其余的等待模式;并发
// RingBuffer // 首先经过 Sequencer 拿到下一个可用的序列 public long next() { return sequencer.next(); } // 而后用除留余数发拿到对应的数组元素 public E get(long sequence) { return elementAt(sequence); } // 这里是使用 UNSAFE 直接获取内存对象 protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); } // 最后将拿到的数组元素修改成新的 Event,再发布 public void publish(long sequence) { sequencer.publish(sequence); } // 这里全部关于生产者并发的问题都封装到了 Sequencer 里面,后面最详细讲到
消费者;正由于上面说的 RingBuffer 中的对象不对像普通的 Queue 同样,真正取出,因此在 Disruptor 中能够很容易作到,同一个消息同时被多个消费者获取的逻辑;这里的关键就在于 每一个消费者所持有的 Sequence;框架
等待策略;Disruptor 提供了不少从等待策略,这里须要根据实际的业务需求选择使用;同时和 JDK 中的队列相比,不管是阻塞队列仍是并发队列,其控制并发的方式都是固定的,而在 Disruptor 中则能够很容易的定制这些策略,从这一点来看也能够说是实现了策略模式;ide
以上这些就是 Disruptor 的大体框架性内容了,另外有两点是 Disruptor 很快的重要缘由;源码分析
首先计算机中各级存储器的速度差别巨大,数量级描述大体以下:性能
存储器 | 容量 | 速度 |
---|---|---|
寄存器 | * / B | 1 ns |
一级 Cache | * / KB | 5 ~ 10 ns |
二级 Cache | * / KB - M | 40 ~ 60 ns |
内存 | */ M - G | 100 ~ 150 ns |
硬盘 | * / G - T | 3 ~ 15 ms |
根据上图的数据,直观的反应若是想加快软件的运行速度,固然是尽可能利用上层的缓存体系;在 JVM 中缓存不是以单字节存在的,而是以缓存行的形式,一般是 2 的整数幂个连续字节,通常为 32-256 个字节。最多见的缓存行大小是 64 个字节;
在咱们的队列,数则或者 Disruptor 中,理想状态下就是生产者和消费的速度保持相对一致,这样能避免阻塞的发生,其生产者和消费者就分别位于数组的头部和尾部;
可是这样的理想状态很难到达,要么是生产者快一些,要么是消费者快一些,其结果以下图;
因此头和尾一般都位于同一个缓存行中,这样者更新头的时候,将对应的缓存标记为失效,同时尾也被标记为了失效,者就是伪缓存;
下面是一个缓存的测试例子;
public final class FalseSharing implements Runnable { private static final int NUM_THREADS = 4; // change private static final long ITERATIONS = 500L * 1000L * 1000L; private final int arrayIndex; private static VolatileLong[] longs = new VolatileLong[NUM_THREADS]; static { for (int i = 0; i < longs.length; i++) { longs[i] = new VolatileLong(); } } public FalseSharing(final int arrayIndex) { this.arrayIndex = arrayIndex; } public static void main(final String[] args) throws Exception { final long start = System.nanoTime(); runTest(); System.out.println("duration = " + (System.nanoTime() - start)); } private static void runTest() throws InterruptedException { Thread[] threads = new Thread[NUM_THREADS]; for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(new FalseSharing(i)); } for (Thread t : threads) { t.start(); } for (Thread t : threads) { t.join(); } } @Override public void run() { long i = ITERATIONS + 1; while (0 != --i) { longs[arrayIndex].value = i; } } public static final class VolatileLong { // public long p1, p2, p3, p4, p5, p6; // cache line padding public volatile long value = 0L; // public long p8, p9, p10, p11, p12, p13, p14, p15; // cache line padding } }
这里不一样的机器测试的结果不一样,你们能够修改线程数,padding 数,和 padding 的前后顺序;会获得不一样的结果;
我测试的结果:
无 padding :17988876300
有 padding :4667271000
能够看到是查了一个数量级
这样的缓存填充,在 Disruptor 中随处可见:
abstract class RingBufferPad { protected long p1, p2, p3, p4, p5, p6, p7; } public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> { public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE; protected long p1, p2, p3, p4, p5, p6, p7; ... }
并发的处理,一样的 Disruptor 中随处可见,虽然在平时写代码的时候也会注意,可是当状态变量多了之后,代码就会变得很复杂,不容易读懂;而在 Disruptor 中由 Sequence 串联起来的各个部分,以及策略模式的应用,使得每部分的处理同样的清晰;这里的内容太多了就不一一分析了,好比 MultiProducerSequencer 和 SingleProducerSequencer;
// SingleProducerSequencer public long next(int n) { if (n < 1) throw new IllegalArgumentException("n must be > 0"); long nextValue = this.nextValue; long nextSequence = nextValue + n; long wrapPoint = nextSequence - bufferSize; long cachedGatingSequence = this.cachedValue; if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { cursor.setVolatile(nextValue); // StoreLoad fence long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } this.cachedValue = minSequence; } this.nextValue = nextSequence; return nextSequence; } // MultiProducerSequencer public long next(int n) { if (n < 1) throw new IllegalArgumentException("n must be > 0"); long current; long next; do { current = cursor.get(); next = current + n; long wrapPoint = next - bufferSize; long cachedGatingSequence = gatingSequenceCache.get(); if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { long gatingSequence = Util.getMinimumSequence(gatingSequences, current); if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } gatingSequenceCache.set(gatingSequence); } else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; }
对 Disruptor 源码查看的最大感受是,习觉得常的结构设计模式,均可以有更精妙的写法,若是 Sequence 承担的各部分逻辑串联的角色,总体的消费者生产者模式,消费者部分能够当作是观察者模式,也能够看出是事件监听模式,以及并发控制的策略模式;两外就是包括伪缓存在内的各细节优化;