还在用BlockingQueue?读这篇文章,了解下Disruptor吧

1.何为队列

听到队列相信你们对其并不陌生,在咱们现实生活中队列随处可见,去超市结帐,你会看见你们都会一排排的站得好好的,等待结帐,为何要站得一排排的,你想象一下你们都没有素质,一窝蜂的上去结帐,不只让这个超市崩溃,还会容易形成各类踩踏事件,固然这些事其实在咱们现实中也是会常常发生。java

固然在计算机世界中,队列是属于一种数据结构,队列采用的FIFO(first in firstout),新元素(等待进入队列的元素)老是被插入到尾部,而读取的时候老是从头部开始读取。在计算中队列通常用来作排队(如线程池的等待排队,锁的等待排队),用来作解耦(生产者消费者模式),异步等等。面试

2.jdk中的队列

在jdk中的队列都实现了java.util.Queue接口,在队列中又分为两类,一类是线程不安全的,ArrayDeque,LinkedList等等,还有一类都在java.util.concurrent包下属于线程安全,而在咱们真实的环境中,咱们的机器都是属于多线程,当多线程对同一个队列进行排队操做的时候,若是使用线程不安全会出现,覆盖数据,数据丢失等没法预测的事情,因此咱们这个时候只能选择线程安全的队列。在jdk中提供的线程安全的队列下面简单列举部分队列:数组

队列名字 是否加锁 数据结构 关键技术点 是否有锁 是否有界
ArrayBlockingQueue 是 数组array ReentrantLock 有锁 有界
LinkedBlockingQueue 是 链表 ReentrantLock 有锁 有界
LinkedTransferQueue 否 链表 CAS 无锁 ***
DelayQueue 否 堆Heap CAS 无锁 ***
咱们能够看见,咱们无锁的队列是***的,有锁的队列是有界的,这里就会涉及到一个问题,咱们在真正的线上环境中,***的队列,对咱们系统的影响比较大,有可能会致使咱们内存直接溢出,因此咱们首先得排除***队列,固然并非***队列就没用了,只是在某些场景下得排除。其次还剩下ArrayBlockingQueue,LinkedBlockingQueue两个队列,他们两个都是用ReentrantLock控制的线程安全,他们两个的区别一个是数组,一个是链表,在队列中,通常获取这个队列元素以后紧接着会获取下一个元素,或者一次获取多个队列元素都有可能,而数组在内存中地址是连续的,在操做系统中会有缓存的优化(下面也会介绍缓存行),因此访问的速度会略胜一筹,咱们也会尽可能去选择ArrayBlockingQueue。而事实证实在不少第三方的框架中,好比早期的log4j异步,都是选择的ArrayBlockingQueue。




缓存

固然ArrayBlockingQueue,也有本身的弊端,就是性能比较低,为何jdk会增长一些无锁的队列,其实就是为了增长性能,很苦恼,又须要无锁,又须要有界,这个时候恐怕会忍不住说一句你咋不上天呢?可是还真有人上天了。安全

3.Disruptor

Disruptor就是上面说的那个天,Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,而且是一个开源的并发框架,并得到2011Duke’s程序框架创新奖。可以在无锁的状况下实现网络的Queue并发操做,基于Disruptor开发的系统单线程能支撑每秒600万订单。目前,包括Apache Storm、Camel、Log4j2等等知名的框架都在内部集成了Disruptor用来替代jdk的队列,以此来得到高性能。网络

3.1为何这么牛逼?

上面已经把Disruptor吹出了花了,你确定会产生疑问,他真的能有这么牛逼吗,个人回答是固然的,在Disruptor中有三大杀器:数据结构

  • CAS
  • 消除伪共享
  • RingBuffer 有了这三大杀器,Disruptor才变得如此牛逼。

    3.1.1锁和CAS

咱们ArrayBlockingQueue为何会被抛弃的一点,就是由于用了重量级lock锁,在咱们加锁过程当中咱们会把锁挂起,解锁后,又会把线程恢复,这一过程会有必定的开销,而且咱们一旦没有获取锁,这个线程就只能一直等待,这个线程什么事也不能作。多线程

CAS(compare and swap),顾名思义先比较在交换,通常是比较是不是老的值,若是是的进行交换设置,你们熟悉乐观锁的人都知道CAS能够用来实现乐观锁,CAS中没有线程的上下文切换,减小了没必要要的开销。 这里使用JMH,用两个线程,每次1一次调用,在我本机上进行测试,代码以下:并发

