AbstractQueuedSynchronizer 队列同步器

一、当多个线程并发执行的时候,如何完成线程的同步?java

加一把锁,好比设置一个变量state=0;多个线程同时修改变量state=1;修改为功的线程,表示拿到了锁,能够继续执行。为了保证多个线程同时修改时,只有一个线程能够修改为功,能够用方法UNSAFE.compareAndSwapInt。node

/**
             * var1 操做的对象
             * var2 操做的对象属性
             * var3 var2与var3比较,相等才更新
             * var4 更新值
             */
           unsafe.compareAndSwapInt(this, stateOffset, expect, update);

二、对于获取锁失败的线程该如何处理呢?安全

获取锁失败的线程,咱们能够调用方法 LockSupport.park(this); 使当前线程中止,而后将线程插入到队列的尾部。head的后继节点始终指向队列的第一个节点。释放锁,经过head获取队列的第一个节点,而后调用方法并发

LockSupport.unpark(thread);

唤醒该节点上的线程,而后唤醒起来的线程再次经过设置state变量为0,获取锁。ide

上面就是AbstractQueuedSynchronizer(aqs) 的基本原理。测试

三、采用aqs实现一把锁ui

定义静态内部类Syn,该类继承自AbstractQueuedSynchronizer ,重写方法tryAcquire和tryRelease。AbstractQueuedSynchronizer类采用了模板方法的模式,实现了同步器的基本骨架。this

/**
 * protected boolean tryAcquire(int arg)	独占式获取同步状态,试着获取,成功返回true,反之为false
 * protected boolean tryRelease(int arg) 	独占式释放同步状态,等待中的其余线程此时将有机会获取到同步状态
 * protected boolean isHeldExclusively() : 是否在独占模式下被线程占用。
 */
public class SelfLock{
    // 静态内部类,自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 是否处于占用状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        // 当状态为0的时候获取锁
        public boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 释放锁,将状态设置为0 因为持有锁的线程只有一个 故不须要用cas更改状态
        protected boolean tryRelease(int releases) {
            if (getState() == 0) throw new
                    IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }
    // 仅须要将操做代理到Sync上便可
    private final Sync sync = new Sync();
    public void lock() {
        sync.acquire(1);
    }
    public void unlock() {
        sync.release(0);
    }
}

测试线程

public class TestSelfLock {
    private static SelfLock selfLock = new SelfLock();
    private static int count;
    public static void main(String[] args) throws IOException {
        for (int i = 0; i < 100; i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        selfLock.lock();//获取锁
                        count++;
                        System.out.println(Thread.currentThread().getName()+"正在运行;count="+count);
                    }finally {
                        selfLock.unlock();//释放锁
                    }

                }
            }).start();
        }
        System.in.read();
    }

}

从运行结果能够看出该锁保证cout变量的线程安全。代理

 

四、AQS获取锁分析

SelfLock的lock方法调用sync.acquire(1),方法acquire以下:

//tryAcquire就是咱们重写的方法。若是tryAcquire调用返回false,那么会执行&&后面的代码,
//将当前线程封装成一个节点放到aqs内部的队列里。
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

addWaiter方法:从下面的源码能够看出addWaiter实际上就是将当前线程封装成node节点,而后将该节点插入到队列的尾部。为了保证每个节点均可以安全的插入到队列的尾部,采用了for循环+cas的方式。

private Node addWaiter(Node mode) {
        //将当前线程封装成Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //若是tail不为null
        Node pred = tail;//将pred 指向tail指向的对象。
        if (pred != null) {
            //将tail节点设置当前节点的前驱节点
            node.prev = pred;
            //将node 设置成尾部节点,用了cas操做,若是设置成功,tail指向的对象变为node,
            //pred依然指向的原来的tail指向的对象
            if (compareAndSetTail(pred, node)) {
                //若是设置成功,那么将pred的后继节点设置成node
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    //同步器经过“死循环”来保证节点的正确添加,在“死循
    //环”中只有经过CAS将节点设置成为尾节点以后,当前线程才能从该方法返回,不然,当前线
    //程不断地尝试设置。能够看出,enq(final Node node)方法将并发添加节点的请求经过CAS变
    //得“串行化”了。
    private Node enq(final Node node) {
        for (;;) {//经过for循环,直到设置成功。
            Node t = tail;
            if (t == null) { // Must initialize
                //若是tail为null 必须初始化tail和head
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {//设置尾节点 cas操做。
                    t.next = node;
                    return t;
                }
            }
        }
    }

 acquireQueued方法:若是当前线程获取锁失败,那么该线程就会被阻塞在for循环里。当被唤醒后,就会再次尝试获取锁,获取锁成功后,跳出循环,继续日后执行。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //若是node的前驱节点为head,而且tryAcquire方法返回true,则将
                //将头结点设置成node
                if (p == head && tryAcquire(arg)) {
                    //设置node为head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //parkAndCheckInterrupt方法里利用LockSupport.park(this)将当前线程挂起 ,
                //等待被前驱节点唤醒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 

五、AQS释放锁分析

unlock方法:该方法会调用队列同步器的release方法。

public void unlock() {
    sync.release(1);
}

release方法会调用咱们重写的tryRelease方法, release方法主要的做用就是经过head找到head的后继节点,而后调用LockSupport.unpark(thread)唤醒该节点上的线程。

public final boolean release(int arg) {
    //调用咱们从新的方法tryRelease
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//获取head的后继节点
                               //利用LockSupport.unpark(thread)唤醒后继节点的线程
        return true;
    }
    return false;
}

 

最后:AQS是JUC中不少同步组件的构建基础,好比ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier,具体能够详细查看这些类的内部实现。

相关文章
相关标签/搜索