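Let's look at how to use the Disruptor through a simple example: a producer sends messages carrying a long value, and a consumer receives each message and prints it.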
First, we define the Event that will carry the data:
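public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    // Lets the handlers below print the value instead of the default Object.toString()
    @Override
    public String toString() {
        return "LongEvent{value=" + value + "}";
    }
}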
So that the Disruptor can preallocate these events for us, we need an EventFactory that performs the construction:
import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {
    public LongEvent newInstance() {
        return new LongEvent();
    }
}
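The following sketch shows what preallocation gives you. It is plain Java with no Disruptor dependency. Every slot of a power-of-two-sized array is filled once up front by a factory. A sequence number is then mapped to a slot with a bit mask, so the same event objects are reused as the sequence wraps around. `PreallocatedRing` and its methods are hypothetical illustrations, not the Disruptor's internal code.

```java
import java.util.function.Supplier;

/** Hypothetical illustration: a fixed ring of event objects created once, up front. */
final class PreallocatedRing<E> {
    private final Object[] entries;
    private final int mask;

    PreallocatedRing(Supplier<E> factory, int size) {
        // A power-of-two size lets "sequence % size" become a cheap bit mask.
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        entries = new Object[size];
        mask = size - 1;
        for (int i = 0; i < size; i++) {
            entries[i] = factory.get(); // each event is allocated exactly once, here
        }
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        // Sequences grow forever; the mask wraps them onto the fixed set of slots.
        return (E) entries[(int) (sequence & mask)];
    }
}
```

With the Disruptor itself, the factory is the `EventFactory` above (or `LongEvent::new`), and the ring buffer performs this allocation when it is constructed.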
With the event defined, we need a consumer to handle these events. Here, all we want the consumer to do is print the received value to the console:
import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println("Event: " + event);
    }
}
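We also need a source for these events. For this example, we assume the data comes from some kind of I/O device, such as the network or a file.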
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb) {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
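Clearly, publishing an event is more involved than using a simple queue. This is a consequence of event preallocation. At the lowest level, publication takes two phases: first claim a slot in the RingBuffer, then publish the data that is now available. The publication must also be wrapped in a try/finally block. Once we have claimed a slot (by calling RingBuffer.next()), we must publish that sequence. Failing to do so can corrupt the state of the Disruptor. Specifically, in the multi-producer case the consumers will stall, and nothing short of a restart will recover them.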
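The claim-then-publish protocol can be modeled without the library. `TwoPhaseRing` is a hypothetical, deliberately simplified single-producer model. It has no wrap-around protection and none of the per-slot availability tracking that a multi-producer sequencer needs. `next()` claims a sequence, the slot is filled, and `publish()` advances a volatile cursor that consumers would read.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical single-producer model of the two-phase publish protocol.
 * Deliberately omitted: waiting for slow consumers before wrapping, and the
 * per-slot availability tracking a multi-producer sequencer needs.
 */
final class TwoPhaseRing {
    private final long[] slots;
    private final int mask;
    private long nextToClaim = 0;                         // only the single producer touches this
    private final AtomicLong cursor = new AtomicLong(-1); // highest sequence visible to consumers

    TwoPhaseRing(int size) {
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        slots = new long[size];
        mask = size - 1;
    }

    /** Phase 1: claim the next sequence; the slot is not yet visible to consumers. */
    long next() {
        return nextToClaim++;
    }

    void set(long sequence, long value) {
        slots[(int) (sequence & mask)] = value;
    }

    long get(long sequence) {
        return slots[(int) (sequence & mask)];
    }

    /** Phase 2: this volatile write publishes the slot contents to consumers that read the cursor. */
    void publish(long sequence) {
        cursor.set(sequence);
    }

    long publishedCursor() {
        return cursor.get();
    }

    /** Same shape as LongEventProducer.onData: claim, fill, and always publish in finally. */
    void onData(long value) {
        long sequence = next();
        try {
            set(sequence, value);
        } finally {
            publish(sequence);
        }
    }
}
```

Suppose a sequence is claimed with `next()` and never published. A consumer waiting for it would see the cursor stay behind that sequence indefinitely. This is the stall described above.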
Publishing with Translators (version 3.0)
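Version 3.0 of the Disruptor gives developers a lambda-style API that hides the complexity of the RingBuffer. From 3.0 onwards, the preferred way to publish events is through the event translator part of the publishing API, for example: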
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
import java.nio.ByteBuffer;

public class LongEventProducerWithTranslator {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
        new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
                event.set(bb.getLong(0));
            }
        };

    public void onData(ByteBuffer bb) {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}
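Another advantage of this approach is that the translator code can be moved into a separate class and unit tested easily. The Disruptor provides several interfaces that can be implemented as translators (EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, etc.). These variants exist so that translators can be written as static classes or non-capturing lambdas. The arguments are then passed through the call on the RingBuffer to the translator.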
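A translator only touches the event and its arguments, so it can be exercised without any ring buffer. The sketch below compiles without the Disruptor on the classpath. It uses a hypothetical `OneArgTranslator` interface with the same shape as `EventTranslatorOneArg.translateTo(event, sequence, arg)`, plus a stand-in `LongEventSketch` class with a package-private field. With the real library you would implement `EventTranslatorOneArg<LongEvent, ByteBuffer>` directly.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for EventTranslatorOneArg<E, A>, so this sketch needs no Disruptor jar. */
@FunctionalInterface
interface OneArgTranslator<E, A> {
    void translateTo(E event, long sequence, A arg);
}

/** Hypothetical stand-in for LongEvent. */
final class LongEventSketch {
    long value;
}

final class LongTranslators {
    private LongTranslators() { }

    // Static and non-capturing: everything it needs arrives as parameters.
    static final OneArgTranslator<LongEventSketch, ByteBuffer> FROM_BYTE_BUFFER =
            (event, sequence, bb) -> event.value = bb.getLong(0);
}
```

A unit test then just builds an event and a buffer, calls `translateTo`, and checks the event's fields.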
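The final step is to wire everything together. All of the components can be connected by hand, but that can get complicated, so a DSL is provided to simplify construction. Some of the more complex options are not available through the DSL, but it is suitable for most situations.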
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventMain {
    public static void main(String[] args) throws Exception {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);

        // Connect the handler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}
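Note that several classes (e.g. the handler and the translator) are no longer needed. Also note that the lambda used for publishEvent() refers only to the parameters that are passed in. Suppose we had written the code like this instead: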
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
    bb.putLong(0, l);
    ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
    Thread.sleep(1000);
}
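This creates a capturing lambda. An object must then be instantiated to hold the ByteBuffer bb variable when the lambda is passed to publishEvent(). That creates additional, unnecessary garbage. If low GC pressure matters, prefer the form that passes the argument through publishEvent() to the lambda.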
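The plain-Java sketch below shows why the capturing form produces garbage by comparing the two lambda shapes. The class `LambdaCapture` is hypothetical and no Disruptor is involved. On OpenJDK/HotSpot, evaluating a non-capturing lambda at the same site returns one cached instance, while a capturing lambda yields a new object each time. The JLS allows this but does not require it, and the JIT's escape analysis can sometimes remove the allocation. Treat it as typical runtime behaviour rather than a guarantee.

```java
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;

final class LambdaCapture {
    private LambdaCapture() { }

    // Non-capturing: the data arrives as a parameter, like publishEvent(translator, bb).
    static ToLongFunction<long[]> nonCapturing() {
        return arr -> arr[0];
    }

    // Capturing: the lambda closes over 'arr', like publishEvent((event, sequence) -> ... bb ...).
    static LongSupplier capturing(long[] arr) {
        return () -> arr[0];
    }
}
```

On OpenJDK, `nonCapturing() == nonCapturing()` is true, while two calls to `capturing(data)` return two distinct objects. The second case is the per-publish allocation that the pass-the-argument style avoids.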
Since method references can be used in place of lambdas, the example can also be written in this style:
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventMain {
    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println(event);
    }

    public static void translate(LongEvent event, long sequence, ByteBuffer buffer) {
        event.set(buffer.getLong(0));
    }

    public static void main(String[] args) throws Exception {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);

        // Connect the handler
        disruptor.handleEventsWith(LongEventMain::handleEvent);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent(LongEventMain::translate, bb);
            Thread.sleep(1000);
        }
    }
}
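The approach above is sufficient for the most common scenarios. If you want maximum performance, several tuning options let you optimize for the hardware and software the Disruptor runs on. There are two main options for tuning: single versus multiple producers, and alternative wait strategies.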
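One of the best ways to improve performance in a concurrent system is to follow the Single Writer Principle, and this applies to the Disruptor as well. If only a single thread will ever publish events into the Disruptor, you can take advantage of that to gain additional performance.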
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventMain {
    public static void main(String[] args) throws Exception {
        //.....
        // Construct the Disruptor with a SingleProducerSequencer
        Disruptor<LongEvent> disruptor = new Disruptor<>(
            factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
        //.....
    }
}
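Part of the gain comes from how the next sequence is claimed. The hypothetical sketch below contrasts the two cases. With a single producer, a claim can be a plain increment. With multiple producers, the claim must be atomic, modeled here with `AtomicLong.getAndIncrement()`. This models only the claim step. The Disruptor's own sequencers also handle wrap-around and, in the multi-producer case, per-slot availability.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: how claiming the next sequence differs between producer types. */
final class SequenceClaim {
    private SequenceClaim() { }

    /** Single writer: only one thread ever calls next(), so a plain field is enough. */
    static final class Single {
        private long next = 0;

        long next() {
            return next++;
        }
    }

    /** Multiple writers: claims must be atomic so that no two producers get the same slot. */
    static final class Multi {
        private final AtomicLong next = new AtomicLong();

        long next() {
            return next.getAndIncrement();
        }
    }
}
```

The plain increment avoids atomic read-modify-write instructions on the hot path. It is only correct when a single thread publishes, which is exactly what `ProducerType.SINGLE` promises.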
The OneToOne performance test shows how much performance this gains. The tests were run on an i7 Sandy Bridge MacBook Air.
Multiple producers:
Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec
Single producer:
Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec
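Averages make the difference easier to see. The snippet below computes the mean of the seven runs for each producer type and their ratio, using the figures printed above. The class `OneToOneResults` is hypothetical.

```java
import java.util.Arrays;

final class OneToOneResults {
    private OneToOneResults() { }

    // Figures copied from the runs listed above (ops/sec).
    static final long[] MULTI = {26_553_372, 28_727_377, 29_806_259, 29_717_682,
                                 28_818_443, 29_103_608, 29_239_766};
    static final long[] SINGLE = {89_365_504, 77_579_519, 78_678_206, 80_840_743,
                                  81_037_277, 81_168_831, 81_699_346};

    static double mean(long[] runs) {
        return Arrays.stream(runs).average().getAsDouble();
    }

    /** How many times faster the single-producer runs were on average. */
    static double speedup() {
        return mean(SINGLE) / mean(MULTI);
    }
}
```

The means work out to about 28.9 million ops/sec with multiple producers and 81.5 million ops/sec with a single producer. That is a speed-up of roughly 2.8× on that machine.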
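The Disruptor's default wait strategy is the BlockingWaitStrategy. Internally it uses a typical lock and condition variable to handle thread wake-up. It is the slowest of the wait strategies, but it is the most conservative in CPU usage and gives the most consistent behaviour.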
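The lock-and-condition mechanism described above can be sketched with the JDK's `ReentrantLock` and `Condition`. The hypothetical `BlockingCursor` models only the wake-up path, not the library's BlockingWaitStrategy. Consumers park until the producer publishes and calls `signalAll()`. The producer pays for taking the lock and signalling on every publish.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: consumers sleep on a Condition until the producer signals a new cursor value. */
final class BlockingCursor {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition advanced = lock.newCondition();
    private long cursor = -1; // guarded by lock

    /** Producer side: publish a sequence and wake any waiting consumers. */
    void publish(long sequence) {
        lock.lock();
        try {
            cursor = sequence;
            advanced.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Consumer side: block without spinning until the cursor reaches the requested sequence. */
    long waitFor(long sequence) throws InterruptedException {
        lock.lock();
        try {
            while (cursor < sequence) {
                advanced.await(); // releases the lock while parked; the loop handles spurious wake-ups
            }
            return cursor;
        } finally {
            lock.unlock();
        }
    }
}
```

A waiting consumer uses no CPU at all, which explains the low CPU usage. The price is the latency of a lock hand-off and an OS-level wake-up for every event.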
SleepingWaitStrategy
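Like the BlockingWaitStrategy, the SleepingWaitStrategy tries to be conservative with CPU usage. Rather than blocking on a lock, it polls in a loop and calls LockSupport.parkNanos(1) inside the loop. On a typical Linux system this pauses the thread for about 60µs. The benefit is that the producing thread only has to increment the appropriate counter and never has to signal a condition variable. The downside is a higher mean latency for moving an event from the producer to the consumer. The SleepingWaitStrategy works best where low latency is not required but a low impact on the producing thread is desired. A common use case is asynchronous logging.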
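The following is a minimal sketch of that polling loop, with an `AtomicLong` standing in for the producer's cursor. The class `SleepingWait` is hypothetical and is a simplified model, not the library's SleepingWaitStrategy. The producer only writes the cursor. It never takes a lock or signals anyone.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical sketch: poll the cursor, sleeping briefly between checks. */
final class SleepingWait {
    private SleepingWait() { }

    static long waitFor(AtomicLong cursor, long sequence) {
        long available;
        while ((available = cursor.get()) < sequence) {
            // No lock and no condition: the producer never has to wake us up.
            // The OS timer granularity makes this sleep much longer than the 1 ns requested.
            LockSupport.parkNanos(1L);
        }
        return available;
    }
}
```
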
YieldingWaitStrategy
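The YieldingWaitStrategy is one of the two wait strategies suited to low-latency systems. It spends CPU cycles to improve latency. It busy-spins while waiting for the sequence to reach the appropriate value, calling Thread.yield() inside the loop so that other queued threads can run. It is the recommended wait strategy when you need very high performance and the number of event handler threads is lower than the total number of logical cores, for example when hyper-threading is enabled.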
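The following is a simplified sketch of the yielding approach, with an `AtomicLong` standing in for the producer's cursor. The consumer spins on the cursor and calls `Thread.yield()` on every iteration, so it never sleeps but does give up the core when other threads are runnable. `YieldingWait` is a hypothetical name, and this is not the library's YieldingWaitStrategy code.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: spin on the cursor, yielding the core on each iteration. */
final class YieldingWait {
    private YieldingWait() { }

    static long waitFor(AtomicLong cursor, long sequence) {
        long available;
        while ((available = cursor.get()) < sequence) {
            // Keeps the thread runnable (low latency) but lets other runnable threads use this core.
            Thread.yield();
        }
        return available;
    }
}
```
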
BusySpinWaitStrategy
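The BusySpinWaitStrategy is the highest-performing wait strategy, but it also places the strictest requirements on the deployment environment. Use it only when the number of event handler threads is lower than the number of physical cores. Hyper-threading should also be disabled.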
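A pure busy spin never yields or sleeps, so each waiting handler occupies a core for as long as it runs. That is why the core-count requirement above matters. The hypothetical `BusySpinWait` below models the idea with an `AtomicLong` cursor. The use of `Thread.onSpinWait()` (JDK 9+) as a CPU spin hint is this sketch's own choice.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: spin on the cursor without ever giving up the core. */
final class BusySpinWait {
    private BusySpinWait() { }

    static long waitFor(AtomicLong cursor, long sequence) {
        long available;
        while ((available = cursor.get()) < sequence) {
            Thread.onSpinWait(); // spin-loop hint to the CPU (e.g. PAUSE on x86); the thread keeps running
        }
        return available;
    }
}
```
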
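When data is passed through the Disruptor, objects can live longer than intended. To avoid this, you may need to clear the event after processing it. With a single event handler, clearing the data in that handler is enough. With a chain of event handlers, you may need a dedicated handler at the end of the chain to clear the object.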
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;

class ObjectEvent<T> {
    T val;

    void clear() {
        val = null;
    }
}

public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>> {
    public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch) {
        // Failing to call clear here will keep the object
        // associated with the event alive until it is
        // overwritten once the ring buffer has wrapped
        // around to the beginning.
        event.clear();
    }
}

public static void main(String[] args) {
    // ProcessingEventHandler, bufferSize and executor are defined elsewhere (not shown).
    Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
        () -> new ObjectEvent<String>(), bufferSize, executor);

    disruptor
        .handleEventsWith(new ProcessingEventHandler())
        .then(new ClearingEventHandler<String>());
}