Disruptor设计特色html
队里 | 有界性 | 锁 | 底层结构 |
---|---|---|---|
ArrayBlockingQueue | 有界 | 有锁 | 数组 |
LinkedBlockingQueue | 有界 | 有锁 | 链表 |
ConcurrentLinkedQueue | 无界 | 无锁 | 链表 |
在高并发且要求较高的稳定性的系统场景下,非了防止生产者速度过快,只能选有界队列;同时,为了减小Java的垃圾回收对系统性能的影响尽可能选择“数组”做为队列的底层结构,符合条件只有一个:ArrayBlockingQueuejava
加锁:不加锁的性能 > CAS操做的性能 > 加锁的性能。数组
伪共享:缓存系统中是以缓存行(cache line)为单位存储的,当多线程修改互相独立的变量时,若是这些变量共享同一个缓存行,就会无心中影响彼此的性能,
ArrayBlockingQueue有三个成员变量:缓存
这三个变量很容易放到一个缓存行中,可是之间修改没有太多的关联。因此每次修改,都会使以前缓存的数据失效,从而不能彻底达到共享的效果。安全
public class ArrayBlockingQueue<E> { /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; }
// value1和value2可能会产生伪共享 class ValueNoPadding { protected volatile long value1 = 0L; protected volatile long value2 = 0L; } // value1和value2中间插入无用值 p1~p14 class ValuePadding { protected long p1, p2, p3, p4, p5, p6, p7; protected volatile long value1 = 0L; protected long p9, p10, p11, p12, p13, p14; protected volatile long value2 = 0L; }
准备数据容器数据结构
// 数据容器,存放生产和消费的数据内容 public class LongEvent { private long value; }
准备数据容器的生产工厂,用于RingBuffer初始化时的数据填充多线程
// 数据容器生产工厂 public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } }
准备消费者并发
//消费者 public class LongEventConsumer implements EventHandler<LongEvent> { /** * * @param longEvent * @param sequence 当前的序列 * @param endOfBatch 是不是最后一个数据 * @throws Exception */ @Override public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception { String str = String.format("long event : %s l:%s b:%s", longEvent.getValue(), sequence, endOfBatch); System.out.println(str); } }
生产线程、主线程框架
public class Main { public static void main(String[] args) throws Exception { // 线程工厂 ThreadFactory threadFactory = (r) -> new Thread(r); // disruptor-建立一个disruptor // 设置数据容器的工厂类,ringBuffer的初始化大小,消费者线程的工厂类 Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(new LongEventFactory(), 8, threadFactory); // disruptor-设置消费者 disruptor.handleEventsWith(new LongEventConsumer()); disruptor.start(); // 获取disruptor的RingBuffer RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // 主线程开始生产 for (long l = 0; l <= 8; l++) { long nextIndex = ringBuffer.next(); LongEvent event = ringBuffer.get(nextIndex); event.setValue(l); ringBuffer.publish(nextIndex); Thread.sleep(1000); } } }
// 数据左右两边插入多余变量隔离真正的变量 class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding { protected volatile long value; } class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15; }
public class Sequence extends RhsPadding { static final long INITIAL_VALUE = -1L; private static final Unsafe UNSAFE; private static final long VALUE_OFFSET; public Sequence(final long initialValue) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue); } public long get() { return value; } // 使用UNSAFE操做直接修改内存值 public void set(final long value) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, value); } }