这篇博客将主要经过几个示例,简单讲述 Disruptor 的使用方法;java
Disruptor 是英国外汇交易公司 LMAX 开发的一个无锁高性能的线程间消息传递的框架。目前包括 Apache Storm、Camel、Log4j2 等知名项目都是用了 Disruptor;git
由于 Disruptor 中的一个很重要的结构 RingBuffer
和 JDK 中的 ArrayBlockingQueue
很类似,其内部都是一个环形数组,因此常常将他们放在一块儿比较,如下是官网公布测试结果github
从图中能够明显看到他们之间性能的巨大差别;数组
此外在使用 Disruptor 的项目中也能看到其性能的差别,例如 Log4j多线程
其中 Loggers all async
采用的是 Disruptor,Async Appender
采用的是 ArrayBlockingQueue, Sync
是同步模式;从图中能够看到,线程越多竞争越激烈的时候 Disruptor 的性能优点越明显,其缘由很很容易想到,由于 ArrayBlockingQueue 的进出由同一把锁控制,因此竞争对其性能有巨大的影响;框架
此外个人笔记本配置为 “i7-8550U 8G”,使用的版本为:async
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency>
如下经过一个单线程的 demo,演示Disruptor 的基本用法,并个 ArrayBlockingQueue 作简单对比;ide
public class Contrast { public static final int count = 50000000; public static final int size = 1024; private static CountDownLatch latch = new CountDownLatch(1); public void testDisruptor() throws InterruptedException { long start = System.currentTimeMillis(); final Disruptor<Event> disruptor = new Disruptor<>( () -> new Event(), // 绑定事件工厂,主要用于初始化 RingBuffer size, // RingBuffer 大小 DaemonThreadFactory.INSTANCE, // 指定生产者线程工厂,也能够直接传入线程池 ProducerType.SINGLE, // 指定生产者为单线程,也支持多线程模式 new YieldingWaitStrategy() // 等待策略 // new BlockingWaitStrategy() ); Handler handler = new Handler(); disruptor.handleEventsWith(handler); // 绑定事件处理程序 disruptor.start(); RingBuffer<Event> ringBuffer = disruptor.getRingBuffer(); // 开始以后 RingBuffer 的全部位置就已经初始化完成 for (int i = 0; i < count; i++) { long seq = ringBuffer.next(); // 获取下一个放置位置 Event event = ringBuffer.get(seq); // 等到指定位置的槽 event.seId(i); // 更新事件,注意这里是更新,不是放入新的,因此不会有 GC 产生 ringBuffer.publish(seq); // 发布事件 } latch.await(); System.out.println("time: " + (System.currentTimeMillis() - start)); } private void testQueue() throws InterruptedException { long start = System.currentTimeMillis(); final BlockingQueue<Event> queue = new ArrayBlockingQueue<>(size); new Thread(() -> { for (int i = 0; i < count; i++) { try { queue.put(new Event(i)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(() -> { for (int i = 0; i < count; i++) { try { Event event = queue.take(); if (i == count - 1) { System.out.println("last: " + event.getLogId()); latch.countDown(); } } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); latch.await(); System.out.println("time: " + (System.currentTimeMillis() - start)); } class Event { private long id; Event() {} Event(long id) { this.id = id; } public long getLogId() { return id; } public void seId(int id) { this.id = id; } } class Handler implements EventHandler<Event> { private int i = 0; @Override public void onEvent(Event event, long seq, boolean bool) { if (++i == count) { System.out.println("last: " + event.getLogId()); latch.countDown(); } } } public static void main(String[] args) throws InterruptedException { Contrast contrast = new Contrast(); contrast.testDisruptor(); // contrast.testQueue(); } }
Disruptor-YieldingWaitStrategy: 919
Disruptor-BlockingWaitStrategy: 3142
ArrayBlockingQueue : 4307工具其中 BlockingWaitStrategy 等待策略和 ArrayBlockingQueue 大体相识性能
上面的例子在使用多个消费这时,会出现重复消费的状况,若是想要一条消息只消费一次,能够参照下面的代码:
public class MoreConsumer { public static final int count = 5000; public static final int size = 16; public void testDisruptor() { long start = System.currentTimeMillis(); final Disruptor<Event> disruptor = new Disruptor<>( () -> new Event(), size, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy() ); disruptor.handleEventsWithWorkerPool(new Handler("h1"), new Handler("h2"), new Handler("h3")); disruptor.start(); RingBuffer<Event> ringBuffer = disruptor.getRingBuffer(); for (int i = 0; i < count; i++) { long seq = ringBuffer.next(); Event event = ringBuffer.get(seq); event.id = i; ringBuffer.publish(seq); } System.out.println("time: " + (System.currentTimeMillis() - start)); } class Event { public long id; } class Handler implements WorkHandler<Event> { private String name; Handler(String name) { this.name = name; } @Override public void onEvent(Event event) { System.out.println(name + ": " + event.id); } } public static void main(String[] args) { MoreConsumer moreConsumer = new MoreConsumer(); moreConsumer.testDisruptor(); } }
如上面的代码所示使用 WorkHandler
便可,同时还须要注意选择等待策略,策略不一样也可能致使重复消费的问题,同时官网也只出须要在代码里面保证重复消费问题;
不少也业务逻辑会出现如下的相似状况,第三个消费者,须要等待前面的任务完成后才能继续执行的状况;一般咱们会使用锁、同步工具以及一些其余的方式,但都显得比较麻烦,并且效率比较低,这里若是咱们使用 Disruptor 就能很方便的解决;
disruptor.handleEventsWith(c1Handler, c2Handler); disruptor.after(c1Handler, c2Handler).handleEventsWith(c3Handler);
如此仅需两行代码,就能将上面的关系表述清楚,对于更复杂的状况一样;
对于更多的使用技巧就须要你根据实际状况分析了,下一篇博客将主要分析 Disruptor 为何会那么快;