..................2015年的第一天...................html
本文代码托管在 https://github.com/hupengcool/disruptor-starterjava
关于吹牛逼的话就不说了。。。Disruptor是Java实现的用于线程间通讯的消息组件。其核心是一个Lock-free的Ringbuffer,Disruptor使用CAS而不是Lock。与大部分并发队列使用的Lock相比,CAS显然要快不少。CAS是CPU级别的指令,更加轻量,不须要像Lock同样须要OS的支持,因此每次调用不须要kernel entry,也不须要context switch。固然,使用CAS的代价是Disruptor实现的复杂程度也相对提升了。git
Sequence是Disruptor最核心的组件,上面已经提到过了。生产者对RingBuffer的互斥访问,生产者与消费者之间的协调以及消费者之间的协调,都是经过Sequence实现。几乎每个重要的组件都包含Sequence。那么Sequence是什么呢?首先Sequence是一个递增的序号,说白了就是计数器;其次,因为须要在线程间共享,因此Sequence是引用传递,而且是线程安全的;再次,Sequence支持CAS操做;最后,为了提升效率,Sequence经过padding来避免伪共享。github
RingBuffer是存储消息的地方,经过一个名为cursor的Sequence对象指示队列的头,协调多个生产者向RingBuffer中添加消息,并用于在消费者端判断RingBuffer是否为空。巧妙的是,表示队列尾的Sequence并无在RingBuffer中,而是由消费者维护。这样的好处是多个消费者处理消息的方式更加灵活,能够在一个RingBuffer上实现消息的单播,多播,流水线以及它们的组合。其缺点是在生产者端判断RingBuffer是否已尽是须要跟踪更多的信息,为此,在RingBuffer中维护了一个名为gatingSequences的Sequence数组来跟踪相关Seqence。api
SequenceBarrier用来在消费者之间以及消费者和RingBuffer之间创建依赖关系。在Disruptor中,依赖关系实际上指的是Sequence的大小关系,消费者A依赖于消费者B指的是消费者A的Sequence必定要小于等于消费者B的Sequence,这种大小关系决定了处理某个消息的前后顺序。由于全部消费者都依赖于RingBuffer,因此消费者的Sequence必定小于等于RingBuffer中名为cursor的Sequence,即消息必定是先被生产者放到Ringbuffer中,而后才能被消费者处理。数组
SequenceBarrier在初始化的时候会收集须要依赖的组件的Sequence,RingBuffer的cursor会被自动的加入其中。须要依赖其余消费者和/或RingBuffer的消费者在消费下一个消息时,会先等待在SequenceBarrier上,直到全部被依赖的消费者和RingBuffer的Sequence大于等于这个消费者的Sequence。当被依赖的消费者或RingBuffer的Sequence有变化时,会通知SequenceBarrier唤醒等待在它上面的消费者。安全
当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不一样的等待策略在延迟和CPU资源的占用上有所不一样,能够视应用场景选择:并发
BusySpinWaitStrategy : 自旋等待,相似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。框架
BlockingWaitStrategy : 使用锁和条件变量。CPU资源的占用少,延迟大。ide
SleepingWaitStrategy : 在屡次循环尝试不成功后,选择让出CPU,等待下次调度,屡次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。
YieldingWaitStrategy : 在屡次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。
PhasedBackoffWaitStrategy : 上面多种策略的综合,CPU资源的占用少,延迟大。
在Disruptor中,消费者是以EventProcessor的形式存在的。其中一类消费者是BatchEvenProcessor。每一个BatchEvenProcessor有一个Sequence,来记录本身消费RingBuffer中消息的状况。因此,一个消息必然会被每个BatchEvenProcessor消费。
另外一类消费者是WorkProcessor。每一个WorkProcessor也有一个Sequence,多个WorkProcessor还共享一个Sequence用于互斥的访问RingBuffer。一个消息被一个WorkProcessor消费,就不会被共享一个Sequence的其余WorkProcessor消费。这个被WorkProcessor共享的Sequence至关于尾指针。
共享同一个Sequence的WorkProcessor可由一个WorkerPool管理,这时,共享的Sequence也由WorkerPool建立。
下面以Disruptor 3.3.0版本为例介绍Disruptor的初级使用,本文并无用那些比较原始的API,若是想知道上面写的一些api如何使用,能够参考 https://github.com/LMAX-Exchange/disruptor/tree/master/src/perftest/java/com/lmax/disruptor 为了简化使用,框架提供Disruptor类来简化使用,下面主要是使用这个类来演示。
首先定义一个Event:
/** * Created by hupeng on 2015/1/1. */ public class MyEvent { private long value; public void setValue(long value) { this.value = value; } @Override public String toString() { return "MyEvent{" + "value=" + value + '}'; } }
而后提供一个EventFactory,RingBuffer经过这factory来初始化在Event。
import com.lmax.disruptor.EventFactory; /** * Created by hupeng on 2015/1/1. */ public class MyEventFactory implements EventFactory<MyEvent> { @Override public MyEvent newInstance() { return new MyEvent(); } }
而后写一个Producer类,也就是消息的生产者。
import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * Created by hupeng on 2015/1/1. */ public class MyEventProducer { private RingBuffer<MyEvent> ringBuffer; public MyEventProducer(RingBuffer<MyEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private static final EventTranslatorOneArg TRANSLATOR = new EventTranslatorOneArg<MyEvent, Long>() { @Override public void translateTo(MyEvent event, long sequence, Long value) { event.setValue(value); } }; public void onData(final Long value) { ringBuffer.publishEvent(TRANSLATOR,value); } }
而后写一个EventHandler。这个就是咱们定义怎么处理消息的地方。
import com.lmax.disruptor.EventHandler; /** * Created by hupeng on 2015/1/1. */ public class MyEventHandler implements EventHandler<MyEvent> { @Override public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println(event); } }
主程序:
import com.lmax.disruptor.IgnoreExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import disruptor.starter.support.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class MyEventMain { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); int bufferSize = 1024; Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new MyEventFactory(), bufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleExceptionsWith(new IgnoreExceptionHandler()); disruptor.handleEventsWith(new MyEventHandler(),new MyEventHandler()); // disruptor.handleEventsWith(new MyEventHandler()).then(new MyEventHandler()); //Pipeline RingBuffer<MyEvent> ringBuffer = disruptor.start(); MyEventProducer producer = new MyEventProducer(ringBuffer); for (long i = 0; i < 10; i++) { producer.onData(i); Thread.sleep(1000);// wait for task execute.... } disruptor.shutdown(); ExecutorsUtils.shutdownAndAwaitTermination(executorService, 60, TimeUnit.SECONDS); } }
在这个例子中输出
MyEvent{value=0} MyEvent{value=0} MyEvent{value=1} MyEvent{value=1} MyEvent{value=2} MyEvent{value=2} MyEvent{value=3} MyEvent{value=3} MyEvent{value=4} MyEvent{value=4} MyEvent{value=5} MyEvent{value=5} MyEvent{value=6} MyEvent{value=6} MyEvent{value=7} MyEvent{value=7} MyEvent{value=8} MyEvent{value=8} MyEvent{value=9} MyEvent{value=9}
能够看出每一个MyEventHandler(implements EventHandler)都会处理同一条消息。另外咱们还可使用相似:
disruptor.handleEventsWith(new MyEventHandler()).then(new MyEventHandler())
这样的方法来定义依赖关系,好比先执行哪一个handler再执行哪一个handler。其余好比and()详情见api
若是咱们想定义多个handler,可是同时只有一个handler处理某一条消息。能够实现WorkHandler来定义handler:
import com.lmax.disruptor.WorkHandler; /** * Created by hupeng on 2015/1/1. */ public class MyEventWorkHandler implements WorkHandler<MyEvent> { private String workerName; public MyEventWorkHandler(String workerName) { this.workerName = workerName; } @Override public void onEvent(MyEvent event) throws Exception { System.out.println(workerName + " handle event:" + event); } }
这时候咱们改一下咱们的主程序:
public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); int bufferSize = 1024; Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new MyEventFactory(), bufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleExceptionsWith(new IgnoreExceptionHandler()); disruptor.handleEventsWithWorkerPool(new MyEventWorkHandler("worker-1"),new MyEventWorkHandler("worker-2")); RingBuffer<MyEvent> ringBuffer = disruptor.start(); MyEventProducer producer = new MyEventProducer(ringBuffer); for (long i = 0; i < 10; i++) { producer.onData(i); Thread.sleep(1000);// wait for task execute.... } disruptor.shutdown(); ExecutorsUtils.shutdownAndAwaitTermination(executorService, 60, TimeUnit.SECONDS); }
这时候咱们能够看到输出是这样的:
worker-1 handle event:MyEvent{value=0} worker-2 handle event:MyEvent{value=1} worker-1 handle event:MyEvent{value=2} worker-2 handle event:MyEvent{value=3} worker-1 handle event:MyEvent{value=4} worker-2 handle event:MyEvent{value=5} worker-1 handle event:MyEvent{value=6} worker-2 handle event:MyEvent{value=7} worker-1 handle event:MyEvent{value=8} worker-2 handle event:MyEvent{value=9}
一条消息只被一个handler处理。
这里的ExecutorsUtils就是写的一个关闭ExecutorService的方法
import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; public class ExecutorsUtils { public static void shutdownAndAwaitTermination(ExecutorService pool,int timeout,TimeUnit unit) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(timeout/2, unit)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(timeout/2, unit)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } } }
概念部分来自http://ziyue1987.github.io/pages/2013/09/22/disruptor-use-manual.html ,若是想对这个框架有更一步了解,能够点进去看看,能够参考源代码。