今天用一个订单问题来加深对 Disruptor 的理解。当系统中有订单产生时,系统首先会记录订单信息。同时也会发送消息到其余系统处理相关业务,最后才是订单的处理。java
代码包含如下内容:git
1) 事件对象 Eventgithub
2)三个消费者 Handler数据库
3)一个生产者 Producerdom
4)执行 Main 方法ide
(1) Event测试
public class Trade { private String id;//ID private String name; private double price;//金额 private AtomicInteger count = new AtomicInteger(0); // 省略getter/setter }
(2) Handler 类this
一个负责存储订单信息,一个负责发送 kafka 信息到其余系统中,最后一个负责处理订单信息。线程
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; /** * 第一个 Handler1,存储到数据库中 */ public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { long threadId = Thread.currentThread().getId(); // 获取当前线程id String id = event.getId(); // 获取订单号 System.out.println(String.format("%s:Thread Id %s 订单信息保存 %s 到数据库中 ....", this.getClass().getSimpleName(), threadId, id)); } }
import com.lmax.disruptor.EventHandler; /** * 第二个 Handler2,订单信息发送到其它系统中 */ public class Handler2 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { long threadId = Thread.currentThread().getId(); // 获取当前线程id String id = event.getId(); // 获取订单号 System.out.println(String.format("%s:Thread Id %s 订单信息 %s 发送到 karaf 系统中 ....", this.getClass().getSimpleName(), threadId, id)); } }
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; /** * 第三个 Handler2,处理订单信息 */ public class Handler3 implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { onEvent(event); } @Override public void onEvent(Trade event) throws Exception { long threadId = Thread.currentThread().getId(); // 获取当前线程id String id = event.getId(); // 获取订单号 System.out.println(String.format("%s:Thread Id %s 订单信息 %s 处理中 ....", this.getClass().getSimpleName(), threadId, id)); } }
(3) Producer 类code
import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.dsl.Disruptor; import java.util.UUID; import java.util.concurrent.CountDownLatch; public class TradePublisher implements Runnable { Disruptor<Trade> disruptor; private CountDownLatch latch; private static int LOOP = 1; // 模拟百万次交易的发生 public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) { this.disruptor=disruptor; this.latch=latch; } @Override public void run() { TradeEventTranslator tradeTransloator = new TradeEventTranslator(); for(int i = 0; i < LOOP; i++) { disruptor.publishEvent(tradeTransloator); } latch.countDown(); } } class TradeEventTranslator implements EventTranslator<Trade>{ @Override public void translateTo(Trade event, long sequence) { event.setId(UUID.randomUUID().toString()); } }
(4) 执行的 Main 方法
package com.github.binarylei.disruptor.demo3; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; public class Main { public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=1024; ExecutorService executor=Executors.newFixedThreadPool(8); Disruptor<Trade> disruptor = new Disruptor<>(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); //菱形操做 //使用disruptor建立消费者组C1,C2 EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2()); //声明在C1,C2完事以后执行JMS消息发送操做 也就是流程走到C3 handlerGroup.then(new Handler3()); disruptor.start();//启动 CountDownLatch latch=new CountDownLatch(1); //生产者准备 executor.submit(new TradePublisher(latch, disruptor)); latch.await();//等待生产者完事. disruptor.shutdown(); executor.shutdown(); System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime)); } }
测试结果以下:
Handler1:Thread Id 10 订单信息保存 a097c77d-08f1-430a-8342-2143963f268f 到数据库中 .... Handler2:Thread Id 11 订单信息 a097c77d-08f1-430a-8342-2143963f268f 发送到 karaf 系统中 .... Handler3:Thread Id 13 订单信息 a097c77d-08f1-430a-8342-2143963f268f 处理中 .... 总耗时:1631
能够看到 Handler3 在 Handler1 和 Handler2 执行完成后才执行。
虽然 disruptor 模式使用起来很简单,可是创建多个消费者以及它们之间的依赖关系须要的样板代码太多了。为了能快速又简单适用于99%的场景,我为 Disruptor 模式准备了一个简单的领域特定语言(DSL),定义了消费顺序。更多Disruptor场景使用
在讲解 Disruptor DSL 以前先看一下多个消费者不重复消费的问题。
默认一个消费者一个线程,若是想要实现 C3 多个消费者共同不重复消费数据,可使用 handlerGroup.thenHandleEventsWithWorkerPool(customers)
//使用disruptor建立消费者组C1, C2 EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2()); // 多个消费者不重复消费 Handler3[] customers = new Handler3[]{new Handler3(), new Handler3(), new Handler3()}; handlerGroup.thenHandleEventsWithWorkerPool(customers);
在这种状况下,只要生产者(P1)将元素放到ring buffer上,消费者C1和C2就能够并行处理这些元素。可是消费者C3必须一直等到C1和C2处理完以后,才能够处理。在现实世界中的对应的案例就像:在处理实际的业务逻辑(C3)以前,须要校验数据(C1),以及将数据写入磁盘(C2)。
//1. 使用disruptor建立消费者组C1,C2 EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2()); //2. 声明在C1,C2完事以后执行JMS消息发送操做 也就是流程走到C3 handlerGroup.then(new Handler3());
disruptor.handleEventsWith(new Handler1()). handleEventsWith(new Handler2()). handleEventsWith(new Handler3());
咱们甚至能够在一个更复杂的六边形模式中构建一个并行消费者链:
Handler1 h1 = new Handler1(); Handler2 h2 = new Handler2(); Handler3 h3 = new Handler3(); Handler4 h4 = new Handler4(); Handler5 h5 = new Handler5(); disruptor.handleEventsWith(h1, h2); disruptor.after(h1).handleEventsWith(h4); disruptor.after(h2).handleEventsWith(h5); disruptor.after(h4, h5).handleEventsWith(h3);
天天用心记录一点点。内容也许不重要,但习惯很重要!