公平锁是指多个线程按照申请锁的顺序来获取锁。java
非公平锁是指多个线程获取锁的顺序并非按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。有可能,会形成优先级反转或者饥饿现象。node
对于 Java ReentrantLock
而言,经过构造函数指定该锁是不是公平锁,默认是非公平锁。非公平锁的优势在于吞吐量比公平锁大。git
对于Synchronized
而言,也是一种非公平锁。因为其并不像ReentrantLock
是经过 AQS 的来实现线程调度,因此并无任何办法使其变成公平锁。github
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。算法
说的有点抽象,下面会有一个代码的示例。对于 Java ReentrantLock
而言, 他的名字就能够看出是一个可重入锁,其名字是Re entrant Lock
从新进入锁。对于Synchronized
而言,也是一个可重入锁。可重入锁的一个好处是可必定程度避免死锁。编程
synchronized void setA() throws Exception{ Thread.sleep(1000); setB(); } synchronized void setB() throws Exception{ Thread.sleep(1000); }
上面的代码就是一个可重入锁的一个特色,若是不是可重入锁的话,setB 可能不会被当前线程执行,可能形成死锁。数组
独享锁是指该锁一次只能被一个线程所持有。多线程
共享锁是指该锁可被多个线程所持有。并发
对于 Java ReentrantLock
而言,其是独享锁。可是对于 Lock 的另外一个实现类ReadWriteLock
,其读锁是共享锁,其写锁是独享锁。读锁的共享锁可保证并发读是很是高效的,读写,写读 ,写写的过程是互斥的。独享锁与共享锁也是经过 AQS 来实现的,经过实现不一样的方法,来实现独享或者共享。对于Synchronized
而言,固然是独享锁。框架
上面讲的独享锁/共享锁就是一种广义的说法,互斥锁/读写锁就是具体的实现。互斥锁在 Java 中的具体实现就是ReentrantLock
读写锁在 Java 中的具体实现就是ReadWriteLock
乐观锁与悲观锁不是指具体的什么类型的锁,而是指看待并发同步的角度。悲观锁认为对于同一个数据的并发操做,必定是会发生修改的,哪怕没有修改,也会认为修改。所以对于同一个数据的并发操做,悲观锁采起加锁的形式。悲观的认为,不加锁的并发操做必定会出问题。乐观锁则认为对于同一个数据的并发操做,是不会发生修改的。在更新数据的时候,会采用尝试更新,不断从新的方式更新数据。乐观的认为,不加锁的并发操做是没有事情的。
从上面的描述咱们能够看出,悲观锁适合写操做很是多的场景,乐观锁适合读操做很是多的场景,不加锁会带来大量的性能提高。悲观锁在 Java 中的使用,就是利用各类锁。乐观锁在 Java 中的使用,是无锁编程,经常采用的是 CAS 算法,典型的例子就是原子类,经过 CAS 自旋实现原子操做的更新。
分段锁实际上是一种锁的设计,并非具体的一种锁,对于ConcurrentHashMap
而言,其并发的实现就是经过分段锁的形式来实现高效的并发操做。咱们以ConcurrentHashMap
来讲一下分段锁的含义以及设计思想,ConcurrentHashMap
中的分段锁称为 Segment,它即相似于 HashMap(JDK7 与 JDK8 中 HashMap 的实现)的结构,即内部拥有一个 Entry 数组,数组中的每一个元素既是一个链表;同时又是一个 ReentrantLock(Segment 继承了 ReentrantLock)。当须要 put 元素的时候,并非对整个 hashmap 进行加锁,而是先经过 hashcode 来知道他要放在那一个分段中,而后对这个分段进行加锁,因此当多线程 put 的时候,只要不是放在一个分段中,就实现了真正的并行的插入。可是,在统计 size 的时候,可就是获取 hashmap 全局信息的时候,就须要获取全部的分段锁才能统计。分段锁的设计目的是细化锁的粒度,当操做不须要更新整个数组的时候,就仅仅针对数组中的一项进行加锁操做。
这三种锁是指锁的状态,而且是针对Synchronized
。在 Java 5 经过引入锁升级的机制来实现高效Synchronized
。
这三种锁的状态是经过对象监视器在对象头中的字段来代表的。
偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。下降获取锁的代价。
轻量级锁是指当锁是偏向锁的时候,被另外一个线程所访问,偏向锁就会升级为轻量级锁,其余线程会经过自旋的形式尝试获取锁,不会阻塞,提升性能。
重量级锁是指当锁为轻量级锁的时候,另外一个线程虽然是自旋,但自旋不会一直持续下去,当自旋必定次数的时候,尚未获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其余申请的线程进入阻塞,性能下降。
在 Java 中,自旋锁是指尝试获取锁的线程不会当即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减小线程上下文切换的消耗,缺点是循环会消耗 CPU。
synchronized 的缺陷
Lock、ReadWriteLock 相较于 synchronized,解决了以上的缺陷:
若是采用 Lock,必须主动去释放锁,而且在发生异常时,不会自动释放锁。所以通常来讲,使用 Lock 必须在 try catch 块中进行,而且将释放锁的操做放在 finally 块中进行,以保证锁必定被被释放,防止死锁的发生。
lock()
方法的做用是获取锁。若是锁已被其余线程获取,则进行等待。
tryLock()
方法的做用是尝试获取锁,若是成功,则返回 true;若是失败(即锁已被其余线程获取),则返回 false。也就是说,这个方法不管如何都会当即返回,获取不到锁时不会一直等待。
tryLock(long time, TimeUnit unit)
方法和 tryLock()
方法是相似的,区别仅在于这个方法在获取不到锁时会等待必定的时间,在时间期限以内若是还获取不到锁,就返回 false。若是若是一开始拿到锁或者在等待期间内拿到了锁,则返回 true。
lockInterruptibly()
方法比较特殊,当经过这个方法去获取锁时,若是线程正在等待获取锁,则这个线程可以响应中断,即中断线程的等待状态。也就使说,当两个线程同时经过 lock.lockInterruptibly()
想获取某个锁时,倘若此时线程 A 获取到了锁,而线程 B 只有在等待,那么对线程 B 调用 threadB.interrupt()
方法可以中断线程 B 的等待过程。因为 lockInterruptibly()
的声明中抛出了异常,因此 lock.lockInterruptibly()
必须放在 try 块中或者在调用 lockInterruptibly()
的方法外声明抛出 InterruptedException
。
注意:当一个线程获取了锁以后,是不会被 interrupt() 方法中断的。由于自己在前面的文章中讲过单独调用 interrupt() 方法不能中断正在运行过程当中的线程,只能中断阻塞过程当中的线程。所以当经过 lockInterruptibly() 方法获取某个锁时,若是不能获取到,只有进行等待的状况下,是能够响应中断的。
unlock()
方法的做用是释放锁。
ReentrantLock 是惟一实现了 Lock 接口的类。
ReentrantLock 字面意为可重入锁。
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
ReentrantLock 的核心方法固然是 Lock 中的方法(具体实现彻底基于 Sync
类中提供的方法)。
此外,ReentrantLock 有两个构造方法,功能参考下面源码片断中的注释。
// 同步机制彻底依赖于此 private final Sync sync; // 默认初始化 sync 的实例为非公平锁(NonfairSync) public ReentrantLock() {} // 根据 boolean 值选择初始化 sync 的实例为公平的锁(FairSync)或不公平锁(NonfairSync) public ReentrantLock(boolean fair) {}
Sync
类是 ReentrantLock
的内部类,也是一个抽象类。ReentrantLock
的同步机制几乎彻底依赖于Sync
。使用 AQS 状态来表示锁的保留数(详细介绍参见 AQS)。Sync
是一个抽象类,有两个子类:
FairSync
- 公平锁版本。NonfairSync
- 非公平锁版本。public class ReentrantLockDemo { private ArrayList<Integer> arrayList = new ArrayList<Integer>(); private Lock lock = new ReentrantLock(); public static void main(String[] args) { final ReentrantLockDemo demo = new ReentrantLockDemo(); new Thread(() -> demo.insert(Thread.currentThread())).start(); new Thread(() -> demo.insert(Thread.currentThread())).start(); } private void insert(Thread thread) { lock.lock(); try { System.out.println(thread.getName() + "获得了锁"); for (int i = 0; i < 5; i++) { arrayList.add(i); } } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(thread.getName() + "释放了锁"); lock.unlock(); } } }
👉 更多示例
对于特定的资源,ReadWriteLock 容许多个线程同时对其执行读操做,可是只容许一个线程对其执行写操做。
ReadWriteLock 维护一对相关的锁。一个是读锁;一个是写锁。将读写锁分开,有利于提升并发效率。
ReentrantReadWriteLock 实现了 ReadWriteLock 接口,因此它是一个读写锁。
“读-读”线程之间不存在互斥关系。
“读-写”线程、“写-写”线程之间存在互斥关系。
public interface ReadWriteLock { /** * 返回用于读操做的锁 */ Lock readLock(); /** * 返回用于写操做的锁 */ Lock writeLock(); }
public class ReentrantReadWriteLockDemo { private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public static void main(String[] args) { final ReentrantReadWriteLockDemo demo = new ReentrantReadWriteLockDemo(); new Thread(() -> demo.get(Thread.currentThread())).start(); new Thread(() -> demo.get(Thread.currentThread())).start(); } public synchronized void get(Thread thread) { rwl.readLock().lock(); try { long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 1) { System.out.println(thread.getName() + "正在进行读操做"); } System.out.println(thread.getName() + "读操做完毕"); } finally { rwl.readLock().unlock(); } } }
AQS 做为构建锁或者其余同步组件的基础框架,有必要好好了解一下其原理。
做用:AQS,AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其余同步组件的基础框架(如 ReentrantLock、ReentrantReadWriteLock、Semaphore 等)。
场景:在 LOCK 包中的相关锁(经常使用的有 ReentrantLock、 ReadWriteLock)都是基于 AQS 来构建。然而这些锁都没有直接来继承 AQS,而是定义了一个 Sync 类去继承 AQS。那么为何要这样呢?because:锁面向的是使用用户,而同步器面向的则是线程控制,那么在锁的实现中聚合同步器而不是直接继承 AQS 就能够很好的隔离两者所关注的事情。
原理:AQS 在内部定义了一个 int 变量 state,用来表示同步状态。AQS 经过一个双向的 FIFO 同步队列来完成同步状态的管理,当有线程获取锁失败后,就被添加到队列末尾。
AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronize。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** 等待队列的队头,懒加载。只能经过 setHead 方法修改。 */ private transient volatile Node head; /** 等待队列的队尾,懒加载。只能经过 enq 方法添加新的等待节点。*/ private transient volatile Node tail; /** 同步状态 */ private volatile int state; }
AQS 维护了一个 Node 类型双链表,经过 head 和 tail 指针进行访问。
Node
static final class Node { /** 该等待同步的节点处于共享模式 */ static final Node SHARED = new Node(); /** 该等待同步的节点处于独占模式 */ static final Node EXCLUSIVE = null; /** 等待状态,这个和 state 是不同的:有 1,0,-1,-2,-3 五个值 */ volatile int waitStatus; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /** 前驱节点 */ volatile Node prev; /** 后继节点 */ volatile Node next; /** 等待锁的线程 */ volatile Thread thread; }
很显然,Node 是一个双链表结构。
waitStatus 5 个状态值的含义:
acquire
/** * 先调用 tryAcquire 查看同步状态。 * 若是成功获取同步状态,则结束方法,直接返回; * 反之,则先调用 addWaiter,再调用 acquireQueued。 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
addWaiter
addWaiter
方法的做用是将当前线程插入等待同步队列的队尾。
private Node addWaiter(Node mode) { // 1. 将当前线程构建成 Node 类型 Node node = new Node(Thread.currentThread(), mode); // 2. 判断尾指针是否为 null Node pred = tail; if (pred != null) { // 2.2 将当前节点插入队列尾部 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 2.1. 尾指针为 null,说明当前节点是第一个加入队列的节点 enq(node); return node; }
enq
enq
方法的做用是经过自旋(死循环),不断尝试利用 CAS 操做将节点插入队列尾部,直到成功为止。
private Node enq(final Node node) { // 设置死循环,是为了避免断尝试 CAS 操做,直到成功为止 for (;;) { Node t = tail; if (t == null) { // 1. 构造头结点(必须初始化,须要领会双链表的精髓) if (compareAndSetHead(new Node())) tail = head; } else { // 2. 经过 CAS 操做将节点插入队列尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued
acquireQueued
方法的做用是经过自旋(死循环),不断尝试为等待队列中线程获取独占锁。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 1. 得到当前节点的上一个节点 final Node p = node.predecessor(); // 2. 当前节点可否获取独占式锁 // 2.1 若是当前节点是队列中第一个节点,而且成功获取同步状态,便可以得到独占式锁 // 说明:当前节点的上一个节点是头指针,即意味着当前节点是队列中第一个节点。 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 2.2 获取锁失败,线程进入等待状态等待获取独占式锁 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued Before
setHead
方法
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
将当前节点经过 setHead 方法设置为队列的头结点,而后将以前的头结点的 next 域设置为 null,而且 pre 域也为 null,即与队列断开,无任何引用方便 GC 时可以将内存进行回收。
shouldParkAfterFailedAcquire
shouldParkAfterFailedAcquire
方法的做用是使用 compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
将节点状态由 INITIAL 设置成 SIGNAL,表示当前线程阻塞。
当 compareAndSetWaitStatus 设置失败,则说明 shouldParkAfterFailedAcquire 方法返回 false,从新进入外部方法 acquireQueued。因为 acquireQueued 方法中是死循环,会再一次执行 shouldParkAfterFailedAcquire,直至 compareAndSetWaitStatus 设置节点状态位为 SIGNAL。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt
parkAndCheckInterrupt
方法的做用是调用 LookSupport.park
方法,该方法是用来阻塞当前线程的。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
acquire 流程
综上所述,就是 acquire 的完整流程。能够以一幅图来讲明:
release
release 方法以独占模式发布。若是 tryRelease 返回 true,则经过解锁一个或多个线程来实现。这个方法能够用来实现 Lock.unlock 方法。
public final boolean release(int arg) { // 判断同步状态释放是否成功 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
unparkSuccessor
unparkSuccessor 方法做用是唤醒 node 的下一个节点。
头指针的后继节点
private void unparkSuccessor(Node node) { /* * 若是状态为负值(便可能须要信号),请尝试清除信号。 * 若是失败或状态因为等待线程而改变也是正常的。 */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /** * 释放后继节点的线程。 * 若是状态为 CANCELLED 放或节点明显为空, * 则从尾部向后遍历以找到状态不是 CANCELLED 的后继节点。 */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 后继节点不为 null 时唤醒该线程 if (s != null) LockSupport.unpark(s.thread); }
总结
acquireInterruptibly
Lock 能响应中断,这是相较于 synchronized 的一个显著优势。
那么 Lock 响应中断的特性是如何实现的?答案就在 acquireInterruptibly 方法中。
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) // 线程获取锁失败 doAcquireInterruptibly(arg); }
doAcquireInterruptibly
获取同步状态失败后就会调用 doAcquireInterruptibly 方法
private void doAcquireInterruptibly(int arg) throws InterruptedException { // 将节点插入到同步队列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // 获取锁出队 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 线程中断抛异常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
与 acquire 方法逻辑几乎一致,惟一的区别是当 parkAndCheckInterrupt 返回 true 时(即线程阻塞时该线程被中断),代码抛出被中断异常。
tryAcquireNanos
经过调用 lock.tryLock(timeout,TimeUnit) 方式达到超时等待获取锁的效果,该方法会在三种状况下才会返回:
咱们仍然经过采起阅读源码的方式来学习底层具体是怎么实现的,该方法会调用 AQS 的方法 tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || // 实现超时等待的效果 doAcquireNanos(arg, nanosTimeout); }
doAcquireNanos
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 1. 根据超时时间和当前时间计算出截止时间 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // 2. 当前线程得到锁出队列 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 3.1 从新计算超时时间 nanosTimeout = deadline - System.nanoTime(); // 3.2 超时返回 false if (nanosTimeout <= 0L) return false; // 3.3 线程阻塞等待 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 3.4 线程被中断抛出被中断异常 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
acquireShared
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
尝试获取共享锁失败,调用 doAcquireShared
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 当该节点的前驱节点是头结点且成功获取同步状态 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
以上代码和 acquireQueued 的代码逻辑十分类似,区别仅在于自旋的条件以及节点出队的操做有所不一样。
releaseShared
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
doReleaseShared
当成功释放同步状态以后即 tryReleaseShared 会继续执行 doReleaseShared 方法
发送后继信号并确保传播。 (注意:对于独占模式,若是须要信号,释放就至关于调用头的 unparkSuccessor。)
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 若是 CAS 失败,继续自旋 continue; } // 若是头指针变化,break if (h == head) break; } }
acquireSharedInterruptibly 方法与 acquireInterruptibly 几乎一致,再也不赘述。
tryAcquireSharedNanos 方法与 tryAcquireNanos 几乎一致,再也不赘述。
免费Java资料须要本身领取,涵盖了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高并发分布式等教程。