原文地址: haifeiWu和他朋友们的博客
博客地址:www.hchstudio.cn
欢迎转载,转载请注明做者及出处,谢谢!java
最近一直在研究队列的一些问题,今天楼主要分享一个高性能的队列 Disruptor 。git
它是英国外汇交易公司 LMAX 开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。基于 Disruptor 开发的系统单线程能支撑每秒600万订单。github
目前,包括 Apache Storm、Log4j2 在内的不少知名项目都应用了Disruptor以获取高性能。在楼主公司内部使用 Disruptor 与 Netty 结合用来作 GPS 实时数据的处理,性能至关强悍。本文从实战角度来大概了解一下 Disruptor 的实现原理。编程
Disruptor经过如下设计来解决队列速度慢的问题:数组
经过上面的介绍,咱们大概能够了解到 Disruptor 是一个高性能的无锁队列,那么该如何使用呢,下面楼主经过 Disruptor 实现一个简单的生产者消费者模型,介绍 Disruptor 的使用缓存
首先,根据 Disruptor 的事件驱动的编程模型,咱们须要定义一个事件来携带数据。服务器
public class DataEvent {
private long value;
public void set(long value) {
this.value = value;
}
public long getValue() {
return value;
}
}
复制代码
为了让 Disruptor 为咱们预先分配这些事件,咱们须要构造一个 EventFactory 来执行构造网络
public class DataEventFactory implements EventFactory<DataEvent> {
@Override
public DataEvent newInstance() {
return new DataEvent();
}
}
复制代码
一旦咱们定义了事件,咱们须要建立一个处理这些事件的消费者。 在咱们的例子中,咱们要作的就是从控制台中打印出值。架构
public class DataEventHandler implements EventHandler<DataEvent> {
@Override
public void onEvent(DataEvent dataEvent, long l, boolean b) throws Exception {
new DataEventConsumer(dataEvent);
}
}
复制代码
接下来咱们须要初始化 Disruptor ,并定义一个生产者来生成消息并发
public class DisruptorManager {
private final static Logger LOG = LoggerFactory.getLogger(DisruptorManager.class);
/*消费者线程池*/
private static ExecutorService threadPool;
private static Disruptor<DataEvent> disruptor;
private static RingBuffer<DataEvent> ringBuffer;
private static AtomicLong dataNum = new AtomicLong();
public static void init(EventHandler<DataEvent> eventHandler) {
//初始化disruptor
threadPool = Executors.newCachedThreadPool();
disruptor = new Disruptor<>(new DataEventFactory(), 8 * 1024, threadPool, ProducerType.MULTI, new BlockingWaitStrategy());
ringBuffer = disruptor.getRingBuffer();
disruptor.handleEventsWith(eventHandler);
disruptor.start();
new Timer().schedule(new TimerTask() {
@Override
public void run() {
LOG.info("放入队列中数据编号{},队列剩余空间{}", dataNum.get(), ringBuffer.remainingCapacity());
}
}, new Date(), 60 * 1000);
}
/** * * @param message */
public static void putDataToQueue(long message) {
if (dataNum.get() == Long.MAX_VALUE) {
dataNum.set(0L);
}
// 往队列中加事件
long next = ringBuffer.next();
try {
ringBuffer.get(next).set(message);
dataNum.incrementAndGet();
} catch (Exception e) {
LOG.error("向RingBuffer存入数据[{}]出现异常=>{}", message, e.getStackTrace());
} finally {
ringBuffer.publish(next);
}
}
public static void close() {
threadPool.shutdown();
disruptor.shutdown();
}
}
复制代码
最后咱们来定义一个 Main 方法来执行代码
public class EventMain {
public static void main(String[] args) throws Exception {
DisruptorManager.init(new DataEventHandler());
for (long l = 0; true; l++) {
DisruptorManager.putDataToQueue(l);
Thread.sleep(1000);
}
}
}
复制代码
上面代码具体感兴趣的小伙伴请移步 github.com/haifeiWu/di…
而后咱们能够看到控制台打印出来的数据
Disruptor 经过精巧的无锁设计实现了在高并发情形下的高性能。
另外在Log4j 2中的异步模式采用了Disruptor来处理。在这里楼主遇到一个小问题,就是在使用Log4j 2经过 TCP 模式往 logstash 发日志数据的时候,因为网络问题致使连接中断,从而致使 Log4j 2 不停的往 ringbuffer 中写数据,ringbuffer数据没有消费者,致使服务器内存跑满。解决方案是设置 Log4j 2 中 Disruptor 队列有界,或者换成 UDP 模式来写日志数据(若是数据不重要的话)。