@BenchmarkMode
({
Mode
.
SampleTime
})
@OutputTimeUnit
(
TimeUnit
.
MILLISECONDS
)
@Warmup
(
iterations
=
3
,
 time 
=

5
,
 timeUnit 
=

TimeUnit
.
MILLISECONDS
)
@Measurement
(
iterations
=
1
,
batchSize 
=

100000000
)
@Threads
(
2
)
@Fork
(
1
)
@State
(
Scope
.
Benchmark
)
public

class

Myclass

{

Lock

lock

=

new

ReentrantLock
();

long
 i 
=

0
;

AtomicLong
 atomicLong 
=

new

AtomicLong
(
0
);

@Benchmark

public

void
 measureLock
()

{

lock
.
lock
();
        i
++;

lock
.
unlock
();

}

@Benchmark

public

void
 measureCAS
()

{
        atomicLong
.
incrementAndGet
();

}

@Benchmark

public

void
 measureNoLock
()

{
        i
++;

}
}

测试出来结果以下:框架

测试项目 测试结果
Lock 26000ms
CAS 4840ms
无锁 197ms
能够看见Lock是五位数,CAS是四位数,无锁更小是三位数。 由此咱们能够知道Lock>CAS>无锁。



而咱们的Disruptor中使用的就是CAS,他利用CAS进行队列中的一些下标设置,减小了锁的冲突,提升了性能。

另外对于jdk中其余的无锁队列也是使用CAS,原子类也是使用CAS。

3.1.2伪共享

谈到了伪共享就不得不说计算机CPU缓存,缓存大小是CPU的重要指标之一,并且缓存的结构和大小对CPU速度的影响很是大,CPU内缓存的运行频率极高,通常是和处理器同频运做,工做效率远远大于系统内存和硬盘。实际工做时,CPU每每须要重复读取一样的数据块,而缓存容量的增大,能够大幅度提高CPU内部读取数据的命中率,而不用再到内存或者硬盘上寻找,以此提升系统性能。可是从CPU芯片面积和成本的因素来考虑,缓存都很小。

还在用BlockingQueue?读这篇文章,了解下Disruptor吧

CPU缓存能够分为一级缓存,二级缓存,现在主流CPU还有三级缓存,甚至有些CPU还有四级缓存。每一级缓存中所储存的所有数据都是下一级缓存的一部分,这三种缓存的技术难度和制形成本是相对递减的,因此其容量也是相对递增的。

为何CPU会有L一、L二、L3这样的缓存设计?主要是由于如今的处理器太快了,而从内存中读取数据实在太慢(一个是由于内存自己速度不够,另外一个是由于它离CPU太远了,总的来讲须要让CPU等待几十甚至几百个时钟周期),这个时候为了保证CPU的速度,就须要延迟更小速度更快的内存提供帮助,而这就是缓存。对这个感兴趣能够把电脑CPU拆下来,本身把玩一下。

每一次你听见intel发布新的cpu什么,好比i7-7700k,8700k,都会对cpu缓存大小进行优化,感兴趣能够自行下来搜索,这些的发布会或者发布文章。

header 1 header 2
row 1 col 1 row 1 col 2
row 2 col 1 row 2 col 2
Martin和Mike的 QConpresentation演讲中给出了一些每一个缓存时间:


从CPU到 大约须要的CPU周期 大约须要的时间
主存
约60-80纳秒
QPI 总线传输(between sockets, not drawn)
约20ns
L3 cache 约40-45 cycles 约15ns
L2 cache 约10 cycles 约3ns
L1 cache 约3-4 cycles 约1ns
寄存器
1 cycle
缓存行









在cpu的多级缓存中,并非以独立的项来保存的,而是相似一种pageCahe的一种策略,以缓存行来保存,而缓存行的大小一般是64字节,在Java中Long是8个字节,因此能够存储8个Long,举个例子,你访问一个long的变量的时候,他会把帮助再加载7个,咱们上面说为何选择数组不选择链表,也就是这个缘由,在数组中能够依靠缓冲行获得很快的访问。
还在用BlockingQueue?读这篇文章,了解下Disruptor吧
缓存行是万能的吗?NO,由于他依然带来了一个缺点,我在这里举个例子说明这个缺点,能够想象有个数组队列,ArrayQueue,他的数据结构以下:

