Java并发编程笔记之ArrayBlockingQueue源码分析

JDK 中基于数组的阻塞队列 ArrayBlockingQueue 原理剖析,ArrayBlockingQueue 内部如何基于一把独占锁以及对应的两个条件变量实现出入队操做的线程安全?数组

首先咱们先大概的浏览一下ArrayBlockingQueue 的内部构造,以下类图:缓存

如类图所示,能够看到ArrayBlockingQueue 内部有个数组items 用来存放队列元素,putIndex变量标示入队元素的下标,takeIndex是出队的下标,count是用来统计队列元素个数,安全

从定义能够知道,这些属性并无使用valatile修饰,这是由于访问这些变量的使用都是在锁块内被用。而加锁了,就足以保证了锁块内变量的内存可见性。cookie

 

另外还有个独占锁lock 用来保证出队入队操做的原子性,这保证了同时只有一个线程能够进行入队出队操做,另外notEmpty,notFull条件变量用来进行出队入队的同步。并发

因为ArrayBlockingQueue 是有界队列,因此构造函数必须传入队列大小的参数。app

接下来咱们进入ArrayBlockingQueue的源码看,以下:框架

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
}
   public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

如上面代码所示,默认状况下使用的是ReentrantLock提供的非公平独占锁进行入队出队操做的加锁。异步

 

接下来主要看看ArrayBlockingQueue的主要的几个操做的源码,以下:async

  1.offer 操做,向队列尾部插入一个元素,若是队列有空闲容量,则插入成功后返回true,若是队列已满则丢弃当前元素,而后返回false,函数

若是e元素为null,则抛出 NullPointerException 异常,另外该方法是不阻塞的。源码以下:

  public boolean offer(E e) {
        //(1)e为null,则抛出NullPointerException异常
        checkNotNull(e);
        //(2)获取独占锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //(3)若是队列满则返回false
            if (count == items.length)
                return false;
            else {
                //(4)否者插入元素
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

代码(2)获取独占锁,当前线程获取到该锁后,其余入队和出队操做的线程都会被阻塞挂起后放入lock锁的AQS阻塞队列。

代码(3)若是队列满则直接返回false,不然调用enqueue方法后返回true,enqueue的源码以下:

  private void enqueue(E x) {
        //(6)元素入队
        final Object[] items = this.items;
        items[putIndex] = x;
        //(7)计算下一个元素应该存放的下标
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //(8)
        notEmpty.signal();
    }

能够看到上面代码首先把当前元素放入items数组,而后计算下一个元素应该存放的下标,而后递增元素个数计数器,最后激活 notEmpty 的条件队列中由于调用 poll 或者 take 操做而被阻塞的的一个线程。

这里因为在操做共享变量,好比count前加了锁,因此不存在内存不可见问题,加过锁后获取的共享变量都是从主存获取的,而不是在CPU缓存获取寄存器里面的值。

代码(5)释放锁,释放锁后会把修改的共享变量值,好比Count的值刷新回主内存中,这样其余线程经过加锁再次读取这些共享变量后就能够看到最新的值。

 

  2.put操做,向队列尾部插入一个元素,若是队列有空闲则插入后直接返回true,若是队列已满则阻塞当前线程直到队列有空闲插入成功后返回true,

若是在阻塞的时候被其余线程设置了中断标志,则被阻塞线程会抛出InterruptedException 异常而返回,另外若是 e 元素为 null 则抛出 NullPointerException 异常。源码以下:

public void put(E e) throws InterruptedException {
    //(1)
    checkNotNull(e);
    final ReentrantLock lock = this.lock;

    //(2)获取锁(可被中断)
    lock.lockInterruptibly();
    try {

        //(3)若是队列满,则把当前线程放入notFull管理的条件队列
        while (count == items.length)
            notFull.await();

        //(4)插入元素
        enqueue(e);
    } finally {
        //(5)
        lock.unlock();
    }
}

代码(2)在获取锁的过程当中当前线程被其它线程中断了,则当前线程会抛出 InterruptedException 异常而退出。

代码(3)判断若是当前队列满了,则把当前线程阻塞挂起后放入到 notFull 的条件队列,注意这里是使用了 while 而不是 if。为何须要while呢?

这是由于考虑到当前线程被虚假唤醒的问题,也就是其它线程没有调用 notFull 的 singal 方法时候,notFull.await() 在某种状况下会自动返回。

若是使用if语句简单判断一下,那么虚假唤醒后会执行代码(4),元素入队,而且递增计数器,而这时候队列已是满了的,致使队列元素个数大于了队列设置的容量,致使程序出错。

而使用使用 while 循环假如 notFull.await() 被虚假唤醒了,那么循环在检查一下当前队列是不是满的,若是是则再次进行等待。

代码(4)若是队列不满则插入当前元素。

 

  3.poll操做,从队列头部获取并移除一个元素,若是队列为空则返回 null,该方法是不阻塞的。源码以下:

public E poll() {
    //(1)获取锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //(2)当前队列为空则返回null,否者调用dequeue()获取
        return (count == 0) ? null : dequeue();
    } finally {
        //(3)释放锁
        lock.unlock();
    }
}

代码(1)获取独占锁

代码(2)若是队列为空则返回 null,否者调用 dequeue() 方法,dequeue 源码以下:

private E dequeue() {
    final Object[] items = this.items;

    //(4)获取元素值
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    //(5)数组中值值为null;
    items[takeIndex] = null;

    //(6)队头指针计算,队列元素个数减一
   if (++takeIndex == items.length)
            takeIndex = 0;
    count--;

    //(7)发送信号激活notFull条件队列里面的一个线程
    notFull.signal();
    return x;
}

如上代码,能够看到首先获取当前队头元素保存到局部变量,而后重置队头元素为null,并从新设置队头下标,元素计数器递减,最后发送信号激活notFull 的条件队列里面一个由于调用 put 或者 offer 而被阻塞的线程。

 

  4.take 操做,获取当前队列头部元素,并从队列里面移除,若是队列为空则阻塞调用线程。若是队列为空则阻塞当前线程知道队列不为空,而后返回元素,

若是若是在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。源码以下:

public E take() throws InterruptedException {
    //(1)获取锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {

        //(2)队列为空,则等待,直到队列有元素
        while (count == 0)
            notEmpty.await();
        //(3)获取队头元素
        return dequeue();
    } finally {
        //(4) 释放锁
        lock.unlock();
    }
}

