今天来说讲我所知道的单机最快的MQ,它叫Disruptor数组
先来介绍一下Disruptor,从翻译上来看,Disruptor—分裂、瓦解,Disruptor是国外某个金融、股票交易所开发的,2011年得到Duke奖,为成为单机最快的MQ,性能及高,无锁CAS,单机支持高并发并发
怎么样,心动了没?来来来,让我来带你们学习一下今天的主角—Disruptoride
你们能够把Disruptor当作是内存里的高效的队列高并发
无锁(CAS)、高并发,使用环形Buffer,直接覆盖(不用清除)旧数据,下降GC频繁,实现了基于事件的生产者消费者模型(观察者模式)性能
RingBuffer有一个序号sequence,指向下一个可用元素,采用数组实现,没有首尾指针学习
首先,它是基于数组实现的,遍历起来要比链表要快
其次不用维护首尾指针,固然他也没有首尾指针,之须要维护一个sequence便可this
这时就会有小伙伴着急了,怎么能覆盖掉呢,那我数据不就丢失了吗?**spa
那确定是不会就让他这么轻易滴把这数据覆盖掉滴,当须要覆盖数据时,会执行一个策略,Disruptor给提供多种策略,说说比较经常使用的线程
//定义Event消息(事件)类 public class LongEvent{ private long value; private String name; @Override public String toString() { return "LongEvent{" + "value=" + value + ", name='" + name + '\'' + '}'; } public String getName() { return name; } public void setName(String name) { this.name = name; } public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
//定义消息(事件)工厂 public class LongEventFactory implements EventFactory<LongEvent> { @Override public LongEvent newInstance() { return new LongEvent(); } }
//定义消息(事件)的消费方式 public class LongEventHandler implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println(longEvent.getName()+"-----"+longEvent.getValue()); } }
//消息(事件)生产者 public class LongEventProducer { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(long val, String name) { long sequence = ringBuffer.next(); try { LongEvent event = ringBuffer.get(sequence); event.setValue(val); event.setName(name); } finally { ringBuffer.publish(sequence); } } }
public static void main(String[] args) { //new一个消息(事件)工厂 LongEventFactory factory = new LongEventFactory(); //设置环形Buffer的SIZE int size = 1024; //new Disruptor,参数是消息(事件)工厂,Buffer的Size,线程工厂 Disruptor<LongEvent> longEventDisruptor = new Disruptor<LongEvent>(factory, size, Executors.defaultThreadFactory()); //设置如何消费生产者产出的消息(事件) longEventDisruptor.handleEventsWith(new LongEventHandler()); //启动--环形Buffer建立成功,全部的位置均已建立好Event对象 longEventDisruptor.start(); //获取Disruptor的环形Buffer RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer(); //new 消息(事件)生产者 LongEventProducer producer = new LongEventProducer(ringBuffer); //循环调用-往里添加消息 for(long l = 0; l<100; l++) { //TODO 调用producer的生产消息(事件)的方法 producer.onData(l,"MingLog-"+l); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } //将消息(事件)发布出去 longEventDisruptor.shutdown(); }
好了,Disruptor讲解到这里就结束了,你们有什么想要学习的均可以私信或评论告诉我哦\~ 我会尽全力知足你们滴,我学,你也学,咳咳\~广告看多了翻译
点赞、关注来一波好吗,秋梨膏~