在disruptor框架调用start方法以前,须要将消息的消费者指定给disruptor框架。java
对于独立消费的消费者,应当实现EventHandler接口。对于不重复消费的消费者,应当实现WorkHandler接口。
从代码层面而言, 有不一样的具体实现来支持不一样的模式app
此处的测试代码的对接口WorkHandler 进行了改造。
框架
package com.lmax.disruptor.noob; import java.time.Instant; import java.time.format.DateTimeFormatter; public class CompareTest { public static int THREAD = 2; // 线程数量 public static int PER = 1; // 单个线程生产数量 public static int TOTAL_COUNT = THREAD * PER; // 数据总量 public static int SIZE = 4; // 最大容量 public static void main(String[] args) { println("线程数:" + THREAD + " 单线程生产量: " + PER + " 容量:" + SIZE + " 数据总量:" + TOTAL_COUNT); DisruptorTest.execute(); } public static void println(String msg) { System.out.println(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + "[" + Thread.currentThread().getName() + "] " + msg); } } --------- import java.util.concurrent.ThreadFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; public class DisruptorTest { public static void execute() { Disruptor<DataEvent> disruptor = new Disruptor<DataEvent>(new DataEventFactory(), CompareTest.SIZE, new ThreadFactory() { AtomicInteger count = new AtomicInteger(0); @Override public Thread newThread(Runnable eventProcessor) { CompareTest.println("EventProcessor wrapper");// 对事件处理总线的封装 Thread thread = new Thread(eventProcessor); thread.setName("EventProcessor" + count.incrementAndGet()); return thread; } }); /** * 建立EventProcessors<Runnable>. * 子过程Disruptor.checkNotStarted()事件处理handler必须在启动以前绑定. */ disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"),new DataEventHandler("dataEventHandler2")); // disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler1"),new DataWorkHandler("dataWorkHandler2")); disruptor.start(); CompareTest.println("disruptor start success!"); RingBuffer<DataEvent> ringBuffer = disruptor.getRingBuffer(); DataProducer producer = new DataProducer(ringBuffer); DataEventProducerWithTranslator translator = new DataEventProducerWithTranslator(ringBuffer); long start = System.currentTimeMillis(); for (int l = 0; l < CompareTest.THREAD; l++) { new Thread(() -> { for (int m = 0; m < CompareTest.PER; m++) { producer.onData(start); // translator.onData(start); 推荐用这种方式作。 } }).start(); } } } ---------- import java.util.concurrent.atomic.AtomicLong; import com.lmax.disruptor.EventHandler; public class DataEventHandler implements EventHandler<DataEvent> { public AtomicLong count = new AtomicLong(0); public String name = null; public DataEventHandler(String name) { this.name = name; } @Override public void onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception { Thread.sleep(name.contentEquals("dataEventHandler1") ? 1000 : 100); CompareTest.println("handlerName: " + name + " 处理的sequence:" + sequence + " count:" + count.incrementAndGet() + " Disruptor 总耗时:" + (System.currentTimeMillis() - event.getStartTime())); } } ---------- import java.util.concurrent.atomic.AtomicLong; import com.lmax.disruptor.WorkHandler; public class DataWorkHandler implements WorkHandler<DataEvent> { public AtomicLong count = new AtomicLong(0); public String name = null; public DataWorkHandler(String name) { this.name = name; } @Override public void onEvent(DataEvent event, long sequence) throws Exception { Thread.sleep(name.contentEquals("dataWorkHandler2") ? 100 :1000); CompareTest.println("handlerName: " + name + " 处理的sequence:" + sequence + " count:" + count.incrementAndGet() + " Disruptor 总耗时:" + (System.currentTimeMillis() - event.getStartTime())); } }
disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"), new DataEventHandler("dataEventHandler2"));
disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1")).then(new DataEventHandler("dataEventHandler2")).then(new DataEventHandler("dataEventHandler3"));
disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler1"), new DataWorkHandler("dataWorkHandler2"));
组合方式ide
disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler3"),new DataWorkHandler("dataWorkHandler4")).then(new DataEventHandler("dataEventHandler1"), new DataEventHandler("dataEventHandler2"));
disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"), new DataEventHandler("dataEventHandler2")).thenHandleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler3"),new DataWorkHandler("dataWorkHandler4"));