Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现居然与I/O操做处于一样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,得到了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还得到了Oracle官方的Duke大奖。html
介绍Disruptor以前,咱们先来看一看经常使用的线程安全的内置队列有什么问题。Java的内置队列以下表所示。java
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist |
ConcurrentLinkedQueue | unbounded | 无锁 | linkedlist |
LinkedTransferQueue | unbounded | 无锁 | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
队列的底层通常分红三种:数组、链表和堆。其中,堆通常状况下是为了实现带有优先级特性的队列,暂且不考虑。数组
咱们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要经过加锁的方式来保证线程安全;基于链表的线程安全队列分红LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也经过锁的方式来实现线程安全,然后者以及上面表格中的LinkedTransferQueue都是经过原子变量compare and swap(如下简称“CAS”)这种不加锁的方式来实现的。缓存
经过不加锁的方式实现的队列都是无界的(没法保证队列的长度在肯定的范围内);而加锁的方式,能够实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,致使内存溢出,只能选择有界队列;同时,为了减小Java的垃圾回收对系统性能的影响,会尽可能选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue。安全
Disruptor论文中讲述了一个实验:服务器
Method | Time (ms) |
---|---|
Single thread | 300 |
Single thread with CAS | 5,700 |
Single thread with lock | 10,000 |
Single thread with volatile write | 4,700 |
Two threads with CAS | 30,000 |
Two threads with lock | 224,000 |
CAS操做比单线程无锁慢了1个数量级;有锁且多线程并发的状况下,速度比单线程无锁慢3个数量级。可见无锁速度最快。数据结构
单线程状况下,不加锁的性能 > CAS操做的性能 > 加锁的性能。多线程
在多线程状况下,为了保证线程安全,必须使用CAS或锁,这种状况下,CAS的性能超过锁的性能,前者大约是后者的8倍。并发
综上可知,加锁的性能是最差的。分布式
Disruptor经过如下设计来解决队列速度慢的问题:
为了不垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
数组长度2^n,经过位运算,加快定位的速度。下标采起递增的形式。不用担忧index溢出的问题。index是long类型,即便100万QPS的处理速度,也须要30万年才能用完。
每一个生产者或者消费者线程,会先申请能够操做的元素在数组中的位置,申请到以后,直接在该位置写入或者读取数据。
生产者单线程写数据的流程比较简单:
图5 单个生产者生产过程示意图
多个生产者的状况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每一个线程获取不一样的一段数组空间进行操做。这个经过CAS很容易达到。只须要在分配元素的时候,经过CAS判断一下这段空间是否已经分配出去便可。
可是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的状况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。
下面分读数据和写数据两种状况介绍。
生产者多线程写入的状况会复杂不少:
以下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。
读线程申请读取到下标从3到11的元素,判断writer cursor>=11。而后开始读取availableBuffer,从3开始,日后读取,发现下标为7的元素没有生产成功,因而WaitFor(11)返回6。
而后,消费者读取下标从3到6共计4个元素。
图6 多个生产者状况下,消费者消费过程示意图
多个生产者写入的时候:
以下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。
Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,日后移一位,开始写下标4位置的元素。Writer2一样的方式。最终都写入完成。
图7 多个生产者状况下,生产者生产过程示意图
下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程经过原子变量CAS,保证操做的线程安全。
防止不一样生产者对同一段空间写入的代码,以下所示:
public long tryNext(int n) throws InsufficientCapacityException { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { current = cursor.get(); next = current + n; if (!hasAvailableCapacity(gatingSequences, n, current)) { throw InsufficientCapacityException.INSTANCE; } } while (!cursor.compareAndSet(current, next)); return next; }
经过do/while循环的条件cursor.compareAndSet(current, next),来判断每次申请的空间是否已经被其余生产者占据。假如已经被占据,该函数会返回失败,While循环从新执行,申请写入空间。
消费者的流程与生产者很是相似,这儿就很少描述了。
Disruptor经过精巧的无锁设计实现了在高并发情形下的高性能。
在美团点评内部,不少高并发场景借鉴了Disruptor的设计,减小竞争的强度。其设计思想能够扩展到分布式场景,经过无锁设计,来提高服务性能。
/** * @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端 */ import com.lmax.disruptor.*; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import java.util.concurrent.ThreadFactory; public class DisruptorMain { public static void main(String[] args) throws Exception { // 队列中的元素 class Element { private int value; public int get(){ return value; } public void set(int value){ this.value= value; } } // 生产者的线程工厂 ThreadFactory threadFactory = new ThreadFactory(){ @Override public Thread newThread(Runnable r) { return new Thread(r, "simpleThread"); } }; // RingBuffer生产工厂,初始化RingBuffer的时候使用 EventFactory<Element> factory = new EventFactory<Element>() { @Override public Element newInstance() { return new Element(); } }; // 处理Event的handler EventHandler<Element> handler = new EventHandler<Element>(){ @Override public void onEvent(Element element, long sequence, boolean endOfBatch) { System.out.println("Element: " + element.get()); } }; // 阻塞策略 BlockingWaitStrategy strategy = new BlockingWaitStrategy(); // 指定RingBuffer的大小 int bufferSize = 16; // 建立disruptor,采用单生产者模式 Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy); // 设置EventHandler disruptor.handleEventsWith(handler); // 启动disruptor的线程 disruptor.start(); RingBuffer<Element> ringBuffer = disruptor.getRingBuffer(); for (int l = 0; true; l++) { // 获取下一个可用位置的下标 long sequence = ringBuffer.next(); try { // 返回可用位置的元素 Element event = ringBuffer.get(sequence); // 设置该位置元素的值 event.set(l); } finally { ringBuffer.publish(sequence); } Thread.sleep(10); } } }
切记:必定要在设置值的地方加上
try{
}finally{
},
不然若是数据发布不成功,最后数据会逐渐填满ringbuffer,最后后面来的数据根本没有办法调用可用空间,致使方法阻塞,占用CPU和内存,没法释放资源,最后致使服务器死机
注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须获得调用;若是某个请求的 sequence 未被提交,将会堵塞后续的发布操做或者其它的 producer。
Disruptor 还提供另一种形式的调用来简化以上操做,并确保 publish 老是获得调用。
public class LongEvent { private long value; public void set(long value) { this.value = value; } } static class Translator implements EventTranslatorOneArg<LongEvent, Long>{ @Override publicvoid translateTo(LongEvent event, long sequence, Long data) { event.set(data); } } public static Translator TRANSLATOR = new Translator(); public staticvoid publishEvent2(Disruptor<LongEvent> disruptor) { // 发布事件; RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); long data = getEventDataxxxx();//获取要经过事件传递的业务数据; ringBuffer.publishEvent(TRANSLATOR,data); }
多个生产者
在构建Disruptor实例的时候,须要指定生产者是单生产者(ProducerType.SINGLE)仍是多生产者(ProducerType.MULTI)
多个消费者
(类型1) 多个消费者每一个消费者都有机会消费相同数据,使用handleEventsWith方法
class ComsumerHandler implements EventHandler<ResultTxt>{ private int no; public ComsumerHandler(int no) { this.no=no; } @Override public void onEvent(ResultTxt resultTxt, long sequence, boolean endOfBatch) throws Exception { System.out.println(no+" data commming......"+resultTxt.getBarCode()); } } //设置多个消费者 disruptor.handleEventsWith(new ComsumerHandler(1),new ComsumerHandler(2));
(类型2) 多个消费者,每一个消费者消费不一样数据。也就是说每一个消费者竞争数据,竞争到消费,其余消费者没有机会。使用handleEventsWithWorkerPool方法
class ComsumerHandler implements WorkHandler<ResultTxt>{ private int no; public ComsumerHandler(int no) { this.no=no; } @Override public void onEvent(ResultTxt event) throws Exception { System.out.println(no+" data commming......"+event.getBarCode()); } } //多个消费者,每一个消费者竞争消费不一样数据 disruptor.handleEventsWithWorkerPool(new ComsumerHandler(1),new ComsumerHandler(2));
暂时只有休眠1ns。
LockSupport.parkNanos(1);
名称 | 措施 | 适用场景 |
---|---|---|
BlockingWaitStrategy | 加锁 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
BusySpinWaitStrategy | 自旋 | 经过不断重试,减小切换线程致使的系统调用,而下降延迟。推荐在线程绑定到固定的CPU的场景下使用 |
PhasedBackoffWaitStrategy | 自旋 + yield + 自定义策略 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
SleepingWaitStrategy | 自旋 + yield + sleep | 性能和CPU资源之间有很好的折中。延迟不均匀 |
TimeoutBlockingWaitStrategy | 加锁,有超时限制 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
YieldingWaitStrategy | 自旋 + yield + 自旋 | 性能和CPU资源之间有很好的折中。延迟比较均匀 |
更多详细信息请参考: http://tech.meituan.com/disruptor.html