Disruptor源码系列-Sequencer

上篇文章已经讲过了 RingBuffer 了, RingBuffer 是消息的容器,可是 Disruptor 中最复杂的部分在于如何并发控制消息的增长和消费,而这部分由 Senquencer 来完成。java

这篇文章基于 Disruptor 官方提供的示例代码。缓存

Sequencer 简介

Sequencer 能够被认为是 Disruptor 的大脑,而 Sequence 则能够认为是神经元,Sequencer 会产生信号(Sequence 中的 value)来控制消费者和生产者。在一个 Disruptor 实例中,只会有一个 Sequencer 实例,在建立 RingBuffer 时建立。微信

// 多个生产者的状况
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
    MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}
// 单个生产者的状况
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}
复制代码

Sequencer 接口有两种实现,SingleProducerSequencerMultiProducerSequencer,分别来处理单个生产者和多个生产者的状况。并发

在 Sequencer 中有一个 next() 方法,就是这个方法来产生 Sequence 中的 value。Sequence 本质上能够认为是一个 AtomicLong,消费者和生产者都会维护本身的 Sequence。框架

Sequence 中的 value 表示 RingBuffer 消息的编号,Disruptor 中控制逻辑都是围绕这个编号来完成的。RingBuffer 的 sequence 从 0 开始增加。这里须要注意的是在 Disruptor 中共享的并非 Sequence 对象,而是 sequence 中的 value。ide

生产者中 Sequence 的 value 表示当前消息已经生产到哪一个位置,消费者中 Sequence 的 value 表示消费者已经处理到哪一个位置。对于 Sequencer 和 Sequence 已经介绍清楚了,那么 Sequencer 是怎么运行的呢?工具

RingBuffer 是消息的容器,为了让消息可以被正常传递,RingBuffer 须要知足两个要求,第一个是对于全部的消费者,在 RingBuffer 为空时,就不能再从中取数据,对于生产者,新生产的内容不能把未消费的数据覆盖掉。spa

Sequencer 的核心就是解决了这两个问题,经过 GatingBarrier 两个工具。设计

Gating 经过 RingBuffer.addGatingSequences() 方法来获取,Barrier 经过 RingBuffer.newBarrier() 方法来获取。code

上图中 C 表明消费者,P 表明生产者。

须要说明的是,EventProcessor + EventHandler 才是一个完整的消费者。EventProcessor 中会维护一个 Sequence 对象,记录该消费者处理到哪条消息,每一个消费者维护本身的 Sequence 生产者的 Sequence 在 RingBuffer 维护

Gating 实现

Gating 的设计其实很简单,其实就是将多个全部消费者的 Sequence 监控起来,而后在生产者向 RingBuffer 中写入数据时,判断是否有足够的空间来存入新的消息。

全部消费者的 Sequence 经过以下的方法调用路径,最后存入到 Sequencer.gatingSequences 变量中。

Disruptor.handleEventsWith() -> RingBuffer.addGatingSequences() -> Sequencer.addGatingSequences()

Sequencer.next() 中会对 gatingSequences 进行判断,具体判断的逻辑就是看当前这些被监控的 Sequence 中最小的 value 是否已经落后一圈了,落后一圈就表示新的消息没有写入的空间:

// MultiProducerSequencer.next() 方法
do
{
    current = cursor.get();
    next = current + n;

    long wrapPoint = next - bufferSize; // 获取一圈以前的值
    long cachedGatingSequence = gatingSequenceCache.get(); // 获取缓存的 gatingSequences 的最小值
    // 若是大于缓存的值,则进行进一步的判断
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
    {
        // 获取当前实际最小的 sequence
        long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
        // 若是比实际的最小 sequqnce 还大,说明已经没有位置了,则继续进行自旋(无限循环)
        if (wrapPoint > gatingSequence)
        {
            LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
            continue;
        }
        gatingSequenceCache.set(gatingSequence);
    }
    else if (cursor.compareAndSet(current, next)) // 若是比缓存的值小,说明目前还有空闲位置,能够继续存入消息
    {
        break;
    }
}
while (true); // 这是一个无限循环,直到有新的空间能够存入消息
复制代码

若是没有足够的空间,那么 next() 方法就会被阻塞,新的消息没法加入到 RingBuffer 中。

上面是 gating 的示意图,c1 和 c2 处理的速度不同,c1 在 1 的位置上,而 c2 在 2 的位置上,生产者 P 已经没法在向 RingBuffer 中添加新的消息,所以会被阻塞,直到 c1 将 消息处理完成以后才能继续插入消息。

SequencerBarrier 实现

同时对于消费者来讲,必须等到 RingBuffer 中有消息才能进行处理。 经过 SequenceBarrier 来进行管理, SequenceBarrier 实际生成的是 ProcessingSequenceBarrier 实例,按照以下的调用路径来初始化:

RingBuffer.newBarrier() -> Sequencer.newBarrier() -> new ProcessingSequenceBarriser()

消费者从 RingBuffer 中获取消息时,须要经过 SenquencerBarrier 来肯定是否有可用的消息, 使用 SequencerBarrier 的调用路径以下:

BatchEventProcessor.processEvents() -> sequenceBarrier.waitFor()

BatchEventProcessor 是默认使用的消费者,上面咱们说到了 EventProcessor + EventHandler 才是一个完整的消费者。用户本身实现 EventHandler 来处理消息的逻辑。而实际从 RingBuffer 中获取消息的逻辑则在 BatchEventProcessor 中实现,关键代码以下:

// BatchEventProcessor.processEvents() 方法,删除了部分代码
while (true)
{
    try
    {
        // 获取可用消息的最大值
        final long availableSequence = sequenceBarrier.waitFor(nextSequence);
        // 若是当前的位置小于可用的位置,说明有消息能够处理,进行消息处理
        while (nextSequence <= availableSequence)
        {
            event = dataProvider.get(nextSequence);
            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); // 调用实际的 Handler 处理消息
            nextSequence++;
        }
        sequence.set(availableSequence); // 将本身的 sequence 设置处理完成的位置
    }
}
复制代码

若是没有获取到可处理的 sequence, 那么当前的处理消息的 handlers 也会被阻塞。

SequenceBarrier 除了能够控制消费者从 RingBuffer 取数据以外,还能够控制多个消费者执行的顺序。若是要安排消费者执行的顺序,用以下的代码就能够。

disruptor.handleEventsWith(new LongEventHandler()).then(new AnOtherLongEventHandler());
复制代码

上面的代码表示 AnotherLongEventHandler 须要等 LongEventHandler 处理完成以后,才能对消息进行处理。

消费者之间控制依赖关系其实就是控制 sequence 的大小,若是说 C2 消费者 依赖 C1,那就表示 C2 中 Sequence 的值必定小于等于 C1 的 Sequence。

其中 then 关系是经过 Disruptor.updateGatingSequencesForNextInChain() 方法来实现:

private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
    if (processorSequences.length > 0)
    {
        ringBuffer.addGatingSequences(processorSequences);
        for (final Sequence barrierSequence : barrierSequences)
        {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}
复制代码

其实 Disruptor 控制的秘密就是这些了,其实也不是很复杂,只是实现的方式很巧妙,再加上并发控制没有使用锁,才造就了一个如此高效的框架。

关注微信公众号,聊点其余的

相关文章
相关标签/搜索