手写JDK组件之阻塞队列BlockedQueue

研究了一段时间框架,有点审美疲劳,今天讲点轻松的,手写一个阻塞队列,实践一把lock+condition。java

“等待通知”机制

首先复习一下经典的 “等待通知”机制。编程

线程首先获取互斥锁,当线程要求的条件不知足时,释放互斥锁,进入等待状态;当要求的条件知足时,通知等待的线程,从新获取互斥锁 --《极客时间-Java并发编程实战》数组

在Java中实现 “等待通知” 机制通常有两种方式,synchronized/Lock+Condition。多线程

经过synchronized实现 “等待-通知” 机制

synchronized同步原语(或称:管程)配合wait()、notify()、notifyAll()就能够实现“等待通知”机制。并发

机理是怎样的呢?框架

当使用synchronized管程对某一块临界区进行加锁,同一时刻,只能容许一个线程进入synchronized保护的临界区中。ide

当该远程进入临界区以后,其余的线程若是来访问临界区就须要进入等待队列中进行等待。测试

这里要注意,等待队列与锁是一一对应关系,每一个互斥锁都有本身的独立的等待队列。this

Java对象的wait()方法就可以让线程进入等待状态,此时线程被阻塞。spa

当线程进入等待队列时,会释放当前持有的互斥锁。当它释放锁以后,其余的线程就有机会得到该互斥锁并进入临界区。

那如何通知知足条件的线程呢?

经过Java对象的notify()和notifyAll()方法就可以实现。当条件知足时调用notify(),会通知等待队列中的线程,通知它 条件曾经知足过

notify()只能保证在通知的那一时间点,条件是知足的。也就是,有可能被通知线程执行的时间点与通知的时间点是不相等的;即:线程执行的时候,条件已经不知足了(可能有其余的线程知足了该条件而插队)

另外,就算线程被通知而唤醒,在进入临界区前依旧须要获取互斥锁,由于这把须要获取的锁在调用wait()的时候已经被释放了。

须要注意的是

wait()、notify()、notifyAll()被调用的前提是获取到了响应的互斥锁,也就是调用这三个方法的位置都是在 synchronized{} 内部。若是调用的位置在synchronized外部或者不是使用同一把互斥锁,JVM会抛出 java.lang.IllegalMonitorStateException 异常。

关于synchronized实现 “等待-通知” 机制咱们就讲到这里。

经过Lock+Condition实现 “等待-通知” 机制与synchronized相似,咱们本文实现阻塞队列BlockedQueue的方式就是经过Lock+Condition实现。

Lock+Condition原理讲解

Condition 定义了等待/通知两种类型的方法:await()/signal()/signalAll()。线程调用这些方法以前须要获取Condition关联的锁。

Condition对象是由Lock对象经过newCondition()方法建立的,也就是说,Condition是依赖Lock对象的。

类比上文中讲到的synchronized实现 “等待-通知” 机制,Lock/Condition涉及到的方法与synchronized方式涉及到的方法的语义是一一对应的,具体以下表:

实现阻塞队列BlockedQueue

了解并复习了 管程中的“等待/通知机制”,咱们开始实现阻塞队列BlockedQueue。

在编写过程当中参考了JUC中的ArrayBlockingQueue源码实现。