class

ArrayQueue
{

long
 maxSize
;

long
 currentIndex
;
}

对于maxSize是咱们一开始就定义好的,数组的大小,对于currentIndex,是标志咱们当前队列的位置,这个变化比较快,能够想象你访问maxSize的时候,是否是把currentIndex也加载进来了,这个时候,其余线程更新currentIndex,就会把cpu中的缓存行置位无效,请注意这是CPU规定的,他并非只吧currentIndex置位无效,若是此时又继续访问maxSize他依然得继续从内存中读取,可是MaxSize倒是咱们一开始定义好的,咱们应该访问缓存便可,可是却被咱们常常改变的currentIndex所影响。
还在用BlockingQueue?读这篇文章,了解下Disruptor吧

Padding的魔法

为了解决上面缓存行出现的问题,在Disruptor中采用了Padding的方式,

class

LhsPadding
{

protected

long
 p1
,
 p2
,
 p3
,
 p4
,
 p5
,
 p6
,
 p7
;
}
class

Value

extends

LhsPadding
{

protected

volatile

long
 value
;
}
class

RhsPadding

extends

Value
{

protected

long
 p9
,
 p10
,
 p11
,
 p12
,
 p13
,
 p14
,
 p15
;
}

其中的Value就被其余一些无用的long变量给填充了。这样你修改Value的时候,就不会影响到其余变量的缓存行。

最后顺便一提,在jdk8中提供了@Contended的注解,固然通常来讲只容许Jdk中内部,若是你本身使用那就得配置Jvm参数 -RestricContentended = fase,将限制这个注解置位取消。不少文章分析了ConcurrentHashMap,可是都把这个注解给忽略掉了,在ConcurrentHashMap中就使用了这个注解,在ConcurrentHashMap每一个桶都是单独的用计数器去作计算,而这个计数器因为时刻都在变化,因此被用这个注解进行填充缓存行优化,以此来增长性能。
还在用BlockingQueue?读这篇文章,了解下Disruptor吧

3.1.3RingBuffer

在Disruptor中采用了数组的方式保存了咱们的数据,上面咱们也介绍了采用数组保存咱们访问时很好的利用缓存,可是在Disruptor中进一步选择采用了环形数组进行保存数据,也就是RingBuffer。在这里先说明一下环形数组并非真正的环形数组,在RingBuffer中是采用取余的方式进行访问的,好比数组大小为 10,0访问的是数组下标为0这个位置,其实10,20等访问的也是数组的下标为0的这个位置。

实际上,在这些框架中取余并非使用%运算,都是使用的&与运算,这就要求你设置的大小通常是2的N次方也就是,10,100,1000等等,这样减去1的话就是,1,11,111,就能很好的使用index & (size -1),这样利用位运算就增长了访问速度。 若是在Disruptor中你不用2的N次方进行大小设置,他会抛出buffersize必须为2的N次方异常。

固然其不只解决了数组快速访问的问题,也解决了不须要再次分配内存的问题,减小了垃圾回收,由于咱们0,10,20等都是执行的同一片内存区域,这样就不须要再次分配内存,频繁的被JVM垃圾回收器回收。
还在用BlockingQueue?读这篇文章,了解下Disruptor吧

自此三大杀器已经说完了,有了这三大杀器为Disruptor如此高性能垫定了基础。接下来还会在讲解如何使用Disruptor和Disruptor的具体的工做原理。

3.2Disruptor怎么使用

下面举了一个简单的例子:

ublic 
static

void
 main
(
String
[]
 args
)

throws

Exception

