本次代码测试基于相同的 容量、生产线程数、单个线程生产量; 仅有一个消费线程。java
修改各参数获得的结果:并发
数据规模、并发线程数、 最主要的是容量小时:Disruptor没有优点app
2019-08-29T07:42:35.235Z 线程数:64 单线程生产量: 2048 容量:32 数据总量:131072 2019-08-29T07:42:48.742Z EventProcessor wrapper 2019-08-29T07:42:48.743Z disruptor start success! 2019-08-29T07:42:51.113Z 处理的sequence:131071 count:131072 Disruptor 总耗时:2369 2019-08-29T07:42:36.200Z ArrayBlockingQueue 生产耗时:962 2019-08-29T07:42:36.200Z 处理count:131072 ArrayBlockingQueue 消费耗时:962 2019-08-29T07:42:36.201Z ArrayBlockingQueue 总耗时:963
2019-08-29T08:24:38.641Z 线程数:512 单线程生产量: 2048 容量:32 数据总量:1048576 2019-08-29T08:24:38.670Z EventProcessor wrapper 2019-08-29T08:24:38.670Z disruptor start success! 2019-08-29T08:25:08.590Z 处理的sequence:1048575 count:1048576 Disruptor 总耗时:29918 2019-08-29T08:25:54.753Z 处理count:1048576 ArrayBlockingQueue 消费耗时:9231 2019-08-29T08:25:54.753Z ArrayBlockingQueue 生产耗时:9230 2019-08-29T08:25:54.753Z ArrayBlockingQueue 总耗时:9231
增大容量: Disruptor的性能上升ide
2019-08-29T07:40:28.980Z 线程数:64 单线程生产量: 2048 容量:128 数据总量:131072 2019-08-29T07:40:29.008Z EventProcessor wrapper 2019-08-29T07:40:29.008Z disruptor start success! 2019-08-29T07:40:29.694Z 处理的sequence:131071 count:131072 Disruptor 总耗时:685 2019-08-29T07:47:42.436Z 处理count:131072 ArrayBlockingQueue 消费耗时:508 2019-08-29T07:47:42.436Z ArrayBlockingQueue 生产耗时:508 2019-08-29T07:47:42.436Z ArrayBlockingQueue 总耗时:508
2019-08-29T07:43:39.073Z 线程数:64 单线程生产量: 2048 容量:512 数据总量:131072 2019-08-29T07:43:39.101Z EventProcessor wrapper 2019-08-29T07:43:39.101Z disruptor start success! 2019-08-29T07:43:39.269Z 处理的sequence:131071 count:131072 Disruptor 总耗时:167 2019-08-29T07:43:53.722Z ArrayBlockingQueue 生产耗时:383 2019-08-29T07:43:53.722Z 处理count:131072 ArrayBlockingQueue 消费耗时:383 2019-08-29T07:43:53.722Z ArrayBlockingQueue 总耗时:383
2019-08-29T07:44:05.995Z 线程数:64 单线程生产量: 2048 容量:1024 数据总量:131072 2019-08-29T08:18:10.426Z EventProcessor wrapper 2019-08-29T08:18:10.426Z disruptor start success! 2019-08-29T08:18:10.524Z 处理的sequence:131071 count:131072 Disruptor 总耗时:97 2019-08-29T07:44:06.365Z ArrayBlockingQueue 生产耗时:367 2019-08-29T07:44:06.365Z 处理count:131072 ArrayBlockingQueue 消费耗时:367 2019-08-29T07:44:06.365Z ArrayBlockingQueue 总耗时:367
再增大各指标参数: Disruptor优点愈来愈明显高并发
2019-08-29T07:50:59.911Z 线程数:64 单线程生产量: 65536 容量:1048576 数据总量:4194304 2019-08-29T07:51:28.075Z EventProcessor wrapper 2019-08-29T07:51:28.075Z disruptor start success! 2019-08-29T07:51:28.577Z 处理的sequence:4194303 count:4194304 Disruptor 总耗时:501 2019-08-29T07:51:11.549Z ArrayBlockingQueue 生产耗时:11633 2019-08-29T07:51:11.575Z 处理count:4194304 ArrayBlockingQueue 消费耗时:11659 2019-08-29T07:51:11.575Z ArrayBlockingQueue 总耗时:11659
2019-08-29T07:57:22.994Z 线程数:128 单线程生产量: 65536 容量:1048576 数据总量:8388608 2019-08-29T07:57:23.074Z EventProcessor wrapper 2019-08-29T07:57:23.074Z disruptor start success! 2019-08-29T07:57:24.036Z 处理的sequence:8388607 count:8388608 Disruptor 总耗时:961 2019-08-29T07:58:25.567Z ArrayBlockingQueue 生产耗时:47941 2019-08-29T07:58:25.646Z 处理count:8388608 ArrayBlockingQueue 消费耗时:48020 2019-08-29T07:58:25.647Z ArrayBlockingQueue 总耗时:48021
再大线程数, ArrayBlockingQueue 更耗时了,而Disruptor仍旧很快性能
2019-08-29T08:05:17.927Z 线程数:256 单线程生产量: 65536 容量:1048576 数据总量:16777216 2019-08-29T08:05:18.026Z EventProcessor wrapper 2019-08-29T08:05:18.027Z disruptor start success! 2019-08-29T08:05:20.060Z 处理的sequence:16777215 count:16777216 Disruptor 总耗时:2032
经测试发现: 测试
package com.lmax.disruptor.noob; import java.time.Instant; import java.time.format.DateTimeFormatter; /** * 担忧影响, 分开执行测试 * * @author admin * */ public class CompareTest { public static int THREAD = 2 << 8; // 线程数量 public static int PER = 2 << 10; // 单个线程生产数量 public static int TOTAL_COUNT = THREAD * PER; // 数据总量 public static int SIZE =32; // 最大容量 public static void main(String[] args) { println("线程数:" + THREAD + " 单线程生产量: " + PER + " 容量:" + SIZE + " 数据总量:" + TOTAL_COUNT); new Thread(() -> ArrayBlockingQueueTest.execute()).start(); // new Thread(() -> DisruptorTest.execute()).start(); } public static void println(String msg) { System.out.println(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + " " + msg); } }
package com.lmax.disruptor.noob; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class ArrayBlockingQueueTest { public static void execute() { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(CompareTest.SIZE); AtomicBoolean endP = new AtomicBoolean(false); AtomicBoolean endC = new AtomicBoolean(false); long startTime = System.currentTimeMillis(); AtomicLong count = new AtomicLong(0); for (int i = 0; i < CompareTest.THREAD; i++) { final int m = i; new Thread(() -> { for (int j = 0; j < CompareTest.PER; j++) { try { queue.put("i" + m + "j" + j); // 队列不够,等待生产 } catch (InterruptedException e) { e.printStackTrace(); } if (count.incrementAndGet() == CompareTest.TOTAL_COUNT) { CompareTest.println("ArrayBlockingQueue 生产耗时:" + (System.currentTimeMillis() - startTime)); endP.set(true); } } }).start(); } new Thread(() -> { AtomicLong consumerCount = new AtomicLong(0); while (true) { try { queue.take(); // 直到消费完全部信息 } catch (InterruptedException e) { e.printStackTrace(); } if (consumerCount.incrementAndGet() == CompareTest.TOTAL_COUNT) { break; } } CompareTest.println("处理count:" + consumerCount.get() + " ArrayBlockingQueue 消费耗时:" + (System.currentTimeMillis() - startTime)); endC.set(true); }).start(); while (!(endC.get() && endP.get())) {} CompareTest.println("ArrayBlockingQueue 总耗时:" + (System.currentTimeMillis() - startTime)); } }
package com.lmax.disruptor.noob; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; public class DisruptorTest { public static void execute() { Disruptor<DataEvent> disruptor = new Disruptor<DataEvent>(new DataEventFactory(), CompareTest.SIZE, new ThreadFactory() { @Override public Thread newThread(Runnable eventProcessor) { CompareTest.println("EventProcessor wrapper");// 对事件处理总线的封装 Thread thread = new Thread(eventProcessor); thread.setName("EventProcessorWrapper"); return thread; } }); /** * 建立EventProcessors<Runnable>. * 子过程Disruptor.checkNotStarted()事件处理handler必须在启动以前绑定. */ disruptor.handleEventsWith(new DataEventHandler()); disruptor.start(); CompareTest.println("disruptor start success!"); RingBuffer<DataEvent> ringBuffer = disruptor.getRingBuffer(); DataProducer producer = new DataProducer(ringBuffer); DataEventProducerWithTranslator translator = new DataEventProducerWithTranslator(ringBuffer); long start = System.currentTimeMillis(); for (int l = 0; l < CompareTest.THREAD; l++) { new Thread(() -> { for (int m = 0; m < CompareTest.PER; m++) { producer.onData(start); // translator.onData(start); } }).start(); } /** * 关闭 disruptor,方法会堵塞,直至全部的事件都获得处理;并不会自动关闭外部指定的executor,须要主动关闭 */ // disruptor.shutdown(); // CompareTest.println("disruptor shutdown success!"); // executor.shutdown(); } }
事件this
package com.lmax.disruptor.noob; /** * 事件实例封装 业务数据传递对象 * * @author admin * */ public class DataEvent { private long startTime; public long getStartTime() { return startTime; } public void setStartTime(long startTime) { this.startTime = startTime; } } --- package com.lmax.disruptor.noob; import com.lmax.disruptor.EventFactory; /* * 构建传递的数据封装对象, 在初始化ringBuffer时,直接给entries[]每一个地址上初始化DataEvent */ public class DataEventFactory implements EventFactory { @Override public Object newInstance() { return new DataEvent(); } }
生产事件发布atom
package com.lmax.disruptor.noob; import com.lmax.disruptor.RingBuffer; public class DataProducer { private final RingBuffer<DataEvent> ringBuffer; public DataProducer(RingBuffer<DataEvent> ringBuffer) { this.ringBuffer = ringBuffer; } /** * 当前仍是生产线程 * <p> * onData用来发布事件,每调用一次就发布一次事件事件 它的参数会经过事件传递给消费者 * * @param data */ public void onData(long data) {// // 能够把ringBuffer看作一个事件队列,那么next就是获得下面一个事件槽, 若没有空闲的时间槽则阻塞 long sequence = ringBuffer.next(); // CompareTest.println("生产置入sequence:" + sequence); try { // 用上面的索引取出一个空的事件用于填充 DataEvent event = ringBuffer.get(sequence);// for the sequence event.setStartTime(data); } finally { // 发布事件 ringBuffer.publish(sequence); } } }
获取下一个事件槽并发布事件要使用try/finally保证事件必定会被发布, 因此最好直接使用 ringBuffer.publishEvent方式将数据交由Translator来处理填充DataEvent,最后finally发布spa
package com.lmax.disruptor.noob; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * 获取下一个事件槽并发布事件(发布事件的时候要使用try/finally保证事件必定会被发布)。 * 若是咱们使用RingBuffer.next()获取一个事件槽,那么必定要发布对应的事件。若是不能发布事件,那么就会引发Disruptor状态的混乱 * 。尤为是在多个事件生产者的状况下会致使事件消费者失速,从而不得不重启应用才能会恢复。 * * @author admin * */ public class DataEventProducerWithTranslator { private final RingBuffer<DataEvent> ringBuffer; // 一个translator能够看作一个事件初始化器,publicEvent方法会调用它 // 填充Event private static final EventTranslatorOneArg<DataEvent, Long> TRANSLATOR = new EventTranslatorOneArg<DataEvent, Long>() { public void translateTo(DataEvent event, long sequence, Long startTime) { event.setStartTime(startTime); } }; public DataEventProducerWithTranslator(RingBuffer<DataEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(Long bb) { ringBuffer.publishEvent(TRANSLATOR, bb); // 当前仍是生产者线程 // CompareTest.println(Thread.currentThread().getName() + " pulishEvent end!"); } }
事件消费处理
package com.lmax.disruptor.noob; import java.util.concurrent.atomic.AtomicLong; import com.lmax.disruptor.EventHandler; /** * 对指定事件的处理过程 * */ public class DataEventHandler implements EventHandler<DataEvent> { public AtomicLong count = new AtomicLong(0); @Override public void onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception { /** * 消费者线程由初始化Disruptor时指定的threadFactory建立的 */ if (count.incrementAndGet() == CompareTest.TOTAL_COUNT) { CompareTest.println("处理的sequence:" + sequence + " count:" + count.get() + " Disruptor 总耗时:" + (System.currentTimeMillis() - event.getStartTime())); } } }