高性能高并发队列-Disruptor

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现居然与I/O操做处于一样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,得到了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还得到了Oracle官方的Duke大奖。html

Java内置队列

介绍Disruptor以前,咱们先来看一看经常使用的线程安全的内置队列有什么问题。Java的内置队列以下表所示。java

队列 有界性 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
ConcurrentLinkedQueue unbounded 无锁 linkedlist
LinkedTransferQueue unbounded 无锁 linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap

队列的底层通常分红三种:数组、链表和堆。其中,堆通常状况下是为了实现带有优先级特性的队列,暂且不考虑。数组

咱们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要经过加锁的方式来保证线程安全;基于链表的线程安全队列分红LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也经过锁的方式来实现线程安全,然后者以及上面表格中的LinkedTransferQueue都是经过原子变量compare and swap(如下简称“CAS”)这种不加锁的方式来实现的。缓存

经过不加锁的方式实现的队列都是无界的(没法保证队列的长度在肯定的范围内);而加锁的方式,能够实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,致使内存溢出,只能选择有界队列;同时,为了减小Java的垃圾回收对系统性能的影响,会尽可能选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue。安全

Disruptor论文中讲述了一个实验:服务器

  • 这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
  • 机器环境:2.4G 6核
  • 运算: 64位的计数器累加5亿次
Method Time (ms)
Single thread 300
Single thread with CAS 5,700
Single thread with lock 10,000
Single thread with volatile write 4,700
Two threads with CAS 30,000
Two threads with lock 224,000

CAS操做比单线程无锁慢了1个数量级;有锁且多线程并发的状况下,速度比单线程无锁慢3个数量级。可见无锁速度最快。数据结构

单线程状况下,不加锁的性能 > CAS操做的性能 > 加锁的性能。多线程

在多线程状况下,为了保证线程安全,必须使用CAS或锁,这种状况下,CAS的性能超过锁的性能,前者大约是后者的8倍。并发

综上可知,加锁的性能是最差的。分布式

 

Disruptor的设计方案

Disruptor经过如下设计来解决队列速度慢的问题:

  • 环形数组结构

为了不垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。

  • 元素位置定位

数组长度2^n,经过位运算,加快定位的速度。下标采起递增的形式。不用担忧index溢出的问题。index是long类型,即便100万QPS的处理速度,也须要30万年才能用完。

  • 无锁设计

每一个生产者或者消费者线程,会先申请能够操做的元素在数组中的位置,申请到以后,直接在该位置写入或者读取数据。

一个生产者

写数据

生产者单线程写数据的流程比较简单:

  1. 申请写入m个元素;
  2. 如果有m个元素能够写入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;
  3. 如果返回的正确,则生产者开始写入元素。




图5 单个生产者生产过程示意图

多个生产者

多个生产者的状况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每一个线程获取不一样的一段数组空间进行操做。这个经过CAS很容易达到。只须要在分配元素的时候,经过CAS判断一下这段空间是否已经分配出去便可。

可是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的状况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

下面分读数据和写数据两种状况介绍。

读数据

生产者多线程写入的状况会复杂不少:

  1. 申请读取到序号n;
  2. 若writer cursor >= n,这时仍然没法肯定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,而后返回最大连续可读元素的位置;
  3. 消费者读取元素。

以下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。

读线程申请读取到下标从3到11的元素,判断writer cursor>=11。而后开始读取availableBuffer,从3开始,日后读取,发现下标为7的元素没有生产成功,因而WaitFor(11)返回6。

而后,消费者读取下标从3到6共计4个元素。




图6 多个生产者状况下,消费者消费过程示意图

写数据

多个生产者写入的时候:

  1. 申请写入m个元素;
  2. 如果有m个元素能够写入,则返回最大的序列号。每一个生产者会被分配一段独享的空间;
  3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记本身哪些位置是已经写入成功的。

以下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。

Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,日后移一位,开始写下标4位置的元素。Writer2一样的方式。最终都写入完成。




图7 多个生产者状况下,生产者生产过程示意图

下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程经过原子变量CAS,保证操做的线程安全。

防止不一样生产者对同一段空间写入的代码,以下所示:

public long tryNext(int n) throws InsufficientCapacityException
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long current;
    long next;

    do
    {
        current = cursor.get();
        next = current + n;

        if (!hasAvailableCapacity(gatingSequences, n, current))
        {
            throw InsufficientCapacityException.INSTANCE;
        }
    }
    while (!cursor.compareAndSet(current, next));

    return next;
}

经过do/while循环的条件cursor.compareAndSet(current, next),来判断每次申请的空间是否已经被其余生产者占据。假如已经被占据,该函数会返回失败,While循环从新执行,申请写入空间。

