java大牛带你一次清楚,线程并发包J.U.C和AQS的原理

1、J.U.C简介 

Java.util.concurrent 是在并发编程中比较经常使用的工具类。  java

1.Lock node

Lock是JUC包中最重要的组件,解决synchronized关键字在某些场景的短板。  编程

eg.锁修饰的代码块内,调用了同个锁修饰的代码块,锁对象相同,这时候第一个得到锁的代码还没释放,后面又有等待获取锁的代码,就造成死锁状态  bash

2.Lock并发

实现 Lock本质是一个接口,定义了获取和释放锁的抽象方法。定义了锁的一个标准规范。如下是主要实现类: app

ReentrantLock,重入锁,实现了Lock接口。当线程得到锁后,再次获取该锁不须要线程阻塞,只需增长重入次数便可 ide

ReentrantReadWriteLock,读写锁,实现了ReadWriteLock接口。该类维护了两个锁,ReadLock和WriteLock,它们分别实现了Lock函数

接口。适合读多写少的场景。 工具

原则:读和读不互斥、读和写互斥、写和写互斥 源码分析

3.Lock继承关系图 

eg.UML图 

4.重入锁 

设计目的:解决死锁问题 

eg.死锁示例 

public class App {
 
    public synchronized void demo(){ // main得到对象锁
        System.out.println("demo");
        demo2();
    }
    public void demo2(){
        synchronized (this) {
            System.out.println("demo2");
        }
    }
 
    public static void main(String[] args) {
        App app=new App();
        app.demo();
    }
}复制代码

eg.ReentrantLock使用示例 

public class AtomicDemo {
    private static int count=0;
    static Lock lock=new ReentrantLock();
    public static void inc(){
        lock.lock();
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        count++;
        lock.unlock();
    }
    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<1000;i++){
            new Thread(()->{AtomicDemo.inc();}).start();;
        }
        Thread.sleep(3000);
        System.out.println("result:"+count);
    }
}复制代码

eg.ReentrantReadWriteLock使用示例 

public class RWLock {
 
    static ReentrantReadWriteLock wrl = new ReentrantReadWriteLock();
    static Map<String,Object> cacheMap = new HashMap<>();
    static Lock read = wrl.readLock();
    static Lock write = wrl.writeLock();
 
    // 线程B/C/D
    public static final Object get(String key){
        System.out.println("begin read data:" + key);
        read.lock(); // 得到读锁-> 阻塞
        try {
            return cacheMap.get(key);
        }finally {
            read.unlock();
        }
    }
    //线程A
    public static final Object put(String key, Object val){
        write.lock(); // 得到了写锁
        try{
            return cacheMap.put(key,val);
        }finally {
            write.unlock();
        }
    }
 
    public static void main(String[] args) {
        wrl.readLock(); // B线程 ->阻塞
 
        wrl.writeLock(); // A线程
 
        // 读->读是能够共享
        // 读->写 互斥
        // 写->写 互斥
        // 读多写少的场景
    }
 
}
复制代码

2、ReentrantLock实现原理 

1.AQS 

在 Lock 中,用到了一个同步队列 AQS,全称 AbstractQueuedSynchronizer,它是一个同步工具也是 Lock 用来实现线程同步的核心组件。 

2.AQS两种功能 

AQS 的功能分为两种:独占和共享 

独占锁,重入锁ReentrantLock 

共享锁,读写锁ReentrantReadWriteLock 

3.AQS内部实现 

AQS队列内部维护的是一个FIFO双向链表,每一个节点都是双向的,分别指向直接的后继节点和前驱节点。 

每一个Node由线程封装,当线程争抢锁失败后会封装成Node加入AQS队列;当获取锁的线程释放了,会从队列中唤醒一个阻塞的节点(线程)。 

eg.结构 


Node组成 

static final class Node {
	static final Node SHARED = new Node();
	static final Node EXCLUSIVE = null;
	static final int CANCELLED =  1;
	static final int SIGNAL    = -1;
	static final int CONDITION = -2;
	static final int PROPAGATE = -3;
	volatile int waitStatus;
	volatile Node prev; // 前驱节点
	volatile Node next; // 后继节点
	volatile Thread thread; // 当前线程
	Node nextWaiter; // 存储在condition队列中的后继节点
 
	// 是否为共享锁
	final boolean isShared() {
		return nextWaiter == SHARED;
	}
 
	final Node predecessor() throws NullPointerException {
		Node p = prev;
		if (p == null)
			throw new NullPointerException();
		else
			return p;
	}
 