能够看到take操做的代码也比较简单,与poll相比,只是步骤(2)若是队列为空,则把当前线程挂起后放入到notEmpty的条件队列,等其余线程调用notEmpty.signal() 方法后在返回,

须要注意的是这里也是使用 while 循环进行检测并等待而不是使用 if。之因此这样作,道理都是同样。这里就不在解释了。

 

  5.peek 操做获取队列头部元素可是不从队列里面移除,若是队列为空则返回 null,该方法是不阻塞的。源码以下:

public E peek() {
    //(1)获取锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //(2)
        return itemAt(takeIndex);
    } finally {
       //(3)
        lock.unlock();
    }
}

 @SuppressWarnings("unchecked")
final E itemAt(int i) {
        return (E) items[i];
}

peek的实现更加简单,首先获取独占锁,而后从数组items 中获取当前队头下标的值并返回,在返回以前释放了获取的锁。

 

  6. size 操做,获取当前队列元素个数。源码以下:

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return count;
    } finally {
        lock.unlock();
    }
}

size 操做是简单的,获取锁后直接返回 count,并在返回前释放锁。也许你会有疑问这里有没有修改Count的值,只是简单的获取下,为什么要加锁呢?

答案很简单,若是count声明为volatile,这里就不须要加锁了,由于由于 volatile 类型变量保证了内存的可见性,而 ArrayBlockingQueue 的设计中 count 并无声明为 volatile,

这是由于count的操做都是在获取锁后进行的,而获取锁的语义之一就是获取锁后访问的变量都是从主内存获取的,这就保证了变量的内存可见性。

 

最后用一张图来加深对ArrayBlockingQueue的理解,以下图:

 

总结:ArrayBlockingQueue 经过使用全局独占锁实现同时只能有一个线程进行入队或者出队操做,这个锁的粒度比较大,有点相似在方法上添加 synchronized 的意味。ArrayBlockingQueue 的 size 操做的结果是精确的,由于计算前加了全局锁。

 

2、Logback 框架中异步日志打印中 ArrayBlockingQueue 的使用

在高并发而且响应时间要求比较小的系统中同步打日志已经知足不了需求了,这是由于打日志自己是须要同步写磁盘的,会形成 响应时间 增长,以下图同步日志打印模型为:

