过气网红Disruptor源码分析

Disruptor

Disruptor是java圈子里著名的并发队列,它是一个基于生产者-消费者模型,并优化了cpu伪共享的高性能队列。要理解disruptor须要理解一下几个概念:RingBufferSequenceSequencerSequenceBarrierdisruptorjava

RingBuffer

RingBuffer是disruptor中用来存数据的环形数组。Disruptor的基本数据结构就是一个循环队列。学习过数据结构的同窗都知道循环队列是一个基于数组的队列,用一个变量来表示队头位置下标,另外一个变量来表示队尾位置下标。当位置下标到达数组末尾的时候,下标的下一个位置就移动到数组开头,例如jdk中的java.util.concurrent.ArrayBlockingQueue,它用putIndex来表示队尾位置,用takeIndex来表示队头位置,当队头或队尾到达数组末尾的时候,被置为数组开头位置。下面是java.util.concurrent.ArrayBlockingQueue中的代码。数组

/** items index for next take, poll, peek or remove */
int takeIndex;  队头位置
/** items index for next put, offer, or add */
int putIndex;   队尾位置
private void enqueue(E x) {                 入队方法
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;                   队尾入队数据
    if (++putIndex == items.length)        若是已经到了数组的最后一个位置
        putIndex = 0;                      位置置于数组开头
    count++;
    notEmpty.signal();
}
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];           队头出队数据
    items[takeIndex] = null;              若是已经到了数组的最后一个位置
    if (++takeIndex == items.length)      位置置于数组开头
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

Sequence

Sequence是disruptor对队列中表示位置的下标位置的抽象。为何要用一个类而不是一个整型来表示数组的下标位置呢?这是由于disruptor在试图解决cpu伪共享问题。CPU伪共享简单讲就是在不一样cpu核的多个线程他们分别在本身的cache中缓存了同一个变量,当一个线程修改了这个变量将会使用MESI协议将别的线程缓存了相同变量的cache-line失效。若是多个线程高频修改一个变量可能会相互影响使得cpu缓存的做用大打折扣。那怎么才能尽可能避免这种状况呢?disruptor的作法是让每一个消费者都维护着本身的sequence,而且sequence作了cache-line填充,使得每一个sequence将占用整个cache-line。通常来讲一个cache-line是64个字节,用一个long来表示位置,那么就须要8个long。所以,disruptor在表示下标的long变量先后都放置了7个long,这样当读取value时,不管从哪一个方向读取64个字节都能保证cache-line被填充。下面是com.lmax.disruptor.Sequence的代码。缓存

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;            填充
}

class Value extends LhsPadding
{
    protected volatile long value;                        真正的值
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;      填充
}

Sequencer

Sequencer是用来协调生产者进度和消费者进度的。消费者不能跑到生产者前面去了,生产者也不能超过消费者一圈。AbstractSequencer有3个重要的参数,cursor表示的生产者的位置,gatingSequences表示的是末端消费者的位置,waitstrategy表示当没有数据给消费者时,消费者的等待行为。下面是com.lmax.disruptor.AbstractSequencer的代码。数据结构

protected final WaitStrategy waitStrategy;                                           等待策略
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);      生产者位置
protected volatile Sequence[] gatingSequences = new Sequence[0];                     消费者位置

生产者在生产数据的时候有2个步骤,第一步,获取新数据的位置;第二步,插入数据并发布,发布后的数据就能够被消费了。其中第一步对应着next方法,第二步对应着publish方法。依据生产者是单线程的仍是多线程的,Sequencer被分为MultiProducerSequencer和SingleProducerSequencer,这2种Sequencer大致逻辑类似但又有差异。
MultiProducerSequencer的next方法中,首先获取生产者位置并加上n再减去buffersize,将他和消费者位置比较,若是大于则代表生产者超过了消费者一圈,这是不可行的,不然是可行的就用cas更新生产者位置。获取消费者位置的时候并非从gatingSequences中直接获取最小的那个,而是经过一个gatingSequenceCache来获取的,这是由于sequence是一个频繁改变,被多个线程操做的对象,而且每次去获取都要去找最小值,为了减小没必要要的获取,每次从gatingSequences中获取一次最小值时将其缓存起来,在生产者没有追到这个缓存的最小值前,能够不用去获取最新的最小值。当追上这个最小值的时候,就须要从gatingSequences中获取最小值,若是生产者仍是超过了一圈那么就暂停一下,再重复以上操做,不然就将最小值赋值给gatingSequenceCache并重复以上操做。下面是com.lmax.disruptor.MultiProducerSequencer#next(int)的代码。多线程

