【AQS】队列和【Disruptor】环形队列

1:使用场景

咱们在编码的过程当中,常常会碰到这样一种场景:java

  • 须要使用多线程的任务
  • 这个任务须要同步,不能并发
  • 咱们想要这些线程一个个乖乖的排队去执行,别串来串去贼烦

这个时候,你可使用synchronized关键字加锁,固然咱们也可使用jdk1.5以后的juc的各类工具,固然这些juc的工具其实都是基于咱们的aqs队列。git



2:aqs队列

不少好用的juc的工具,咱们这里就不写了,咱们这里就来分析aqs队列github

咱们来本身实现一个aqs队列的模板:bash

/**
     * aqs队列使用模板模式
     */
    private static class AqsSync extends AbstractQueuedSynchronizer {
    
        private static final int LOCK = 1;
        private static final int UNLOCK = 0;

        /**
         * 判断是否处于lock状态
         *
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getState() == LOCK;
        }

        @Override
        protected boolean tryAcquire(int acquires) {
            if (compareAndSetState(UNLOCK, LOCK)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;

        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == UNLOCK) {
                throw new IllegalMonitorStateException();
            }
            //没有线程拥有这个锁
            setExclusiveOwnerThread(null);
            setState(UNLOCK);
            return true;
        }
    }
复制代码

而后咱们来使用一下多线程

public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 1; i <= 5; i++) {
            int count = i;
            executorService.execute(() -> {
                try {
                    doTask(count);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }


    private static void doTask(int i) throws InterruptedException {
        AqsSync aqs = new AqsSync();
        aqs.acquire(AqsSync.LOCK);
        //这个方法能够超时就退出阻塞,这里是非公平竞争
        //aqs.tryAcquireNanos(AqsSync.LOCK,TimeUnit.MILLISECONDS.toNanos(2000L));
        System.out.println("开始执行任务" + i);
        try {
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        aqs.release(AqsSync.UNLOCK);
        System.out.println("结束执行任务" + i);
    }
复制代码

那么aqs队列这么强力,究竟是怎么实现的呢?原理是什么呢,我给个比较不错的博客你们本身按需看吧 :了解aqs队列并发



2:终极大招—Disruptor队列

留心过的同窗都知道,log4j实现异步日志的关键组件就是这个Disruptor,Disruptor可让咱们的线程排队执行,而且执行高效,内存节省异步

添加maven依赖maven

<dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
</dependency>
复制代码

随便定义个事件对象pojoide

@Data
public class Book{
    private int bookId;
    private String bookName;
    private String bookType;
}
复制代码

定义一个消费者工具

public class BookEventHander implements EventHandler<Book> {
    @Override
    public void onEvent(Book book, long sequence, boolean endOfBatch) throws InterruptedException {
        String threandName = Thread.currentThread().getName();
        String resultT = "consume one ->thread name : {0} , event :{1}";
        String result = MessageFormat.format(resultT,threandName,book);
        System.out.println(result);
        //模拟业务处理时间
        Thread.sleep(5000L);
    }
}
复制代码

定义一个相似生产者的东西(也能够不用定义直接用lambda,官方给出了三种方式,官方文档

public class BookEventProducer {
    private final RingBuffer<Book> ringBuffer;

    public BookEventProducer(RingBuffer<Book> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void loadBook(Book booksource) {
        long sequence = ringBuffer.next();
        try {
            Book book = ringBuffer.get(sequence);
            book.setBookId(booksource.getBookId());
            book.setBookName(booksource.getBookName());
            book.setBookType(booksource.getBookType());
        } finally {
            //最终的生产实际上靠的是这行代码
            ringBuffer.publish(sequence);
        }
    }
}
复制代码

直接来测试下

public static void main(String[] args) throws InterruptedException {
        //建立线程池  用于建立多个线程消费者
        ExecutorService executor = Executors.newCachedThreadPool();
        //定义环形队列大小 2的n次方只能是
        int ringBufferSize = 2048;

        //生成 disruptor 实例
        Disruptor<Book> disruptor = new Disruptor<>(Book::new, ringBufferSize, executor);

        //链接到消费者
        // Connect the handler
        disruptor.handleEventsWith(new BookEventHander(), new BookEventHanderTwo());

        // 启动 disruptor 而且获取生产者
        RingBuffer<Book> ringBuffer = disruptor.start();
        BookEventProducer producer = new BookEventProducer(ringBuffer);

        //开始进行生产
        System.out.println("开始进行生产");
        for (int l = 0; l <= 10; l++) {
            System.out.println("生产第" + l + "条记录");
            Book booksource = new Book();
            booksource.setBookId(l);
            booksource.setBookType("测试类型" + l);
            booksource.setBookName("测试之书" + l);
            producer.loadBook(booksource);
        }
        disruptor.shutdown();
        executor.shutdown();
    }
复制代码

你们能够看到咱们的任务就会乖乖排队去执行了,因此log4j能够用它来异步的完成日志的有序输出。

最后推荐你们使用单线程做为producer,能够大大提升disruptor的吞吐量!



3:总结

在玩java的时候,不免遇到多线程啊,同步啊什么的问题,这个时候要合理利用咱们的vilatile,sync,juc各类工具以及像咱们的Disruptor工具去完成咱们所需的业务。

相关文章
相关标签/搜索