{

// 队列中的元素

class

Element

{

@Contended

private

String
 value
;

public

String
 getValue
()

{

return
 value
;

}

public

void
 setValue
(
String
 value
)

{

this
.
value 
=
 value
;

}

}

// 生产者的线程工厂

ThreadFactory
 threadFactory 
=

new

ThreadFactory
()

{

int
 i 
=

0
;

@Override

public

Thread
 newThread
(
Runnable
 r
)

{

return

new

Thread
(
r
,

"simpleThread"

+

String
.
valueOf
(
i
++));

}

};

// RingBuffer生产工厂,初始化RingBuffer的时候使用

EventFactory
<
Element
>
 factory 
=

new

EventFactory
<
Element
>()

{

@Override

public

Element
 newInstance
()

{

return

new

Element
();

}

};

// 处理Event的handler

EventHandler
<
Element
>
 handler 
=

new

EventHandler
<
Element
>()

{

@Override

public

void
 onEvent
(
Element
 element
,

long
 sequence
,

boolean
 endOfBatch
)

throws

InterruptedException

{

System
.
out
.
println
(
"Element: "

+

Thread
.
currentThread
().
getName
()

+

": "

+
 element
.
getValue
()

+

": "

+
 sequence
);
//                Thread.sleep(10000000);

}

};

// 阻塞策略

BlockingWaitStrategy
 strategy 
=

new

BlockingWaitStrategy
();

// 指定RingBuffer的大小

int
 bufferSize 
=

8
;

// 建立disruptor,采用单生产者模式

Disruptor
<
Element
>
 disruptor 
=

new

Disruptor
(
factory
,
 bufferSize
,
 threadFactory
,

ProducerType
.
SINGLE
,
 strategy
);

// 设置EventHandler
        disruptor
.
handleEventsWith
(
handler
);

// 启动disruptor的线程
        disruptor
.
start
();

for

(
int
 i 
=

0
;
 i 
<

10
;
 i
++)

{
            disruptor
.
publishEvent
((
element
,
 sequence
)

->

{

System
.
out
.
println
(
"以前的数据"

+
 element
.
getValue
()

+

"当前的sequence"

+
 sequence
);
                element
.
setValue
(
"我是第"

+
 sequence 
+

"个"
);

});

}

}

在Disruptor中有几个比较关键的: ThreadFactory:这是一个线程工厂,用于咱们Disruptor中生产者消费的时候须要的线程。 EventFactory:事件工厂,用于产生咱们队列元素的工厂,在Disruptor中,他会在初始化的时候直接填充满RingBuffer,一次到位。 EventHandler:用于处理Event的handler,这里一个EventHandler能够看作是一个消费者,可是多个EventHandler他们都是独立消费的队列。 WorkHandler:也是用于处理Event的handler,和上面区别在于,多个消费者都是共享同一个队列。 WaitStrategy:等待策略,在Disruptor中有多种策略,来决定消费者获消费时,若是没有数据采起的策略是什么?下面简单列举一下Disruptor中的部分策略

BlockingWaitStrategy:经过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。

  • BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu

  • LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,若是两个线程同时访问一个访问waitfor,一个访问signalAll时,能够减小lock加锁次数.

  • LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过期间后抛异常。

  • YieldingWaitStrategy:尝试100次,而后Thread.yield()让出cpu

    EventTranslator:实现这个接口能够将咱们的其余数据结构转换为在Disruptor中流通的Event。

3.3工做原理

上面已经介绍了CAS,减小伪共享,RingBuffer三大杀器,介绍下来讲一下Disruptor中生产者和消费者的整个流程。

3.3.1生产者

对于生产者来讲,能够分为多生产者和单生产者,用ProducerType.Single,和ProducerType.MULTI区分,多生产者和单生产者来讲多了CAS,由于单生产者因为是单线程,因此不须要保证线程安全。

在disruptor中一般用disruptor.publishEvent和disruptor.publishEvents()进行单发和群发。

在disruptor发布一个事件进入队列须要下面几个步骤:

  1. 首先获取RingBuffer中下一个在RingBuffer上能够发布的位置,这个能够分为两类:
    • 历来没有写过的位置
    • 已经被全部消费者读过,能够在写的位置。 若是没有读取到会一直尝试去读,disruptor作的很巧妙,并无一直占据CPU,而是经过LockSuport.park(),进行了一下将线程阻塞挂起操做,为的是不让CPU一直进行这种空循环,否则其余线程都抢不到CPU时间片。
      还在用BlockingQueue?读这篇文章,了解下Disruptor吧
      获取位置以后会进行cas进行抢占,若是是单线程就不须要。

  2. 接下来调用咱们上面所介绍的EventTranslator将第一步中RingBuffer中那个位置的event交给EventTranslator进行重写。
  3. 进行发布,在disruptor还有一个额外的数组用来记录当前ringBuffer所在位置目前最新的序列号是多少,拿上面那个0,10,20举例,写到10的时候这个avliableBuffer就在对应的位置上记录目前这个是属于10,有什么用呢后面会介绍。进行发布的时候须要更新这个avliableBuffer,而后进行唤醒全部阻塞的生产者。

