AQS使用的同步队列是基于一种CLH锁算法来实现。CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,若是发现前驱释放了锁就结束自旋.java
同步器中包含了两个节点类型的引用,一个指向头节点(head),一个指向尾节点(tail),没有获取到锁的线程,加入到队列的过程必须保证线程安全,所以同步器提供了一个基于CAS的设置尾节点的方法
CompareAndSetTail(Node expect,Node update)
,它须要传递当前线程认为的尾节点和当前节点,只有设置成功后,当前节点才能正式与以前的尾节点创建关联。
同步器队列遵循FIFO
,首节点是获取锁成功的节点,首节点的线程在释放锁时,会唤醒后续节点,然后继节点在成功获取到锁后,会把本身设置成首节点,设置首节点是由获取锁成功的线程来完成的,因为只有一个线程能成功获取到锁,因此设置首节点不须要CAS
。
package com.rumenz.task.aqs; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class MyLock { private static final Sync STATE_HOLDER = new Sync(); /** * 经过Sync内部类来持有同步状态, 当状态为1表示锁被持有,0表示锁处于空闲状态 */ private static class Sync extends AbstractQueuedSynchronizer { /** * 是否被独占, 有两种表示方式 * 1. 能够根据状态,state=1表示锁被占用,0表示空闲 * 2. 能够根据当前独占锁的线程来判断,即getExclusiveOwnerThread()!=null 表示被独占 */ @Override protected boolean isHeldExclusively() { return getExclusiveOwnerThread() != null; } /** * 尝试获取锁,将状态从0修改成1,操做成功则将当前线程设置为当前独占锁的线程 */ @Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 释放锁,将状态修改成0 */ @Override protected boolean tryRelease(int arg) { if (getState() == 0) { throw new UnsupportedOperationException(); } setExclusiveOwnerThread(null); setState(0); return true; } } /** * 下面的实现Lock接口须要重写的方法,基本是就是调用内部内Sync的方法 */ public void lock() { STATE_HOLDER.acquire(1); } public void unlock() { STATE_HOLDER.release(1); } }
package com.rumenz.task.aqs; import org.omg.Messaging.SYNC_WITH_TRANSPORT; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class LockTest { private final static Integer clientTotal=100000; private final static Integer threadTotal=200; private static Count count=new Count(); private static Count unSafe=new Count(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch=new CountDownLatch(clientTotal); final Semaphore semaphore=new Semaphore(threadTotal); for (int i = 0; i < clientTotal; i++) { executorService.execute(()->{ try{ semaphore.acquire(); count.getIncrement(); unSafe.getUnSafeIncrement(); semaphore.release(); }catch (Exception e){ e.printStackTrace(); } countDownLatch.countDown(); }); } countDownLatch.await(); System.out.println("safe:"+count.getCount()); System.out.println("unSafe:"+unSafe.getCount()); executorService.shutdown(); } } class Count{ private MyLock myLock; private volatile int count; Count() { this.myLock=new MyLock(); } int getCount(){ return count; } int getIncrement(){ myLock.lock(); count++; myLock.unlock(); return count; } int getUnSafeIncrement(){ count++; return count; } }
safe:100000 unSafe:99995
关注微信公众号:【入门小站】,解锁更多知识点算法