do
{
    current = cursor.get();                                                         生产者当前位置
    next = current + n;                                                             插入n个新数据后的位置

    long wrapPoint = next - bufferSize;                                             新位置减去ringbuffer长度
    long cachedGatingSequence = gatingSequenceCache.get();                          消费者位置

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)         wrapPoint > cachedGatingSequence表示生产者超过了消费者一圈
    {
        long gatingSequence = Util.getMinimumSequence(gatingSequences, current);    获取最新的消费者位置

        if (wrapPoint > gatingSequence)                                             若是仍是超过一圈则等待并重试
        {
            LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
            continue;
        }

        gatingSequenceCache.set(gatingSequence);                                   更新缓存的消费者位置
    }
    else if (cursor.compareAndSet(current, next))                                  更新生产者位置
    {
        break;
    }
}
while (true);

SingleProducerSequencer的next方法的逻辑和MultiProducerSequencer差很少。不一样的是因为只有一个生产者线程,所以SingleProducerSequencer直接使用了一个long的nextValue来表示可生产数据的位置,一个long的cachedValue来表示消费者位置缓存。下面是com.lmax.disruptor.SingleProducerSequencer#next(int)的代码。并发

long nextValue = this.nextValue;                                                  获取可生产数据的最小位置               

long nextSequence = nextValue + n;                                                插入n个新数据后的位置
long wrapPoint = nextSequence - bufferSize;                                       新位置减去ringbuffer长度
long cachedGatingSequence = this.cachedValue;                                     消费者位置

if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)         wrapPoint > cachedGatingSequence表示生产者超过了消费者一圈
{
    cursor.setVolatile(nextValue);  // StoreLoad fence

    long minSequence;
    while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))  获取消费者最新位置,若是仍是超过一圈则等待并重试
    {
        LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
    }

    this.cachedValue = minSequence;                                              保存消费者位置
}

this.nextValue = nextSequence;                                                   保存生产者位置

2种Sequencer的获取新数据位置的逻辑类似,可是发布数据的逻辑却彻底不同。SingleProducerSequencer的发布逻辑较为简单,publish方法中直接更新生产者位置cursor,注意cursor和nextValue的差异,nextValue用来获取新数据的位置,而cursor是已经发布的数据的位置,对于消费者来讲cursor才是真正的生产者位置。下面是com.lmax.disruptor.SingleProducerSequencer#publish(long)的代码。ide

@Override
public void publish(long sequence)
{
    cursor.set(sequence);                  更新生成者位置
    waitStrategy.signalAllWhenBlocking();  通知等待的消费者消费
}

这种方式用在MultiProducerSequencer上显然是不合适的,由于一个生产者发布可能会致使其余生产者也发布了。事实上,MultiProducerSequencer在next方法中就直接更新了cursor。MultiProducerSequencer用一个长度和ringbuffer相同的数组availableBuffer来跟踪数据的发布状态。下面是com.lmax.disruptor.MultiProducerSequencer发布相关的代码。函数

