disruptor入门


1、Disruptor的简介

  Disruptor是由LAMX(欧洲顶级金融公司)设计和开源的大规模、高并发、低延迟的异步处理框架,也能够说他
是最快的消息框架(JMS)。整个业务逻辑处理器彻底运行在内存中,其LMAX架构能够达到一个线程里每秒处理6百万
流水,用1微秒的延迟能够得到100K+吞吐量的爆炸性能。很是适合那种实时性高、延迟率低、业务流水量大的应用场
景,好比银行的实时交易处理、读写操做分离、数据缓存等。
  Disruptor是基于生产者-消费者模型,实现了"队列“功能的无锁高并发框架。他能够作到一个生产者对应多个消
费者且消费者之间能够并行的处理,也能够造成前后顺序的处理。Disruptor本质上解决的就是在两个独立的处理过
程之间交换数据。Disruptor框架的一些核心类有:
 1.Disruptor:用于控制整个消费者-生产者模型的处理器
   2.RingBuffer:用于存放数据
   3.EventHandler:一个用于处理事件的接口(能够当作生产者,也能够当作消费者)。
   4.EventFactory:事件工厂类。
   5.WaitStrategy:用于实现事件处理等待RingBuffer游标策略的接口。
   6.SequeueBarrier:队列屏障,用于处理访问RingBuffer的序列。
   7.用于运行disruptor的线程或者线程池。算法

2、Disruptor的入门

  Disruptor的编写通常能够分为如下几步:
    (1)定义事件;
    (2)定义事件工厂;
    (3)消费者–定义事件处理的具体实现;
    (4)定义用于事件处理(消费者)的线程池;
    (5)指定等待策略:
      Disruptor 提供了多个WaitStrategy的实现,例如:BlockingWaitStrategy、SleepingWaitStrategy、
YieldingWaitStrategy等:
      BlockingWaitStrategy是最低效的策略,但其对CPU的消耗最小而且在各类不一样部署环境中能提供
更加一致的性能表现;
      SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差很少,对CPU的消耗也相似,但其
对生产者线程的影响最小,适合用于异步日志相似的场景;
      YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线
数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
      WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
      WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
      WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
    (6)生产(发布)消息;
    (7)关闭disruptor业务逻辑处理器;
  Disruptor的一些核心概念有:
    - Ring Buffer(环形缓冲区) :
    曾经RingBuffer是Disruptor中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对经过
Disruptor进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 能够由用户的自定
义实现来彻底替代。
    - Sequence Disruptor :
    经过顺序递增的序号来编号管理。经过其进行交换的数据(事件),对数据(事件)的处理过程老是沿着序
号逐个递增处理。一个Sequence用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一
个AtomicLong也能够用于标识进度,但定义Sequence来负责该问题还有另外一个目的,那就是防止不一样的 Sequence之间
的CPU缓存伪共享(Flase Sharing)问题。
    - Sequencer :
    Sequencer是Disruptor的真正核心。此接口有两个实现类SingleProducerSequencer、MultiProducerSequencer
,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
    - Sequence Barrier
    用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。
Sequence Barrier 还定义了决定Consumer是否还有可处理的事件的逻辑。
    - Wait Strategy
    定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor定义了多种不一样的策略,针对不一样的场
景,提供了不同的性能表现)
    - Event
  在Disruptor的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被Disruptor
定义的特定类型,而是由 Disruptor 的使用者定义并指定。
    - EventProcessor
    EventProcessor持有特定消费者的Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
    - EventHandler
    Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
    - Producer
    即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
  栗子:缓存

  Event:服务器

/** * 事件(Event)就是经过 Disruptor 进行交换的数据类型。 * @author lcy * */
public class TransactionEvent { private long seq; private double amount; private long callNumber; public long getCallNumber() { return callNumber; } @Override public String toString() { return "TransactionEvent [seq=" + seq + ", amount=" + amount + ", callNumber=" + callNumber + "]"; } public void setCallNumber(long callNumber) { this.callNumber = callNumber; } public long getSeq() { return seq; } public void setSeq(long seq) { this.seq = seq; } public double getAmount() { return amount; } public void setAmount(double amount) { this.amount = amount; } }

  Factory:架构

/** * Event Factory 定义了如何实例化前面第1步中定义的事件(Event) * Disruptor 经过 EventFactory 在 RingBuffer 中预建立 Event 的实例。 一个 Event 实例实际上被用做一个“数据槽”,发布者发布前,先从 RingBuffer 得到一个 Event 的实例, 而后往 Event 实例中填充数据,以后再发布到 RingBuffer中,以后由 Consumer 得到该 Event 实例并从中读取数据。 * @author lcy * */
public class TransactionEventFactory implements EventFactory<TransactionEvent>{ @Override public TransactionEvent newInstance() { // TODO Auto-generated method stub
        return new TransactionEvent(); } }

  Customer:并发

