Disruptor系列(二)— disruptor使用

本文译自Dirsruptor在github上的wiki中文章:Getting Startedhtml

获取Disruptor

Disruptor jar包能够从maven仓库mvnrepository获取,能够将其集成进项目的依赖管理中。java

<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.2</version>
</dependency>


编写事件处理生产者和消费者

为了学习Disruptor的使用,这里以很是简单的例子入手:生产者生产单个long型value传递给消费者。这里简化消费者逻辑,只打印消费的value。首先定义携带数据的Event:git

public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value; 
    }
}

为了容许Disruptor可以为咱们预分配这些事件,咱们须要一个EventFactory用于构造事件:github

public class LongEventFactory implements EventFactory<LongEvent>
{
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

一旦咱们定义了事件,我便再须要建立事件消费者用于消费处理事件。在咱们的例子中,咱们只须要打印value值到控制台便可:网络

public class LongEventHandler implements EventHandler<LongEvent>
{
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("Event: " + event);
    }
}

有了事件消费者,咱们还须要事件生产者产生事件。为了简单起见,咱们假设数据来源于I/O,如:网络或者文件。因为不一样版本的Disruptor,提供了不一样的方式编写生产者。并发

随着3.0版本,Disruptor经过将复杂逻辑囊括在RingBuffer中,从而提供了丰富的Lambda-style API帮助开发者构建Producer。所以从3.0以后,更偏心使用Event Publisher/Event Translator的API发布消息:app

public class LongEventProducerWithTranslator
{
    private final RingBuffer<LongEvent> ringBuffer;
    
    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }
    
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
        new EventTranslatorOneArg<LongEvent, ByteBuffer>()
        {
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
            {
                event.set(bb.getLong(0));
            }
        };

    public void onData(ByteBuffer bb)
    {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}

这种方式的另外一个优点在于Translator代码能够被分离在单独的类中,同时也比较容易进行无依赖的单元测试。Disruptor提供了许多不一样的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, etc.),能够经过实现这些接口提供translators。缘由是容许转换器被表示为静态类或非捕获lambda做为转换方法的参数经过Ring Buffer上的调用传递给转换器。异步

另外一方式使用3.0版本以前的遗留API构建生产者发布消息,这种方式比较原始:maven

public class LongEventProducer
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

从以上的代码流程编写能够看出,事件的发布比使用一个简单的队列要复杂。这是因为须要对事件预分配致使。对于消息的发布有两个阶段,首先在RingBuffer中声明须要的槽位,而后再发布可用的数据。必须使用try/finally语句块包裹消息的发布。必须如今try块中声明使用RingBuffer的槽位,而后再finally块中发布使用的sequece。若是不这样作,将可能致使Disruptor状态的错误,特别是在多生产者的状况下,若是不重启Disruptor将不能恢复。所以推荐使用EventTranslator编写producer。高并发

最后一步须要将以上编写的组件链接起来。虽然能够手动链接各个组件,然而那样可能比较复杂,所以提供了一个DSL用于构造以便简化过程。使用DSL带来装配的简化,可是却对于不少参数没法作到更细致的控制,然而对于大多数状况,DSL仍是很是适合:

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            producer.onData(bb);
            Thread.sleep(1000);
        }
    }
}

关于对Disruptor的接口设计的影响之一是Java 8,由于它使用了Functional Interfaces去实现Java Lambdas。在Disruptor API的大多数接口都被定义成Functional Interfaces以便Lambdas能够被使用。以上的LongEventMain可使用Lambdas进行简化:

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

能够看出使用Lambdas有大量的类将再也不须要,如:handler,translator等。也能够看出使用Lambdas简化publishEvent()只仅仅涉及到参数传递。

然而若是将代码改为这样:

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
    bb.putLong(0, l);
    ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
    Thread.sleep(1000);
}

注意这里使用了捕获式的Lambda,意味着经过调用publishEvent()时可能须要实例化一个对象来持有ByteBuffer bb将其传递给lambda。这个将可能建立额外的垃圾,若是对GC压力有严格要求的状况下,经过传递参数的方式将更加受欢迎。