@Override
public void publish(final long sequence)
{
    setAvailable(sequence);                             设置改位置的状态
    waitStrategy.signalAllWhenBlocking();
}
private void setAvailable(final long sequence)
{
    setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private void setAvailableBufferValue(int index, int flag)
{
    long bufferAddress = (index * SCALE) + BASE;
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

能够看到使用cursor来获取消费者可消费的最大位置是不合适的了。在Sequencer中有个getHighestPublishedSequence(long lowerBound, long availableSequence)方法用来返回能够被消费的最大位置。对于SingleProducerSequencer因为是发布时更新cursor,所以能够直接返回availableSequence;对于MultiProducerSequencer是在availableBuffer的[lowerBound,availableSequence]区间上找到最小的已发布位置。性能


SequenceBarrier

SequenceBarrier是协调消费者的进度和它依赖的进度的。这里说依赖是由于消费者自己是有层级的,第一层的消费者依赖(不超过)生产者的进度,第二层的消费者依赖(不超过)第一层的消费进度。从构造方法能够看出当传入一个长度为0的dependentSequences数组时,该barrier的dependentSequence就是生产者的位置。若是大于0就用FixedSequenceGroup包装一下dependentSequences数组,FixedSequenceGroup的get方法返回的就是dependentSequences数组的最小值。下面是com.lmax.disruptor.ProcessingSequenceBarrier的构造函数。学习

ProcessingSequenceBarrier(
    final Sequencer sequencer,
    final WaitStrategy waitStrategy,
    final Sequence cursorSequence,
    final Sequence[] dependentSequences)
{
    this.sequencer = sequencer;
    this.waitStrategy = waitStrategy;
    this.cursorSequence = cursorSequence;
    if (0 == dependentSequences.length)            若是dependentSequences长度为0,就依赖生产者进度
    {
        dependentSequence = cursorSequence;
    }
    else
    {
        dependentSequence = new FixedSequenceGroup(dependentSequences);
    }
}

SequenceBarrier的核心方法就是waitFor(final long sequence)。该方法是用来等待入参sequence变成可消费状态的。使用waitStrategy来等待并获取一个有效的sequence,在waitstrategy的全部实现中,这个返回值其实就是dependentSequence。最后经过Sequencer的getHighestPublishedSequence方法获取[sequence,dependentSequence]区间内可消费的最大位置。下面是com.lmax.disruptor.ProcessingSequenceBarrier#waitFor(final long sequence)的代码。

public long waitFor(final long sequence)
    throws AlertException, InterruptedException, TimeoutException
{
    checkAlert();
    等待sequence有效并返回dependentSequence位置
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);        

    if (availableSequence < sequence)
    {
        return availableSequence;
    }
    返回[sequence,dependentSequence]最小已发布位置
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

WaitStrategy是消费者等待消费的动做。判断sequence是否有效的方法是和dependentSequence比较,当且仅当sequence小于等于dependentSequence时有效。 好比以下代码:

while ((availableSequence = dependentSequence.get()) < sequence)
{
    等待
}
return availableSequence;

消费者

Disruptor的消费者有2种一种是WokerHandler一种是EventHandler。能够设置多个WokerHandler,多个WokerHandler会一块儿去处理全部的数据,也能够设置多个EventHandler,多个EventHandler会分别处理全部的数据。
WokerHandler是由com.lmax.disruptor.dsl.Disruptor#handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)方法用来建立。该方法会建立一个workPool,workPool里面有sequenceBarrier,除此外workPool里还有一个workSequence。每一个workHandler会建立一个workProcessor,workSequence也会传入workProcessor的构造方法。下面是com.lmax.disruptor.WorkerPool部分代码。

private final AtomicBoolean started = new AtomicBoolean(false);
private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final RingBuffer<T> ringBuffer;
// WorkProcessors are created to wrap each of the provided WorkHandlers
private final WorkProcessor<?>[] workProcessors;
@SafeVarargs
public WorkerPool(
    final RingBuffer<T> ringBuffer,
    final SequenceBarrier sequenceBarrier,
    final ExceptionHandler<? super T> exceptionHandler,
    final WorkHandler<? super T>... workHandlers)
{
    this.ringBuffer = ringBuffer;
    final int numWorkers = workHandlers.length;
    workProcessors = new WorkProcessor[numWorkers];

    for (int i = 0; i < numWorkers; i++)
    {
        workProcessors[i] = new WorkProcessor<>(            建立WorkHandler执行器
            ringBuffer,
            sequenceBarrier,                                协调消费进度
            workHandlers[i],
            exceptionHandler,
            workSequence);                                  全部WorkHandler使用一个workSequence
    }
}

WorkProcessor能够当作是workHandler的执行者,他的核心方法是run。run方法中使用CAS从workSequence中获取要消费的下标,能够看出workPool中的全部workProcessor是从同一个workSequence中获取的,所以一个workPool里的workHandler是共同消费数据的。当成功获取到须要nextSequence后,将其于cachedAvailableSequence比较,若是小于等于cachedAvailableSequence表示能够消费,不然使用sequenceBarrier等待并从新获取依赖(能够先理解为生产者)的最大可消费位置。这里cachedAvailableSequence和sequencer中提到了gatingSequenceCache思路是同样的,为了避免用每次都去获取,每次获取后将其保存一块儿来,消费者还没消费到这个位置的时候,能够不用去获取,由于这时消费者必定没有超过依赖。

while (true)
{
    try
    {
        省略注解
        if (processedSequence)
        {
            processedSequence = false;
            do
            {
                nextSequence = workSequence.get() + 1L;                            从workSequence的下一个位置
                sequence.set(nextSequence - 1L);
            }
            while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));  竞争这个位置
        }
                                                                           
        if (cachedAvailableSequence >= nextSequence)                               若是这个位置小于缓存的依赖的位置
        {
            event = ringBuffer.get(nextSequence);                                  获取这个位置的数据并消费
            workHandler.onEvent(event);
            processedSequence = true;
        }
        else
        {
            cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);      等待并获取最大可消费位置
        }
    }
    省略部分代码
}