异步模型是业务线程把要打印的日志任务写入一个队列后直接返回,而后使用一个线程专门负责从队列中获取日志任务写入磁盘,其模型具体以下图:

 

 

 

如图可知其实 logback 的异步日志模型是一个多生产者单消费者模型,经过使用队列把同步日志打印转换为了异步,业务线程调用异步 appender 只须要把日志任务放入日志队列,日志线程则负责使用同步的 appender 进行具体的日志打印到磁盘。

 

接下来看看异步日志打印具体实现,要把同步日志打印改成异步须要修改 logback 的 xml 配置文件以下:

<appender name="PROJECT" class="ch.qos.logback.core.FileAppender">
        <file>project.log</file>
        <encoding>UTF-8</encoding>
        <append>true</append>

        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- daily rollover -->
            <fileNamePattern>project.log.%d{yyyy-MM-dd}</fileNamePattern>
            <!-- keep 7 days' worth of history -->
            <maxHistory>7</maxHistory>
        </rollingPolicy>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>
                <![CDATA[%n%-4r [%d{yyyy-MM-dd HH:mm:ss}] %X{productionMode} - %X{method} %X{requestURIWithQueryString} [ip=%X{remoteAddr}, ref=%X{referrer},
                ua=%X{userAgent}, sid=%X{cookie.JSESSIONID}]%n  %-5level %logger{35} - %m%n]]>
            </pattern>
        </layout>
    </appender>

    <appender name="asyncProject" class="ch.qos.logback.classic.AsyncAppender">
        <discardingThreshold>0</discardingThreshold>
        <queueSize>1024</queueSize>
        <neverBlock>true</neverBlock>
        <appender-ref ref="PROJECT" />
    </appender>
     <logger name="PROJECT_LOGGER" additivity="false">
        <level value="WARN" />
        <appender-ref ref="asyncProject" />
    </logger>

可知 AsyncAppender 是实现异步日志的关键,下节主要讲这个的内部实现。

 

异步原理实现

  首先从 AsyncAppender 的类图结构来从全局了解下 AsyncAppender 中组件构成,类图以下所示:

从上面的类图咱们能够知道以下两点:

  1.如上图可知 AsyncAppender 继承自 AsyncAppenderBase,其中后者具体实现了异步日志模型的主要功能,前者只是重写了其中的一些方法。另外从上类图可知 logback 中的异步日志队列是一个阻塞队列, 后面会知道实际上是一个有界阻塞队列 ArrayBlockingQueue, 其中 queueSize 是有界队列的元素个数默认为 256。

  2.worker 是个线程,也就是异步日志打印模型中的单消费者线程,aai 是一个 appender 的装饰器里面存放同步日志的 appender, 其中 appenderCount 记录 aai 里面附加的同步 appender 的个数;neverBlock 是当日志队列满了的时候是否阻塞打日志的线程的一个开关;discardingThreshold 是一个阈值,当日志队列里面空闲个数小于该值时候新来的某些级别的日志会被直接丢弃,下面会具体讲到。

 

首先咱们来看下什么时候建立的日志队列以及什么时候启动的消费线程,这须要看下 AsyncAppenderBase 的 start 方法,该方法是在解析完毕配置 AsyncAppenderBase 的 xml 的节点元素后被调用,源码以下:

public void start() {
    ...
    //(1)日志队列为有界阻塞队列
    blockingQueue = new ArrayBlockingQueue<E>(queueSize);
    //(2)若是没设置discardingThreshold则设置为队列大小的1/5
    if (discardingThreshold == UNDEFINED)
      discardingThreshold = queueSize / 5;
    //(3)设置消费线程为守护线程,并设置日志名称
    worker.setDaemon(true);
    worker.setName("AsyncAppender-Worker-" + worker.getName());
    //(4)设置启动消费线程
    super.start();
    worker.start();
}

从上代码可知以下两点:

  1. logback 使用的队列是有界队列 ArrayBlockingQueue,之因此使用有界队列是考虑到内存溢出问题,在高并发下写日志的 qps 会很高若是设置为无界队列队列自己会占用很大内存,极可能会形成 内存溢出。

  2.这里消费日志队列的 worker 线程被设置为了守护线程,意味着当主线程运行结束而且当前没有用户线程时候该 worker 线程会随着 JVM 的退出而终止,而无论日志队列里面是否还有日志任务未被处理。另外这里设置了线程的名称是个很好的习惯,由于这在查找问题的时候颇有帮助,根据线程名字就能够定位到是哪一个线程。

 