下面简单画一下流程,上面咱们拿10举例是不对的,由于bufferSize必需要2的N次方,因此咱们这里拿Buffersize=8来举例:下面介绍了当咱们已经push了8个event也就是一圈的时候,接下来再push 3条消息的一些过程: 1.首先调用next(3),咱们当前在7这个位置上因此接下来三条是8,9,10,取余也就是0,1,2。 2.重写0,1,2这三个内存区域的数据。 3.写avaliableBuffer。
还在用BlockingQueue?读这篇文章,了解下Disruptor吧

对了不知道你们对上述流程是否是很熟悉呢,对的那就是相似咱们的2PC,两阶段提交,先进行RingBuffer的位置锁定,而后在进行提交和通知消费者。具体2PC的介绍能够参照个人另一篇文章再有人问你分布式事务,给他看这篇文章。

3.3.1消费者

对于消费者来讲,上面介绍了分为两种,一种是多个消费者独立消费,一种是多个消费者消费同一个队列,这里介绍一下较为复杂的多个消费者消费同一个队列,能理解这个也就能理解独立消费。 在咱们的disruptor.strat()方法中会启动咱们的消费者线程以此来进行后台消费。在消费者中有两个队列须要咱们关注,一个是全部消费者共享的进度队列,还有个是每一个消费者独立消费进度队列。 1.对消费者共享队列进行下一个Next的CAS抢占,以及对本身消费进度的队列标记当前进度。 2.为本身申请可读的RingBuffer的Next位置,这里的申请不只仅是申请到next,有可能会申请到比Next大的一个范围,阻塞策略的申请的过程以下:

  • 获取生产者对RingBuffer最新写的位置
  • 判断其是否小于我要申请读的位置
  • 若是大于则证实这个位置已经写了,返回给生产者。
  • 若是小于证实尚未写到这个位置,在阻塞策略中会进行阻塞,其会在生产者提交阶段进行唤醒。 3.对这个位置进行可读校验,由于你申请的位置多是连续的,好比生产者目前在7,接下来申请读,若是消费者已经把8和10这个序列号的位置写进去了,可是9这个位置还没来得及写入,因为第一步会返回10,可是9实际上是不能读的,因此得把位置向下收缩到8。
    还在用BlockingQueue?读这篇文章,了解下Disruptor吧
    4.若是收缩完了以后比当前next要小,则继续循环申请。 5.交给handler.onEvent()处理
    同样的咱们举个例子,咱们要申请next=8这个位置。 1.首先在共享队列抢占进度8,在独立队列写入进度7 2.获取8的可读的最大位置,这里根据不一样的策略进行,咱们选择阻塞,因为生产者生产了8,9,10,因此返回的是10,这样和后续就不须要再次和avaliableBuffer进行对比了。 3.最后交给handler进行处理。
    还在用BlockingQueue?读这篇文章,了解下Disruptor吧

    4.Log4j中的Disruptor





下面的图展示了Log4j使用Disruptor,ArrayBlockingQueue以及同步的Log4j吞吐量的对比,能够看见使用了Disruptor完爆了其余的,固然还有更多的框架使用了Disruptor,这里就不作介绍了。
还在用BlockingQueue?读这篇文章,了解下Disruptor吧

最后

本文介绍了传统的阻塞队列的缺点,后文重点吹逼了下Disruptor,以及他这么牛逼的缘由,以及具体的工做流程。

若是之后有人问你叫你设计一个高效无锁队列,须要怎么设计?相信你能从文章中总结出答案,若是对其有疑问或者想和我交流思路,能够关注个人公众号,加我好友和我一块儿讨论。

若是上面问题有什么疑问的话能够关注公众号,来和我一块儿讨论吧,关注便可免费领取海量最新java学习资料视频,以及最新面试资料。

若是你们以为这篇文章对你有帮助,或者你有什么疑问想提供1v1免费vip服务,均可以关注个人公众号,关注便可免费领取海量最新java学习资料视频,以及最新面试资料,你的关注和转发是对我最大的支持,O(∩_∩)O:

还在用BlockingQueue?读这篇文章,了解下Disruptor吧

相关文章
相关标签/搜索