EventHandler是由com.lmax.disruptor.dsl.Disruptor#handleEventsWith(final EventHandler<? super T>... handlers)方法建立的。每个EventHandler会被建立为一个BatchEventProcessor。BatchEventProcessor的核心方法是processEvents方法。该方法中就是使用sequencerBarrier去获取了依赖的最新位置,而后从直接当前位置一直消费到依赖最新的位置。这和WorkProcessor是不一样的,由于BatchEventProcessor中的sequence各自增加互不影响,而WorkProcessor的sequence都是从workSequence中去争抢,因此多个EventHandler是分别消费全部的数据。下面是com.lmax.disruptor.BatchEventProcessor#processEvents方法代码。

while (true)
{
    try
    {
        final long availableSequence = sequenceBarrier.waitFor(nextSequence);                直接获取最大可消费位置
        if (batchStartAware != null)
        {
            batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
        }

        while (nextSequence <= availableSequence)                                           直接消费到最大可消费位置
        {
            event = dataProvider.get(nextSequence);
            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }

        sequence.set(availableSequence);                                                   一匹消费完后才更新消费进度
    }

Disruptor中handleEventsWit方法和handleEventsWithWorkerPool方法建立ProcessingSequenceBarrier时传入的dependentSequences都是长度为0的Sequence数组,这样建立的ProcessingSequenceBarrier的dependentSequences就是生产者的位置,这样建立出来的消费者就是依赖于生产者进度的。这2个方法都返回EventHandlerGroup,它包含了表示消费者进度的Sequence数组,当使用EventHandlerGroup建立消费者时就会使用该Sequence数组做为参数建立ProcessingSequenceBarrier,这样建立出来的消费者就会依赖前一个消费者的消费进度。Disruptor总体运行以下图所示:
disruptor 其中生产者的位置是7,生产者的gatingSeq指向消费者依赖图中的最末端的消费者的seq,表示生产者不能超过最末端的消费者;workpool有2个workhandler分别在2,3,workpool的seqbarrier指向生产者,表示workpool不能超过生产者;eventhandler目前消费到位置是1,他的seqbarrier指向workpool表示其消费进度不能超过workpool。

相关文章
相关标签/搜索