public class BlockedQueue<T> {复制代码
final Lock lock = new ReentrantLock();
        // 条件变量:队列不满
        final Condition notFull = lock.newCondition();
        // 条件变量:队列不空
        final Condition notEmpty = lock.newCondition();复制代码
// 阻塞单列最大长度
        int capacity = 0;复制代码
// 当前已经存在下标:入队
        int putIndex = 0;复制代码
// 当前已经存在下标:出队
        int takeIndex = 0;复制代码
// 元素总数
        int elementsSize = 0;复制代码
// 元素数组
        Object[] items;复制代码
// 构造方法
        public BlockedQueue(int capacity) {
            this.capacity = capacity;
            items = new Object[capacity];
            System.out.println("capacity=" + capacity + ",items.size=" + items.length);
        }复制代码

这段代码中咱们声明了阻塞队列,支持泛型。声明了须要的成员变量以及有参构造方法。构造方法中根据外界输入的队列最大长度初始化了内部的元素数组。

提早声明并初始化了Lock(实现方式为ReentrantLock可重入锁),并在Lock基础上初始化了两个Condition条件变量,分别标记队列不满、队列不空。

// 入队
    void enq(T x) {
        lock.lock();
        try {
            // 队列已满
            while (items.length == elementsSize) {
                // 等待队列不满
                notFull.await();
            }
            // 入队操做...
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            ++elementsSize;
            // 入队后, 通知可出队
            notEmpty.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
            System.out.println(x.toString() + "--入队完成");
        }
    }复制代码

这段代码为入队逻辑。

首先获取可重入锁,若是加锁成功则进入临界区逻辑,不然尝试解锁。

当队列已经满时,则进入阻塞状态,等待队列不满。

若是队列不满则进行入队,当前下标的元素即为要入队的元素,元素总长度增1。

// 出队
    T deq() {
        lock.lock();
        T x = null;
        try {
            // 队列已空
            while (items.length == 0) {
                // 等待队列不空
                notEmpty.await();
            }
            // 出队操做...
            x = (T) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            elementsSize--;
            // 出队后,通知可入队
            notFull.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return x;
    }复制代码

这段代码为出队逻辑。

首先获取可重入锁,若是加锁成功则进入临界区逻辑,不然尝试解锁。

当队列已经空,则进入阻塞状态,等待队列不空。

若是队列不空则进行出队操做,先暂存当前下标的元素,并将当前下标的元素标记为空(NULL);元素总长度减1,解锁后返回当前已经出队的元素。

public T get(int index) {
        return (T) items[index];
    }复制代码

这段代码为获取对应下标的元素,若是元素不存在则返回空。

测试阻塞队列:单线程操做

开发完基本逻辑以后,咱们写一个demo来测试一下BlockedQueue。

public static void main(String[] args) {
        BlockedQueue<String> blockedQueue = new BlockedQueue<>(20);
        for (int i = 0; i < 20; i++) {
            blockedQueue.enq("snowalker:" + i);
        }复制代码
System.out.println("入队结束:-------------------------");
        for (int i = 0; i < 20; i++) {
            System.out.println(blockedQueue.get(i));
        }复制代码
for (int i = 0; i < 20; i++) {
            blockedQueue.deq();
        }
        System.out.println("出队结束:-------------------------");
        for (int i = 0; i < 20; i++) {
            System.out.println(blockedQueue.get(i));
        }复制代码
}复制代码

逻辑很好理解,咱们构造了一个BlockedQueue,添加了20个元素进行入队。入队以后遍历元素,查看入队结果。

接着进行20次出队,并遍历出队后的结果。

运行结果以下:

capacity=20,items.size=20
    入队结束:-------------------------
    snowalker:0
    snowalker:1
    snowalker:2
    snowalker:3
    snowalker:4
    snowalker:5
    snowalker:6
    snowalker:7
    snowalker:8
    snowalker:9
    snowalker:10
    snowalker:11
    snowalker:12
    snowalker:13
    snowalker:14
    snowalker:15
    snowalker:16
    snowalker:17
    snowalker:18
    snowalker:19
    出队结束:-------------------------
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null复制代码

能够看到,进行了20次入队以后元素共有20个;

进行了20次出队操做以后,元素所有为空,表示出队成功。

测试阻塞队列:多线程操做

咱们接着测试一下多线程并发操做下,BlockedQueue的表现。

BlockedQueue<String> blockedQueue = new BlockedQueue<>(20);
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(2);复制代码
Thread thread0 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    begin.await();
                    System.out.println("线程0准备完毕");
                    for (int i = 0; i < 10; i++) {
                        blockedQueue.enq("线程0-snowalker-" + i);
                    }
                    System.out.println("线程0入队结束:-------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    end.countDown();
                }
            }
        });复制代码
Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    begin.await();
                    System.out.println("线程1准备完毕");
                    for (int i = 10; i < 20; i++) {
                        blockedQueue.enq("线程1-snowalker-" + i);
                    }
                    System.out.println("线程1入队结束:-------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    end.countDown();
                }
            }
        });复制代码
thread0.start();
        thread1.start();
        begin.countDown();
        end.await();
        System.out.println("主线程准备完毕!");
        System.out.println("主线程遍历开始!");
        for (int i = 0; i < 20; i++) {
            System.out.println(blockedQueue.get(i));
        }
        System.out.println("Bingo!");
    }复制代码