使用方法引用来代理上述的lambda将能进一步简化上述的方式,也将更时髦:

public class LongEventMain
{
    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println(event);
    }

    public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
    {
        event.set(buffer.getLong(0));
    }

    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(LongEventMain::handleEvent);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent(LongEventMain::translate, bb);
            Thread.sleep(1000);
        }
    }
}

这里对ringBuffer.publishEvent的参数使用了方法引用替换了lambda,使其更进一步简化。


基本的参数设置

对于大多数场景使用方式便可。然而,若是你能肯定硬件和软件的环境即可以进一步对Disruptor的参数进行调整以提升性能。主要有两种参数能够被调整:

  • single vs. multiple producers
  • alternative wait strategies

Single vs. Multiple Producers

提升并发系统的性能的最好方式是遵循Single Writer Principle,这个也在Disruptor也被应用。若是在你的场景中只仅仅是单生产者,而后你能够调优得到额外的性能提高:

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        //.....
        // Construct the Disruptor with a SingleProducerSequencer
        Disruptor<LongEvent> disruptor = new Disruptor(
            factory, bufferSize, ProducerType.SINGLE, new BlockingWaitStrategy(), DaemonThreadFactory.INSTANCE);
        //.....
    }
}

为了说明经过这种技术方式能替身多少性能优点,这里有一份测试类OneToOne performance test。在i7 Sandy Bridge MacBook Air的运行结果:

Multiple Producer:

Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec

Single Producer:

Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec

Alternative Wait Strategies

Disruptor默认使用的等待策略是BlockingWaitStrategy。内部的BlockingWaitStrategy使用典型的Lock和Condition处理线程的wake-up。BlockingWaitStrategy是等待策略中最慢的,可是在CPU使用率方面是最保守的,最普遍的适用于大多数场景。能够经过调整等待策略参数获取额外的性能。

1.SleepingWaitStrategy

相似BlockingWaitStrategy,SleepingWaitStrategy也试图保持CPU使用率。经过使用简单的忙等循环,可是在循环过程当中调用了LockSupport.parkNanos(1)。在典型的Linux系统上停顿线程60us。然而,它具备如下好处:生产线程不须要采起任何其余增长适当计数器的动做,而且不须要发信号通知条件变量的成本。然而将增大生产者和消费者以前数据传递的延迟。在低延迟没有被要求的场景中,这是一个很是好的策略。一个公共的使用场景是异步日志。

2.YieldingWaitStrategy

YieldingWaitStrategy是一个低延迟系统中等待策略。经过牺牲CPU资源来下降延迟。YieldingWaitStrategy经过busy spin等待sequence增加到合适的值。在内部实现中,经过在循环内部使用Thread.yield()容许其余的队列线程运行。当须要很高的性能且事件处理线程少于CPU逻辑核数时这个策略被强烈推荐。如:启用了超线程。

3.BusySpinWaitStrategy

BusySpinWaitStrategy是高新跟那个的等待策略,可是对环境有限制。若是事件处理器的数量小于物理核数时才使用这个策略。


清理RingBuffer中的对象

当经过Disruptor传递数据时,对象的存活时间可能超过预期。为了可以避免这个发生,在事件处理结束后应当清理下事件对象。若是只有单个生产者,在该生产者中清理对象便是最高效的。而后有时间处理链时,就须要特定的事件处理器被放置在链的最末尾用于清理事件。

class ObjectEvent<T>
{
    T val;

    void clear()
    {
        val = null;
    }
}

public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{
    public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
    {
        // Failing to call clear here will result in the 
        // object associated with the event to live until
        // it is overwritten once the ring buffer has wrapped
        // around to the beginning.
        event.clear(); 
    }
}

public static void main(String[] args)
{
    Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
        () -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);

    disruptor
        .handleEventsWith(new ProcessingEventHandler())
        .then(new ClearingObjectHandler());
}
相关文章
相关标签/搜索