Disruptor
Disruptor是java圈子里著名的并发队列,它是一个基于生产者-消费者模型,并优化了cpu伪共享的高性能队列。要理解disruptor须要理解一下几个概念:RingBuffer,Sequence,Sequencer,SequenceBarrier。 java
RingBuffer
RingBuffer是disruptor中用来存数据的环形数组。Disruptor的基本数据结构就是一个循环队列。学习过数据结构的同窗都知道循环队列是一个基于数组的队列,用一个变量来表示队头位置下标,另外一个变量来表示队尾位置下标。当位置下标到达数组末尾的时候,下标的下一个位置就移动到数组开头,例如jdk中的java.util.concurrent.ArrayBlockingQueue,它用putIndex来表示队尾位置,用takeIndex来表示队头位置,当队头或队尾到达数组末尾的时候,被置为数组开头位置。下面是java.util.concurrent.ArrayBlockingQueue中的代码。数组
/** items index for next take, poll, peek or remove */ int takeIndex; 队头位置 /** items index for next put, offer, or add */ int putIndex; 队尾位置
private void enqueue(E x) { 入队方法 // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; 队尾入队数据 if (++putIndex == items.length) 若是已经到了数组的最后一个位置 putIndex = 0; 位置置于数组开头 count++; notEmpty.signal(); }
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; 队头出队数据 items[takeIndex] = null; 若是已经到了数组的最后一个位置 if (++takeIndex == items.length) 位置置于数组开头 takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
Sequence
Sequence是disruptor对队列中表示位置的下标位置的抽象。为何要用一个类而不是一个整型来表示数组的下标位置呢?这是由于disruptor在试图解决cpu伪共享问题。CPU伪共享简单讲就是在不一样cpu核的多个线程他们分别在本身的cache中缓存了同一个变量,当一个线程修改了这个变量将会使用MESI协议将别的线程缓存了相同变量的cache-line失效。若是多个线程高频修改一个变量可能会相互影响使得cpu缓存的做用大打折扣。那怎么才能尽可能避免这种状况呢?disruptor的作法是让每一个消费者都维护着本身的sequence,而且sequence作了cache-line填充,使得每一个sequence将占用整个cache-line。通常来讲一个cache-line是64个字节,用一个long来表示位置,那么就须要8个long。所以,disruptor在表示下标的long变量先后都放置了7个long,这样当读取value时,不管从哪一个方向读取64个字节都能保证cache-line被填充。下面是com.lmax.disruptor.Sequence的代码。缓存
class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; 填充 } class Value extends LhsPadding { protected volatile long value; 真正的值 } class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15; 填充 }
Sequencer
Sequencer是用来协调生产者进度和消费者进度的。消费者不能跑到生产者前面去了,生产者也不能超过消费者一圈。AbstractSequencer有3个重要的参数,cursor表示的生产者的位置,gatingSequences表示的是末端消费者的位置,waitstrategy表示当没有数据给消费者时,消费者的等待行为。下面是com.lmax.disruptor.AbstractSequencer的代码。数据结构
protected final WaitStrategy waitStrategy; 等待策略 protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); 生产者位置 protected volatile Sequence[] gatingSequences = new Sequence[0]; 消费者位置
生产者在生产数据的时候有2个步骤,第一步,获取新数据的位置;第二步,插入数据并发布,发布后的数据就能够被消费了。其中第一步对应着next方法,第二步对应着publish方法。依据生产者是单线程的仍是多线程的,Sequencer被分为MultiProducerSequencer和SingleProducerSequencer,这2种Sequencer大致逻辑类似但又有差异。
MultiProducerSequencer的next方法中,首先获取生产者位置并加上n再减去buffersize,将他和消费者位置比较,若是大于则代表生产者超过了消费者一圈,这是不可行的,不然是可行的就用cas更新生产者位置。获取消费者位置的时候并非从gatingSequences中直接获取最小的那个,而是经过一个gatingSequenceCache来获取的,这是由于sequence是一个频繁改变,被多个线程操做的对象,而且每次去获取都要去找最小值,为了减小没必要要的获取,每次从gatingSequences中获取一次最小值时将其缓存起来,在生产者没有追到这个缓存的最小值前,能够不用去获取最新的最小值。当追上这个最小值的时候,就须要从gatingSequences中获取最小值,若是生产者仍是超过了一圈那么就暂停一下,再重复以上操做,不然就将最小值赋值给gatingSequenceCache并重复以上操做。下面是com.lmax.disruptor.MultiProducerSequencer#next(int)的代码。多线程
do { current = cursor.get(); 生产者当前位置 next = current + n; 插入n个新数据后的位置 long wrapPoint = next - bufferSize; 新位置减去ringbuffer长度 long cachedGatingSequence = gatingSequenceCache.get(); 消费者位置 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) wrapPoint > cachedGatingSequence表示生产者超过了消费者一圈 { long gatingSequence = Util.getMinimumSequence(gatingSequences, current); 获取最新的消费者位置 if (wrapPoint > gatingSequence) 若是仍是超过一圈则等待并重试 { LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } gatingSequenceCache.set(gatingSequence); 更新缓存的消费者位置 } else if (cursor.compareAndSet(current, next)) 更新生产者位置 { break; } } while (true);
SingleProducerSequencer的next方法的逻辑和MultiProducerSequencer差很少。不一样的是因为只有一个生产者线程,所以SingleProducerSequencer直接使用了一个long的nextValue来表示可生产数据的位置,一个long的cachedValue来表示消费者位置缓存。下面是com.lmax.disruptor.SingleProducerSequencer#next(int)的代码。并发
long nextValue = this.nextValue; 获取可生产数据的最小位置 long nextSequence = nextValue + n; 插入n个新数据后的位置 long wrapPoint = nextSequence - bufferSize; 新位置减去ringbuffer长度 long cachedGatingSequence = this.cachedValue; 消费者位置 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) wrapPoint > cachedGatingSequence表示生产者超过了消费者一圈 { cursor.setVolatile(nextValue); // StoreLoad fence long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) 获取消费者最新位置,若是仍是超过一圈则等待并重试 { LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } this.cachedValue = minSequence; 保存消费者位置 } this.nextValue = nextSequence; 保存生产者位置
2种Sequencer的获取新数据位置的逻辑类似,可是发布数据的逻辑却彻底不同。SingleProducerSequencer的发布逻辑较为简单,publish方法中直接更新生产者位置cursor,注意cursor和nextValue的差异,nextValue用来获取新数据的位置,而cursor是已经发布的数据的位置,对于消费者来讲cursor才是真正的生产者位置。下面是com.lmax.disruptor.SingleProducerSequencer#publish(long)的代码。ide
@Override public void publish(long sequence) { cursor.set(sequence); 更新生成者位置 waitStrategy.signalAllWhenBlocking(); 通知等待的消费者消费 }
这种方式用在MultiProducerSequencer上显然是不合适的,由于一个生产者发布可能会致使其余生产者也发布了。事实上,MultiProducerSequencer在next方法中就直接更新了cursor。MultiProducerSequencer用一个长度和ringbuffer相同的数组availableBuffer来跟踪数据的发布状态。下面是com.lmax.disruptor.MultiProducerSequencer发布相关的代码。函数
@Override public void publish(final long sequence) { setAvailable(sequence); 设置改位置的状态 waitStrategy.signalAllWhenBlocking(); } private void setAvailable(final long sequence) { setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); } private void setAvailableBufferValue(int index, int flag) { long bufferAddress = (index * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); }
能够看到使用cursor来获取消费者可消费的最大位置是不合适的了。在Sequencer中有个getHighestPublishedSequence(long lowerBound, long availableSequence)方法用来返回能够被消费的最大位置。对于SingleProducerSequencer因为是发布时更新cursor,所以能够直接返回availableSequence;对于MultiProducerSequencer是在availableBuffer的[lowerBound,availableSequence]区间上找到最小的已发布位置。性能
SequenceBarrier
SequenceBarrier是协调消费者的进度和它依赖的进度的。这里说依赖是由于消费者自己是有层级的,第一层的消费者依赖(不超过)生产者的进度,第二层的消费者依赖(不超过)第一层的消费进度。从构造方法能够看出当传入一个长度为0的dependentSequences数组时,该barrier的dependentSequence就是生产者的位置。若是大于0就用FixedSequenceGroup包装一下dependentSequences数组,FixedSequenceGroup的get方法返回的就是dependentSequences数组的最小值。下面是com.lmax.disruptor.ProcessingSequenceBarrier的构造函数。学习
ProcessingSequenceBarrier( final Sequencer sequencer, final WaitStrategy waitStrategy, final Sequence cursorSequence, final Sequence[] dependentSequences) { this.sequencer = sequencer; this.waitStrategy = waitStrategy; this.cursorSequence = cursorSequence; if (0 == dependentSequences.length) 若是dependentSequences长度为0,就依赖生产者进度 { dependentSequence = cursorSequence; } else { dependentSequence = new FixedSequenceGroup(dependentSequences); } }
SequenceBarrier的核心方法就是waitFor(final long sequence)。该方法是用来等待入参sequence变成可消费状态的。使用waitStrategy来等待并获取一个有效的sequence,在waitstrategy的全部实现中,这个返回值其实就是dependentSequence。最后经过Sequencer的getHighestPublishedSequence方法获取[sequence,dependentSequence]区间内可消费的最大位置。下面是com.lmax.disruptor.ProcessingSequenceBarrier#waitFor(final long sequence)的代码。
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException { checkAlert(); 等待sequence有效并返回dependentSequence位置 long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } 返回[sequence,dependentSequence]最小已发布位置 return sequencer.getHighestPublishedSequence(sequence, availableSequence); }
WaitStrategy是消费者等待消费的动做。判断sequence是否有效的方法是和dependentSequence比较,当且仅当sequence小于等于dependentSequence时有效。 好比以下代码:
while ((availableSequence = dependentSequence.get()) < sequence) { 等待 } return availableSequence;
消费者
Disruptor的消费者有2种一种是WokerHandler一种是EventHandler。能够设置多个WokerHandler,多个WokerHandler会一块儿去处理全部的数据,也能够设置多个EventHandler,多个EventHandler会分别处理全部的数据。
WokerHandler是由com.lmax.disruptor.dsl.Disruptor#handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)方法用来建立。该方法会建立一个workPool,workPool里面有sequenceBarrier,除此外workPool里还有一个workSequence。每一个workHandler会建立一个workProcessor,workSequence也会传入workProcessor的构造方法。下面是com.lmax.disruptor.WorkerPool部分代码。
private final AtomicBoolean started = new AtomicBoolean(false); private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); private final RingBuffer<T> ringBuffer; // WorkProcessors are created to wrap each of the provided WorkHandlers private final WorkProcessor<?>[] workProcessors; @SafeVarargs public WorkerPool( final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final ExceptionHandler<? super T> exceptionHandler, final WorkHandler<? super T>... workHandlers) { this.ringBuffer = ringBuffer; final int numWorkers = workHandlers.length; workProcessors = new WorkProcessor[numWorkers]; for (int i = 0; i < numWorkers; i++) { workProcessors[i] = new WorkProcessor<>( 建立WorkHandler执行器 ringBuffer, sequenceBarrier, 协调消费进度 workHandlers[i], exceptionHandler, workSequence); 全部WorkHandler使用一个workSequence } }
WorkProcessor能够当作是workHandler的执行者,他的核心方法是run。run方法中使用CAS从workSequence中获取要消费的下标,能够看出workPool中的全部workProcessor是从同一个workSequence中获取的,所以一个workPool里的workHandler是共同消费数据的。当成功获取到须要nextSequence后,将其于cachedAvailableSequence比较,若是小于等于cachedAvailableSequence表示能够消费,不然使用sequenceBarrier等待并从新获取依赖(能够先理解为生产者)的最大可消费位置。这里cachedAvailableSequence和sequencer中提到了gatingSequenceCache思路是同样的,为了避免用每次都去获取,每次获取后将其保存一块儿来,消费者还没消费到这个位置的时候,能够不用去获取,由于这时消费者必定没有超过依赖。
while (true) { try { 省略注解 if (processedSequence) { processedSequence = false; do { nextSequence = workSequence.get() + 1L; 从workSequence的下一个位置 sequence.set(nextSequence - 1L); } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); 竞争这个位置 } if (cachedAvailableSequence >= nextSequence) 若是这个位置小于缓存的依赖的位置 { event = ringBuffer.get(nextSequence); 获取这个位置的数据并消费 workHandler.onEvent(event); processedSequence = true; } else { cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); 等待并获取最大可消费位置 } } 省略部分代码 }
EventHandler是由com.lmax.disruptor.dsl.Disruptor#handleEventsWith(final EventHandler<? super T>... handlers)方法建立的。每个EventHandler会被建立为一个BatchEventProcessor。BatchEventProcessor的核心方法是processEvents方法。该方法中就是使用sequencerBarrier去获取了依赖的最新位置,而后从直接当前位置一直消费到依赖最新的位置。这和WorkProcessor是不一样的,由于BatchEventProcessor中的sequence各自增加互不影响,而WorkProcessor的sequence都是从workSequence中去争抢,因此多个EventHandler是分别消费全部的数据。下面是com.lmax.disruptor.BatchEventProcessor#processEvents方法代码。
while (true) { try { final long availableSequence = sequenceBarrier.waitFor(nextSequence); 直接获取最大可消费位置 if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } while (nextSequence <= availableSequence) 直接消费到最大可消费位置 { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } sequence.set(availableSequence); 一匹消费完后才更新消费进度 }
Disruptor中handleEventsWit方法和handleEventsWithWorkerPool方法建立ProcessingSequenceBarrier时传入的dependentSequences都是长度为0的Sequence数组,这样建立的ProcessingSequenceBarrier的dependentSequences就是生产者的位置,这样建立出来的消费者就是依赖于生产者进度的。这2个方法都返回EventHandlerGroup,它包含了表示消费者进度的Sequence数组,当使用EventHandlerGroup建立消费者时就会使用该Sequence数组做为参数建立ProcessingSequenceBarrier,这样建立出来的消费者就会依赖前一个消费者的消费进度。Disruptor总体运行以下图所示:
其中生产者的位置是7,生产者的gatingSeq指向消费者依赖图中的最末端的消费者的seq,表示生产者不能超过最末端的消费者;workpool有2个workhandler分别在2,3,workpool的seqbarrier指向生产者,表示workpool不能超过生产者;eventhandler目前消费到位置是1,他的seqbarrier指向workpool表示其消费进度不能超过workpool。