咱们定义了两个线程,每一个线程添加10个元素,经过闭锁CountDownLatch进行并发添加,添加完成以后遍历添加结果。打印以下:

capacity=20,items.size=20
    线程0准备完毕
    线程1准备完毕
    线程0-snowalker-0--入队完成
    线程1-snowalker-10--入队完成
    线程0-snowalker-1--入队完成
    线程1-snowalker-11--入队完成
    线程0-snowalker-2--入队完成
    线程1-snowalker-12--入队完成
    线程0-snowalker-3--入队完成
    线程1-snowalker-13--入队完成
    线程0-snowalker-4--入队完成
    线程1-snowalker-14--入队完成
    线程0-snowalker-5--入队完成
    线程1-snowalker-15--入队完成
    线程1-snowalker-16--入队完成
    线程1-snowalker-17--入队完成
    线程1-snowalker-18--入队完成
    线程0-snowalker-6--入队完成
    线程1-snowalker-19--入队完成
    线程1入队结束:-------------------------
    线程0-snowalker-7--入队完成
    线程0-snowalker-8--入队完成
    线程0-snowalker-9--入队完成
    线程0入队结束:-------------------------
    主线程准备完毕!
    主线程遍历开始!
    线程0-snowalker-0
    线程1-snowalker-10
    线程0-snowalker-1
    线程1-snowalker-11
    线程0-snowalker-2
    线程1-snowalker-12
    线程0-snowalker-3
    线程1-snowalker-13
    线程0-snowalker-4
    线程1-snowalker-14
    线程0-snowalker-5
    线程1-snowalker-15
    线程0-snowalker-6
    线程1-snowalker-16
    线程1-snowalker-17
    线程1-snowalker-18
    线程1-snowalker-19
    线程0-snowalker-7
    线程0-snowalker-8
    线程0-snowalker-9
    Bingo!复制代码

能够看到结果符合预期,咱们接着测试一下并发出队,接着上面的添加结果进行并发出队操做。

CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch dequeue = new CountDownLatch(2);
        for (int i = 0; i < 20; i++) {
            blockedQueue.enq("snowalker:" + i);
        }复制代码
Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    begin.await();
                    System.out.println("线程2准备完毕");
                    for (int i = 0; i <= 10; i++) {
                        blockedQueue.deq();
                    }
                    System.out.println("线程2出队结束:-------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    dequeue.countDown();
                }
            }
        });复制代码
Thread thread3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    begin.await();
                    System.out.println("线程3准备完毕");
                    for (int i = 0; i <= 10; i++) {
                        blockedQueue.deq();
                    }
                    System.out.println("线程3出队结束:-------------------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    dequeue.countDown();
                }
            }
        });复制代码
thread2.start();
        thread3.start();
        begin.countDown();
        dequeue.await();
        System.out.println("主线程准备完毕!");
        System.out.println("主线程遍历开始!");
        for (int i = 0; i < 20; i++) {
            System.out.println(blockedQueue.get(i));
        }
        System.out.println("Bingo!");
    }复制代码

咱们准备了20个元素入队,而后并发进行出队,等待两个线程出队完成以后,在主线程进行队列元素的遍历操做,结果以下:

capacity=20,items.size=20
    snowalker:0--入队完成
    snowalker:1--入队完成
    snowalker:2--入队完成
    snowalker:3--入队完成
    snowalker:4--入队完成
    snowalker:5--入队完成
    snowalker:6--入队完成
    snowalker:7--入队完成
    snowalker:8--入队完成
    snowalker:9--入队完成
    snowalker:10--入队完成
    snowalker:11--入队完成
    snowalker:12--入队完成
    snowalker:13--入队完成
    snowalker:14--入队完成
    snowalker:15--入队完成
    snowalker:16--入队完成
    snowalker:17--入队完成
    snowalker:18--入队完成
    snowalker:19--入队完成
    线程2准备完毕
    线程2出队结束:-------------------------
    线程3准备完毕
    线程3出队结束:-------------------------
    主线程准备完毕!
    主线程遍历开始!
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    null
    Bingo!复制代码

结果如上图所示,能够看到并发出队结果知足预期。

小结

本文咱们利用JUC中的Lock+Condition管程实现了自定义BlockedQueue阻塞队列的开发,并经过测试用例测试了并发条件下的出队入队,结果符合预期。

版权声明: 原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以连接形式标明本文地址。
相关文章
相关标签/搜索