	Node() {    // Used to establish initial head or SHARED marker
	}
 
	// 将线程构形成一个node,添加到等待队列
	Node(Thread thread, Node mode) {     // Used by addWaiter
		this.nextWaiter = mode;
		this.thread = thread;
	}
 
	// 在condition队列中使用
	Node(Thread thread, int waitStatus) { // Used by Condition
		this.waitStatus = waitStatus;
		this.thread = thread;
	}
}复制代码

eg.加入节点 ​ 

eg.释放节点  ​

4.源码分析 

eg.时序图


 CAS原理

protected final boolean compareAndSetState(int expect, int update) {
	// See below for intrinsics setup to support this
	return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}复制代码

经过 cas 乐观锁的方式来作比较并替换。若是当前内存中的 state 的值和预期值 expect 相等,则替换为 update。更新成功返回 true,不然返回 false。

state 是 AQS 中的一个属性,它在不一样的实现中所表达的含义不同,对于重入锁的实现来讲,表示一个同步状态。它有两个含义:

1. 当 state=0 时,表示无锁状态

2. 当 state>0 时,表示已经有线程得到了锁,它的大小表示重入次数

Unsafe类

Unsafe类,不在java体系中,在sun.misc包中。

stateoff,表示该字段相对该类在内存中地址的偏移量,经过该偏移量找到对应字段。一个java对象可当作是一段内存,每一个字段都按照必定顺序放在这段内存。

Node状态

Node 有5种状态,分别是:CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、默认状态(0)

CANCELLED:在同步队列中等待的线程等待超时或被中断,须要从同步队列中取消该 Node 的结点, 其结点的 waitStatus 为 CANCELLED,即结束状态,进入该状态后的结点将不会再变化。

SIGNAL:只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程。

CONDITION: 和 Condition 有关系。

PROPAGATE:共享模式下,PROPAGATE 状态的线程处于可运行状态。

默认状态0:初始状态。

经过 Node 的状态来判断,ThreadA 竞争锁失败之后是否应该被挂起。若是ThreadA 的 pred节点状态为 SIGNAL,那就表示能够放心挂起当前线程。

LockSupport类

LockSupport类是Java6引入的一个类,提供了基本的线程同步原语。LockSupport其实是调用了 Unsafe 类里的函数。

public native void unpark(Object var1);
public native void park(boolean var1, long var2);复制代码

5.公平锁和非公平锁

AQS 是一个同步队列,它可以实现线程的阻塞以及唤醒,但它并不具有业务功能,因此在不一样的同步场景中,会继承 AQS 来实现对应场景的功能。

Sync继承了AQS,Sync有两个具体实现类:

NoFairSync,非公平抢占锁,无论当前队列是否有等待线程,都有机会得到锁。

FairSync,严格按照FIFO得到锁。

3、Condition类

关键方法:await()和signal()/signalAll()

当AwaitThread线程得到lock锁后,调用await方法,释放掉当前使用的锁,将自身封装成Node节点,Condition状态加入等待队列 SignalThread线程竞争锁,当调用signal方法时,将Condition等待队列中的线程转移到AQS队列中,当SignalThread调用unlock释放当前锁后,AwaitThread抢占锁,若还没释放,则经过自旋加入AQS队列。

eg.使用示例

public class ConditionWait implements Runnable{
 
    private Lock lock;
    private Condition condition;
 
    public ConditionWait(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }
    @Override
    public void run() {
        try {
            lock.lock(); // 竞争锁
            try {
                System.out.println("begin - ConditionWait");
                condition.await(); // 阻塞(1. 释放锁, 2.阻塞当前线程, FIFO(单向、双向))
                System.out.println("end - ConditionWait");
 
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }finally {
            lock.unlock(); // 释放锁
        }
 
    }
 
}
复制代码


public class ConditionNotify implements Runnable{
 
    private Lock lock;
    private Condition condition;
 
    public ConditionNotify(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }
 
    @Override
    public void run() {
        try{
            lock.lock(); // 得到了锁
            System.out.println("begin - conditionNotify");
            condition.signal(); // 唤醒阻塞状态的线程
            System.out.println("end - conditionNotify");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock(); // 释放锁
        }
    }
}复制代码

 

public class App {
    public static void main(String[] args) {
        Lock lock=new ReentrantLock(); // 重入锁
        Condition condition=lock.newCondition();
        lock.newCondition();
        new Thread(new ConditionWait(lock, condition)).start(); // 阻塞await
        new Thread(new ConditionNotify(lock, condition)).start();
    }
} 复制代码
相关文章
相关标签/搜索