上篇文章已经讲过了 RingBuffer 了, RingBuffer 是消息的容器,可是 Disruptor 中最复杂的部分在于如何并发控制消息的增长和消费,而这部分由 Senquencer 来完成。java
这篇文章基于 Disruptor 官方提供的示例代码。缓存
Sequencer 能够被认为是 Disruptor 的大脑,而 Sequence 则能够认为是神经元,Sequencer 会产生信号(Sequence 中的 value)来控制消费者和生产者。在一个 Disruptor 实例中,只会有一个 Sequencer 实例,在建立 RingBuffer 时建立。微信
// 多个生产者的状况
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
// 单个生产者的状况
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
复制代码
Sequencer 接口有两种实现,SingleProducerSequencer
和 MultiProducerSequencer
,分别来处理单个生产者和多个生产者的状况。并发
在 Sequencer 中有一个 next()
方法,就是这个方法来产生 Sequence 中的 value。Sequence 本质上能够认为是一个 AtomicLong,消费者和生产者都会维护本身的 Sequence。框架
Sequence 中的 value 表示 RingBuffer 消息的编号,Disruptor 中控制逻辑都是围绕这个编号来完成的。RingBuffer 的 sequence 从 0 开始增加。这里须要注意的是在 Disruptor 中共享的并非 Sequence 对象,而是 sequence 中的 value。ide
生产者中 Sequence 的 value 表示当前消息已经生产到哪一个位置,消费者中 Sequence 的 value 表示消费者已经处理到哪一个位置。对于 Sequencer 和 Sequence 已经介绍清楚了,那么 Sequencer 是怎么运行的呢?工具
RingBuffer 是消息的容器,为了让消息可以被正常传递,RingBuffer 须要知足两个要求,第一个是对于全部的消费者,在 RingBuffer 为空时,就不能再从中取数据,对于生产者,新生产的内容不能把未消费的数据覆盖掉。spa
Sequencer 的核心就是解决了这两个问题,经过 Gating
和 Barrier
两个工具。设计
Gating 经过 RingBuffer.addGatingSequences()
方法来获取,Barrier 经过 RingBuffer.newBarrier()
方法来获取。code
上图中 C 表明消费者,P 表明生产者。
须要说明的是,EventProcessor + EventHandler 才是一个完整的消费者。EventProcessor 中会维护一个 Sequence 对象,记录该消费者处理到哪条消息,每一个消费者维护本身的 Sequence 生产者的 Sequence 在 RingBuffer 维护
Gating 的设计其实很简单,其实就是将多个全部消费者的 Sequence 监控起来,而后在生产者向 RingBuffer 中写入数据时,判断是否有足够的空间来存入新的消息。
全部消费者的 Sequence 经过以下的方法调用路径,最后存入到 Sequencer.gatingSequences 变量中。
Disruptor.handleEventsWith() -> RingBuffer.addGatingSequences() -> Sequencer.addGatingSequences()
Sequencer.next() 中会对 gatingSequences 进行判断,具体判断的逻辑就是看当前这些被监控的 Sequence 中最小的 value 是否已经落后一圈了,落后一圈就表示新的消息没有写入的空间:
// MultiProducerSequencer.next() 方法
do
{
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize; // 获取一圈以前的值
long cachedGatingSequence = gatingSequenceCache.get(); // 获取缓存的 gatingSequences 的最小值
// 若是大于缓存的值,则进行进一步的判断
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
// 获取当前实际最小的 sequence
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// 若是比实际的最小 sequqnce 还大,说明已经没有位置了,则继续进行自旋(无限循环)
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next)) // 若是比缓存的值小,说明目前还有空闲位置,能够继续存入消息
{
break;
}
}
while (true); // 这是一个无限循环,直到有新的空间能够存入消息
复制代码
若是没有足够的空间,那么 next() 方法就会被阻塞,新的消息没法加入到 RingBuffer 中。
上面是 gating 的示意图,c1 和 c2 处理的速度不同,c1 在 1 的位置上,而 c2 在 2 的位置上,生产者 P 已经没法在向 RingBuffer 中添加新的消息,所以会被阻塞,直到 c1 将 消息处理完成以后才能继续插入消息。
同时对于消费者来讲,必须等到 RingBuffer 中有消息才能进行处理。 经过 SequenceBarrier 来进行管理, SequenceBarrier 实际生成的是 ProcessingSequenceBarrier
实例,按照以下的调用路径来初始化:
RingBuffer.newBarrier() -> Sequencer.newBarrier() -> new ProcessingSequenceBarriser()
消费者从 RingBuffer 中获取消息时,须要经过 SenquencerBarrier 来肯定是否有可用的消息, 使用 SequencerBarrier 的调用路径以下:
BatchEventProcessor.processEvents() -> sequenceBarrier.waitFor()
BatchEventProcessor 是默认使用的消费者,上面咱们说到了 EventProcessor + EventHandler 才是一个完整的消费者。用户本身实现 EventHandler 来处理消息的逻辑。而实际从 RingBuffer 中获取消息的逻辑则在 BatchEventProcessor 中实现,关键代码以下:
// BatchEventProcessor.processEvents() 方法,删除了部分代码
while (true)
{
try
{
// 获取可用消息的最大值
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
// 若是当前的位置小于可用的位置,说明有消息能够处理,进行消息处理
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); // 调用实际的 Handler 处理消息
nextSequence++;
}
sequence.set(availableSequence); // 将本身的 sequence 设置处理完成的位置
}
}
复制代码
若是没有获取到可处理的 sequence, 那么当前的处理消息的 handlers 也会被阻塞。
SequenceBarrier 除了能够控制消费者从 RingBuffer 取数据以外,还能够控制多个消费者执行的顺序。若是要安排消费者执行的顺序,用以下的代码就能够。
disruptor.handleEventsWith(new LongEventHandler()).then(new AnOtherLongEventHandler());
复制代码
上面的代码表示 AnotherLongEventHandler
须要等 LongEventHandler
处理完成以后,才能对消息进行处理。
消费者之间控制依赖关系其实就是控制 sequence 的大小,若是说 C2 消费者 依赖 C1,那就表示 C2 中 Sequence 的值必定小于等于 C1 的 Sequence。
其中 then 关系是经过 Disruptor.updateGatingSequencesForNextInChain() 方法来实现:
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
if (processorSequences.length > 0)
{
ringBuffer.addGatingSequences(processorSequences);
for (final Sequence barrierSequence : barrierSequences)
{
ringBuffer.removeGatingSequence(barrierSequence);
}
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
}
复制代码
其实 Disruptor 控制的秘密就是这些了,其实也不是很复杂,只是实现的方式很巧妙,再加上并发控制没有使用锁,才造就了一个如此高效的框架。
关注微信公众号,聊点其余的