消费者的流程与生产者很是相似,这儿就很少描述了。

总结

Disruptor经过精巧的无锁设计实现了在高并发情形下的高性能。

在美团点评内部,不少高并发场景借鉴了Disruptor的设计,减小竞争的强度。其设计思想能够扩展到分布式场景,经过无锁设计,来提高服务性能。

 

代码样例

/**
 * @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端
 */
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;

public class DisruptorMain
{
    public static void main(String[] args) throws Exception
    {
        // 队列中的元素
        class Element {

            private int value;

            public int get(){
                return value;
            }

            public void set(int value){
                this.value= value;
            }

        }

        // 生产者的线程工厂
        ThreadFactory threadFactory = new ThreadFactory(){
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread");
            }
        };

        // RingBuffer生产工厂,初始化RingBuffer的时候使用
        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        // 处理Event的handler
        EventHandler<Element> handler = new EventHandler<Element>(){
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch)
            {
                System.out.println("Element: " + element.get());
            }
        };

        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        // 指定RingBuffer的大小
        int bufferSize = 16;

        // 建立disruptor,采用单生产者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        // 设置EventHandler
        disruptor.handleEventsWith(handler);

        // 启动disruptor的线程
        disruptor.start();

        RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

        for (int l = 0; true; l++)
        {
            // 获取下一个可用位置的下标
            long sequence = ringBuffer.next();  
            try
            {
                // 返回可用位置的元素
                Element event = ringBuffer.get(sequence); 
                // 设置该位置元素的值
                event.set(l); 
            }
            finally
            {
                ringBuffer.publish(sequence);
            }
            Thread.sleep(10);
        }
    }
}

  切记:必定要在设置值的地方加上

  try{

  }finally{

  },

      不然若是数据发布不成功,最后数据会逐渐填满ringbuffer,最后后面来的数据根本没有办法调用可用空间,致使方法阻塞,占用CPU和内存,没法释放资源,最后致使服务器死机

      注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须获得调用;若是某个请求的 sequence 未被提交,将会堵塞后续的发布操做或者其它的 producer。

Disruptor 还提供另一种形式的调用来简化以上操做,并确保 publish 老是获得调用。

public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }
}
static class Translator implements EventTranslatorOneArg<LongEvent, Long>{
    @Override
    publicvoid translateTo(LongEvent event, long sequence, Long data) {
        event.set(data);
    }    
}
    
public static Translator TRANSLATOR = new Translator();
    
public staticvoid publishEvent2(Disruptor<LongEvent> disruptor) {
    // 发布事件;
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    long data = getEventDataxxxx();//获取要经过事件传递的业务数据;    
    ringBuffer.publishEvent(TRANSLATOR,data);
}

多个生产者

    在构建Disruptor实例的时候,须要指定生产者是单生产者ProducerType.SINGLE)仍是多生产者ProducerType.MULTI

多个消费者

   (类型1)   多个消费者每一个消费者都有机会消费相同数据,使用handleEventsWith方法

class ComsumerHandler implements EventHandler<ResultTxt>{
    
    private int no;

    public  ComsumerHandler(int no) {
        this.no=no;
    }

    @Override
    public void onEvent(ResultTxt resultTxt, long sequence, boolean endOfBatch)
        throws Exception {
        System.out.println(no+" data commming......"+resultTxt.getBarCode());
    }
}

//设置多个消费者
disruptor.handleEventsWith(new ComsumerHandler(1),new ComsumerHandler(2));

     (类型2)  多个消费者,每一个消费者消费不一样数据。也就是说每一个消费者竞争数据,竞争到消费,其余消费者没有机会。使用handleEventsWithWorkerPool方法

class ComsumerHandler implements WorkHandler<ResultTxt>{

    private int no;

    public  ComsumerHandler(int no) {
        this.no=no;
    }

    @Override
    public void onEvent(ResultTxt event)
        throws Exception {
        System.out.println(no+"  data commming......"+event.getBarCode());
    }
}

//多个消费者,每一个消费者竞争消费不一样数据
disruptor.handleEventsWithWorkerPool(new ComsumerHandler(1),new ComsumerHandler(2));

 

等待策略

生产者的等待策略

暂时只有休眠1ns。

LockSupport.parkNanos(1);

消费者的等待策略

名称 措施 适用场景
BlockingWaitStrategy 加锁 CPU资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy 自旋 经过不断重试,减小切换线程致使的系统调用,而下降延迟。推荐在线程绑定到固定的CPU的场景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU资源之间有很好的折中。延迟比较均匀

更多详细信息请参考: http://tech.meituan.com/disruptor.html

相关文章
相关标签/搜索