/** * 事件处理类-交易流水初始化 * @author lcy * */
public class AmountTrasfer implements EventTranslator<TransactionEvent>{ @Override public void translateTo(TransactionEvent arg0, long arg1) { arg0.setAmount(Math.random()*99); arg0.setCallNumber(17088888888L); arg0.setSeq(System.currentTimeMillis()); System.out.println("设置交易流水:"+arg0.getSeq()); } }
/** * 消费者–定义事件处理的具体实现 * 拦截交易流水 * @author lcy * */
public class TransHandler implements EventHandler<TransactionEvent>,WorkHandler<TransactionEvent>{ @Override public void onEvent(TransactionEvent transactionEvent) throws Exception { System.out.println("交易流水号为:"+transactionEvent.getSeq()+"||交易金额为:"+transactionEvent.getAmount()); } @Override public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception { // TODO Auto-generated method stub
        this.onEvent(arg0); } }
/** * 发送验证短信 * @author lcy * */
public class SendMsgHandler implements EventHandler<TransactionEvent>{ @Override public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception { // TODO Auto-generated method stub
         System.out.println("向手机号:"+arg0.getCallNumber()+"发送验证短信......"); } }
/** * 交易流水入库操做 * @author lcy * */
public class InnerDBHandler implements EventHandler<TransactionEvent>,WorkHandler<TransactionEvent>{ @Override public void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception { // TODO Auto-generated method stub
        this.onEvent(arg0); } @Override public void onEvent(TransactionEvent arg0) throws Exception { arg0.setSeq(arg0.getSeq()*10000); System.out.println("拦截入库流水号------------  "+arg0.getSeq()); } }

  Producer:框架

/** * 生产者、发布事件 * @author lcy * */
public class TransactionEventProducer implements Runnable { // 线程同步辅助类 - 容许一个或多个线程一直等待
 CountDownLatch cdl; Disruptor disruptor; public TransactionEventProducer(CountDownLatch cdl, Disruptor disruptor) { super(); this.cdl = cdl; this.disruptor = disruptor; } public TransactionEventProducer() { super(); // TODO Auto-generated constructor stub
 } @Override public void run() { AmountTrasfer th; try { //Event对象初始化类
            th = new AmountTrasfer(); //发布事件
 disruptor.publishEvent(th); } finally { // 递减锁存器的计数 -若是计数到达零,则释放全部等待的线程。
 cdl.countDown(); } } // 定义环大小,2的倍数
    private static final int BUFFER_SIZE = 1024; // 定义处理事件的线程或线程池
    ExecutorService pool = Executors.newFixedThreadPool(7); /** * 批处理模式 * @throws Exception */
    public void BatchDeal() throws Exception { //建立一个单生产者的ringBuffer
        final RingBuffer<TransactionEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TransactionEvent>() { @Override public TransactionEvent newInstance() { return new TransactionEvent(); } //设置等待策略,YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。
        }, BUFFER_SIZE,new YieldingWaitStrategy()); //建立SequenceBarrier
        SequenceBarrier barrier = ringBuffer.newBarrier(); //建立消息处理器
        BatchEventProcessor<TransactionEvent> eventProcessor = new BatchEventProcessor<TransactionEvent>(ringBuffer,barrier,new InnerDBHandler()); //构造反向依赖,eventProcessor之间没有依赖关系则能够将Sequence直接加入
 ringBuffer.addGatingSequences(eventProcessor.getSequence()); //提交消息处理器
 pool.submit(eventProcessor); //提交一个有返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
        Future<Void> submit = pool.submit(new Callable<Void>() { //计算结果,若是没法计算结果则抛出异常
 @Override public Void call() throws Exception { long seq; for (int i=0;i<7000;i++) { System.out.println("生产者:"+i); //环里一个可用的区块
                    seq=ringBuffer.next(); //为环里的对象赋值
                    ringBuffer.get(seq).setAmount(Math.random()*10); System.out.println("TransactionEvent:   "+ringBuffer.get(seq).toString()); //发布这个区块的数据,
 ringBuffer.publish(seq); } return null; } }); //等待计算完成,而后获取其结果。
 submit.get(); Thread.sleep(1000); //关闭消息处理器
 eventProcessor.halt(); //关闭线程池
 pool.shutdown(); } /** * 工做池模式 * @throws Exception */ @SuppressWarnings("unchecked") public void poolDeal() throws Exception { RingBuffer<TransactionEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TransactionEvent>() { @Override public TransactionEvent newInstance() { return new TransactionEvent(); } }, BUFFER_SIZE, new YieldingWaitStrategy()); SequenceBarrier barrier = ringBuffer.newBarrier(); //建立一个定长的线程池
        ExecutorService pool2 = Executors.newFixedThreadPool(5); //交易流水入库操做
        WorkHandler<TransactionEvent> innerDBHandler = new InnerDBHandler(); ExceptionHandler arg2; WorkerPool<TransactionEvent> workerPool = new WorkerPool<TransactionEvent>(ringBuffer, barrier, new IgnoreExceptionHandler(), innerDBHandler); workerPool.start(pool2); long seq; for(int i =0;i<7;i++){ seq = ringBuffer.next(); ringBuffer.get(seq).setAmount(Math.random()*99); ringBuffer.publish(seq); } Thread.sleep(1000); workerPool.halt(); pool2.shutdown(); } /** * disruptor处理器用来组装生产者和消费者 * @throws Exception */ @SuppressWarnings("unchecked") public void disruptorManage() throws Exception{ //建立用于处理事件的线程池
        ExecutorService pool2 = Executors.newFixedThreadPool(7); //建立disruptor对象
        /** * 用来指定数据生成者有一个仍是多个,有两个可选值ProducerType.SINGLE和ProducerType.MULTI * BusySpinWaitStrategy是一种延迟最低,最耗CPU的策略。一般用于消费线程数小于CPU数的场景 */ Disruptor<TransactionEvent> disruptor2 = new Disruptor<TransactionEvent>(new EventFactory<TransactionEvent>() { @Override public TransactionEvent newInstance() { return new TransactionEvent(); } },BUFFER_SIZE,pool2,ProducerType.SINGLE,new BusySpinWaitStrategy()); //建立消费者组,先执行拦截交易流水和入库操做
        EventHandlerGroup<TransactionEvent> eventsWith = disruptor2.handleEventsWith(new InnerDBHandler(),new TransHandler()); //在进行风险交易的2次验证操做
        eventsWith.then(new SendMsgHandler()); //启动disruptor
 disruptor2.start(); //在线程能经过 await()以前,必须调用 countDown() 的次数
        CountDownLatch latch = new CountDownLatch(1); //将封装好的TransactionEventProducer类提交
        pool2.submit(new TransactionEventProducer(latch,disruptor2)); //使当前线程在锁存器倒计数至零以前一直等待,以保证生产者任务彻底消费掉
 latch.await(); //关闭disruptor业务逻辑处理器
 disruptor2.shutdown(); //销毁线程池
 pool2.shutdown(); } }

  Test:dom