既然是有界队列那么确定须要考虑若是队列满了,该如何处置,是丢弃老的日志任务,仍是阻塞日志打印线程直到队列有空余元素那?

要回答这个问题,咱们须要看看具体进行日志打印的AsyncAppenderBase 的 append 方法,源码以下:

protected void append(E eventObject) {
   //(5)调用AsyncAppender重写的isDiscardable
   if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
       return;
   }
   ...
   //(6)放入日志任务到队列
   put(eventObject);
}

private boolean isQueueBelowDiscardingThreshold() {
   return (blockingQueue.remainingCapacity() < discardingThreshold);
}

其中 (5) 调用了 AsyncAppender 重写的 isDiscardable,源码以下:

    //(7)
    protected boolean isDiscardable(ILoggingEvent event) {
        Level level = event.getLevel();
        return level.toInt() <= Level.INFO_INT;
    }

结合 代码(5)和代码(7) 可知若是当前日志的级别小于 INFO_INT 级别而且当前队列的剩余容量小于 discardingThreshold 时候会直接丢弃这些日志任务。

 

接下来看具体步骤 (6) 的 put 方法,源码以下:

 private void put(E eventObject) {
        //(8)
        if (neverBlock) {
            blockingQueue.offer(eventObject);
        } else {
            try {//(9)
                blockingQueue.put(eventObject);
            } catch (InterruptedException e) {
                // Interruption of current thread when in doAppend method should not be consumed
                // by AsyncAppender
                Thread.currentThread().interrupt();
            }
        }
 }

可知若是 neverBlock 设置为了 false(默认为 false)则会调用阻塞队列的 put 方法,而 put 是阻塞的,也就是说若是当前队列满了,若是在企图调用 put 方法向队列放入一个元素则调用线程会被阻塞直到队列有空余空间。这里有必要提下其中第 (9) 步当日志队列满了的时候 put 方法会调用 await() 方法阻塞当前线程,若是其它线程中断了该线程,那么该线程会抛出 InterruptedException 异常,那么当前的日志任务就会被丢弃了。若是 neverBlock 设置为了 true 则会调用阻塞队列的 offer 方法,而该方法是非阻塞的,若是当前队列满了,则会直接返回,也就是丢弃当前日志任务。

 

最后看下 addAppender 方法内是什么的,源码以下:

 public void addAppender(Appender<E> newAppender) {
        if (appenderCount == 0) {
            appenderCount++;
            ...
            aai.addAppender(newAppender);
        } else {
            addWarn("One and only one appender may be attached to AsyncAppender.");
            addWarn("Ignoring additional appender named [" + newAppender.getName() + "]");
        }
 }

如上代码可知一个异步 appender 只能绑定一个同步 appender, 这个 appender 会被放到 AppenderAttachableImpl 的 appenderList 列表里面。

 

到这里咱们已经分析完了日志生产线程放入日志任务到日志队列的实现,下面一块儿来看下消费线程是如何从队列里面消费日志任务并写入磁盘的,因为消费线程是一个线程,那就从 worker 的 run 方法看起,源码以下所示:

class Worker extends Thread {

        public void run() {

            AsyncAppenderBase<E> parent = AsyncAppenderBase.this;
            AppenderAttachableImpl<E> aai = parent.aai;

            //(10)一直循环直到该线程被中断
            while (parent.isStarted()) {
                try {//(11)从阻塞队列获取元素
                    E e = parent.blockingQueue.take();
                    aai.appendLoopOnAppenders(e);
                } catch (InterruptedException ie) {
                    break;
                }
            }

            //(12)到这里说明该线程被中断,则吧队列里面的剩余日志任务
            //刷新到磁盘
            for (E e : parent.blockingQueue) {
                aai.appendLoopOnAppenders(e);
                parent.blockingQueue.remove(e);
            }
            ...
..        }
}

其中(11)从日志队列使用 take 方法获取一个日志任务,若是当前队列为空则当前线程会阻塞到 take 方法直到队列不为空才返回,获取到日志任务后会调用 AppenderAttachableImpl 的 aai.appendLoopOnAppenders 方法,该方法会循环调用经过 addAppender 注入的同步日志 appener 具体实现日志打印到磁盘的任务。

相关文章
相关标签/搜索