/** * 测试类 * @author lcy * */
public class Test { public static void main(String[] args) throws Exception { TransactionEventProducer producer = new TransactionEventProducer(); for (int i = 0; i < 100; i++) producer.disruptorManage(); System.out.println("--------------------------------------------------"); } }

3、记一次生产上的BUG

  前段时间升级的时候出现了这样一个BUG,致使了近万用户的交易失败。首先确认了咱们在生产上并无部署拦
截交易的规则,全部的交易流水都是放行的不会加入咱们的风险名单库。那么内存库里的几万灰名单是怎么来的呢?
  咱们在上线成功后需使用真实的用户进行一波生产上的测试,而在测试的过程当中为了配合测试那边的需求,
需将特定的几个测试帐号配置成加入灰名单并进行2次认证的拦截规则。测试经过后便将那几条测试规则给撤回了。但
是咱们忽略了一个问题,由于Disruptor框架在初始化环的时候,只会new一次这个对象。这就致使了插入环里“槽”的对
象始终都是第一次进入“灰名单”对象,等到环被塞满后下条流水进来的时候就会使用“槽”里的“灰名单”对象。即便这笔
交易不是风险交易也会加入到灰名单中,致使了大量的用户交易失败。
  上线后的次日,咱们头儿就意识到了这个问题,想经过重启服务、清空环来暂时解决这个问题(服务器有负载均
衡),由于环被清空后,以前在环里的“灰名单”对象也就不存在了,并且生产上没有部署将用户加入“灰名单”的规则,环
里的对象就必定是“干净的”,这个问题也就获得了解决。可是、但是、可可是、万万没想到啊,当晚生产上仍是出现了问题。
灰名单里的用户数量已经逼近2万了,大量用户不能进行电子交易。
  为何项目已经被重启了,环也被清空了,也没有规则会产生新的灰名单,那2万的灰名单用户是从哪来的?过后
经过查看代码发现,虽然环被清空了,可是在清空以前已经有部分用户被存到了灰名单里。这些用户在某一时间再次
进行交易时,会从新将这条交易的状态设置为“灰名单”(其余业务须要),这就致使了接待这条交易流水的“槽”会被重
新赋值为“灰名单”的状态,而后环里的“灰名单”槽就会越滚越多。
  Event在使用的时候必定不要忘记将关键的属性进行初始化,这样才能保证从环里取出的对象是初始状态的,不会被上次处理的数据所影响。异步

相关文章
相关标签/搜索