高并发基础

转载自并发编程网 – ifeve.com本文连接地址: 高并发编程必备基础

1、前言

借用Java并发编程实践中的话”编写正确的程序并不容易,而编写正常的并发程序就更难了”,相比于顺序执行的状况,多线程的线程安全问题是微妙并且出乎意料的,由于在没有进行适当同步的状况下多线程中各个操做的顺序是不可预期的,本文算是对多线程状况下同步策略的一个简单介绍。java

 

2、 什么是线程安全问题

线程安全问题是指当多个线程同时读写一个状态变量,而且没有任何同步措施时候,致使脏数据或者其余不可预见的结果的问题。Java中首要的同步策略是使用Synchronized关键字,它提供了可重入的独占锁。node

3、 什么是共享变量可见性问题

要谈可见性首先须要介绍下多线程处理共享变量时候的Java中内存模型。算法

image.png

Java内存模型规定了全部的变量都存放在主内存中,当线程使用变量时候都是把主内存里面的变量拷贝到了本身的工做空间或者叫作工做内存。数据库

 

image.png

 如图是双核CPU系统架构,每核有本身的控制器和运算器,其中控制器包含一组寄存器和操做控制器,运算器执行算术逻辑运算,而且有本身的一级缓存,而且有些架构里面双核还有个共享的二级缓存。对应Java内存模型里面的工做内存,在实现上这里是指L1或者L2缓存或者本身cpu的寄存器。

 

当线程操做一个共享变量时候操做流程为:编程

  • 线程首先从主内存拷贝共享变量到本身的工做空间
  • 而后对工做空间里的变量进行处理
  • 处理完后更新变量值到主内存

那么假如线程A和B同时去处理一个共享变量,会出现什么状况那?
首先他们都会去走上面的三个流程,假如线程A拷贝共享变量到了工做内存,而且已经对数据进行了更新可是尚未更新会主内存(结果可能目前存放在当前cpu的寄存器或者高速缓存),这时候线程B拷贝共享变量到了本身的工做内存进行处理,处理后,线程A才把本身的处理结果更更新到主内存或者缓存,可知 线程B处理的并非线程A处理后的结果,也就是说线程A处理后的变量值对线程B不可见,这就是共享变量的不可见性问题。缓存

构成共享变量内存不可见缘由是由于三步流程不是原子性操做,下面知道使用恰当同步就能够解决这个问题。安全

咱们知道ArrayList是线程不安全的,由于他的读写方法没有同步策略,会致使脏数据和不可预期的结果,下面咱们就一一讲解如何解决。多线程

这是线程不安全的
public class ArrayList<E> { public E get(int index) { rangeCheck(index); return elementData(index); } public E set(int index, E element) { rangeCheck(index); E oldValue = elementData(index); elementData[index] = element; return oldValue; } } 

4、原子性

4.1 介绍

假设线程A执行操做Ao和线程B执行操做Bo ,那么从A看,当B线程执行Bo操做时候,那么Bo操做所有执行,要么所有不执行,咱们称Ao和Bo操做互为原子性操做,在设计计数器时候通常都是先读取当前值,而后+1,而后更新会变量,是读-改-写的过程,这个过程必须是原子性的操做。架构

public class ThreadNotSafeCount { private Long value; public Long getCount() { return value; } public void inc() { ++value; } } 

如上代码是线程不安全的,由于不能保证++value是原子性操做。方法一是使用Synchronized进行同步以下:并发

public class ThreadSafeCount { private Long value; public synchronized Long getCount() { return value; } public synchronized void inc() { ++value; } } 

注意,这里不能简单的使用volatile修饰value进行同步,由于变量值依赖了当前值

使用Synchronized确实能够实现线程安全,即实现可见性和同步,可是Synchronized是独占锁,没有获取内部锁的线程会被阻塞掉,那么有没有恰好的实现那?答案是确定的。

4.2 原子变量类

原子变量类比锁更轻巧,好比AtomicLong表明了一个Long值,并提供了get,set方法,get,set方法语义和volatile相同,由于AtomicLong内部就是使用了volatile修饰的真正的Long变量。另外提供了原子性的自增自减操做,因此计数器能够改下为:

public class ThreadSafeCount { private AtomicLong value = new AtomicLong(0L); public Long getCount() { return value.get(); } public void inc() { value.incrementAndGet(); } } 

那么相比使用synchronized的好处在于原子类操做不会致使线程的挂起和从新调度,由于他内部使用的是cas的非阻塞算法。

经常使用的原子类变量为:AtomicLong,AtomicInteger,AtomicBoolean,AtomicReference.

五 CAS介绍

CAS 即CompareAndSet,也就是比较并设置,CAS有三个操做数分别为:内存位置,旧的预期值,新的值,操做含义是当内存位置的变量值为旧的预期值时候使用新的值替换旧的值。通俗的说就是看内存位置的变量值是否是我给的旧的预期值,若是是则使用我给的新的值替换他,若是不是返回给我旧值。这个是处理器提供的一个原子性指令。上面介绍的AtomicLong的自增就是使用这种方式实现:

public final long incrementAndGet() { for (;;) { long current = get();(1) long next = current + 1;(2) if (compareAndSet(current, next))(3) return next; } } public final boolean compareAndSet(long expect, long update) { return unsafe.compareAndSwapLong(this, valueOffset, expect, update); } 

假如当前值为1,那么线程A和检查B同时执行到了(3)时候各自的next都是2,current=1,假如线程A先执行了3,那么这个是原子性操做,会把档期值更新为2而且返回1,if判断true因此incrementAndGet返回2.这时候线程B执行3,由于current=1而当前变量实际值为2,因此if判断为false,继续循环,若是没有其余线程去自增变量的话,此次线程B就会更新变量为3而后退出。

这里使用了无限循环使用CAS进行轮询检查,虽然必定程度浪费了cpu资源,可是相比锁来讲避免的线程上下文切换和调度。

6、什么是可重入锁

当一个线程要获取一个被其余线程占用的锁时候,该线程会被阻塞,那么当一个线程再次获取它本身已经获取的锁时候是否会被阻塞那?若是不须要阻塞那么咱们说该锁是可重入锁,也就是说只要该线程获取了该锁,那么能够无限制次数进入被该锁锁住的代码。

先看一个例子若是锁不是可重入的,看看会出现什么问题。

public class Hello{ public Synchronized void helloA(){ System.out.println("hello"); } public Synchronized void helloB(){ System.out.println("hello B"); helloA(); } } 

如上面代码当调用helloB函数前会先获取内置锁,而后打印输出,而后调用helloA方法,调用前会先去获取内置锁,若是内置锁不是可重入的那么该调用就会致使死锁了,由于线程持有并等待了锁。

实际上内部锁是可重入锁,例如synchronized关键字管理的方法,可重入锁的原理是在锁内部维护了一个线程标示,标示该锁目前被那个线程占用,而后关联一个计数器,一开始计数器值为0,说明该锁没有被任何线程占用,当一个线程获取了该锁,计数器会变成1,其余线程在获取该锁时候发现锁的全部者不是本身因此被阻塞,可是当获取该锁的线程再次获取锁时候发现锁拥有者是本身会把计数器值+1, 当释放锁后计数器会-1,当计数器为0时候,锁里面的线程标示重置为null,这时候阻塞的线程会获取被唤醒来获取该锁。

7、Synchronized关键字

7.1 Synchronized介绍

synchronized块是Java提供的一种强制性内置锁,每一个Java对象均可以隐式的充当一个用于同步的锁的功能,这些内置的锁被称为内部锁或者叫监视器锁,执行代码在进入synchronized代码块前会自动获取内部锁,这时候其余线程访问该同步代码块时候会阻塞掉。拿到内部锁的线程会在正常退出同步代码块或者异常抛出后释放内部锁,这时候阻塞掉的线程才能获取内部锁进入同步代码块。

7.2 Synchronized同步实例

内部锁是一种互斥锁,具体说是同时只有一个线程能够拿到该锁,当一个线程拿到该锁而且没有释放的状况下,其余线程只能等待。

对于上面说的ArrayList可使用synchronized进行同步来处理可见性问题。

使用synchronized对方法进行同步 public class ArrayList<E> { public synchronized E get(int index) { rangeCheck(index); return elementData(index); } public synchronized E set(int index, E element) { rangeCheck(index); E oldValue = elementData(index); elementData[index] = element; return oldValue; } } 

image.png

如图当线程A获取内部锁进入同步代码块后,线程B也准备要进入同步块,可是因为A还没释放锁,因此B如今进入等待,使用同步能够保证线程A获取锁到释放锁期间的变量值对B获取锁后均可见。也就是说当B开始执行A执行的代码同步块时候能够看到A操做的全部变量值,这里具体说是当线程B获取b的值时候可以保证获取的值是2。这时由于线程A进入同步块修改变量值后,会在退出同步块前把值刷新到主内存,而线程B在进入同步块前会首先清空本地内存内容,从主内存从新获取变量值,因此实现了可见性。可是要注意一点全部线程使用的是同一个锁。

注意 Synchronized关键字会引发线程上下文切换和线程调度

8、 ReentrantReadWriteLock介绍

使用synchronized能够实现同步,可是缺点是同时只有一个线程能够访问共享变量,可是正常状况下,对于多个读操做操做共享变量时候是不须要同步的,synchronized时候没法实现多个读线程同时执行,而大部分状况下读操做次数多于写操做,因此这大大下降了并发性,因此出现了ReentrantReadWriteLock,它能够实现读写分离,多个线程同时进行读取,可是最多一个写线程存在。

对于上面的方法如今能够修改成:

public class ArrayList<E> { private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public E get(int index) { Lock readLock = readWriteLock.readLock(); readLock.lock(); try { return list.get(index); } finally { readLock.unlock(); } } public E set(int index, E element) { Lock wirteLock = readWriteLock.writeLock(); wirteLock.lock(); try { return list.set(index, element); } finally { wirteLock.unlock(); } } } 

如代码在get方法时候经过 readWriteLock.readLock()获取了读锁,多个线程能够同时获取这读锁,set方法经过readWriteLock.writeLock()获取了写锁,同时只有一个线程能够获取写锁,其余线程在获取写锁时候会阻塞直到写锁被释放。假如一个线程已经获取了读锁,这时候若是一个线程要获取写锁时候要等待直到释放了读锁,若是一个线程获取了写锁,那么全部获取读锁的线程须要等待直到写锁被释放。因此相比synchronized来讲运行多个读者同时存在,因此提升了并发量。
注意 须要使用者显示调用Lock与unlock操做

9、 Volatile变量

对于避免不可见性问题,Java还提供了一种弱形式的同步,即便用了volatile关键字。该关键字确保了对一个变量的更新对其余线程可见。当一个变量被声明为volatile时候,线程写入时候不会把值缓存在寄存器或者或者在其余地方,当线程读取的时候会从主内存从新获取最新值,而不是使用当前线程的拷贝内存变量值。

volatile虽然提供了可见性保证,可是不能使用他来构建复合的原子性操做,也就是说当一个变量依赖其余变量或者更新变量值时候新值依赖当前老值时候不在适用。与synchronized类似之处在于如图

image.png

如图线程A修改了volatile变量b的值,而后线程B读取了改变量值,那么全部A线程在写入变量b值前可见的变量值,在B读取volatile变量b后对线程B都是可见的,图中线程B对A操做的变量a,b的值均可见的。volatile的内存语义和synchronized有相似之处,具体说是说当线程写入了volatile变量值就等价于线程退出synchronized同步块(会把写入到本地内存的变量值同步到主内存),读取volatile变量值就至关于进入同步块(会先清空本地内存变量值,从主内存获取最新值)。

下面的Integer也是线程不安全的,由于没有进行同步措施

public class ThreadNotSafeInteger { private int value; public int get() { return value; } public void set(int value) { this.value = value; } } 

使用synchronized关键字进行同步以下:

public class ThreadSafeInteger { private int value; public synchronized int get() { return value; } public synchronized void set(int value) { this.value = value; } } 

等价于使用volatile进行同步以下:

public class ThreadSafeInteger { private volatile int value; public int get() { return value; } public void set(int value) { this.value = value; } } 

这里使用synchronized和使用volatile是等价的,可是并非全部状况下都是等价,通常只有知足下面全部条件才能使用volatile

  • 写入变量值时候不依赖变量的当前值,或者可以保证只有一个线程修改变量值。
  • 写入的变量值不依赖其余变量的参与。
  • 读取变量值时候不能由于其余缘由进行枷锁。

另外 加锁能够同时保证可见性和原子性,而volatile只保证变量值的可见性。

注意 volatile关键字不会引发线程上下文切换和线程调度。另外volatile还用来解决重排序问题,后面会讲到。

10、 乐观锁与悲观锁

10.1 悲观锁

悲观锁,指数据被外界修改持保守态度(悲观),在整个数据处理过程当中,将数据处于锁定状态。 悲观锁的实现,每每依靠数据库提供的锁机制 。数据库中实现是对数据记录进行操做前,先给记录加排它锁,若是获取锁失败,则说明数据正在被其余线程修改,则等待或者抛出异常。若是加锁成功,则获取记录,对其修改,而后事务提交后释放排它锁。
一个例子:select * from 表 where .. for update;

悲观锁是先加锁再访问策略,处理加锁会让数据库产生额外的开销,还有增长产生死锁的机会,另外在多个线程只读状况下不会产生数据不一致行问题,不必使用锁,只会增长系统负载,下降并发性,由于当一个事务锁定了该条记录,其余读该记录的事务只能等待。

10.2 乐观锁

乐观锁是相对悲观锁来讲的,它认为数据通常状况下不会形成冲突,因此在访问记录前不会加排他锁,而是在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,具体说根据update返回的行数让用户决定如何去作。乐观锁并不会使用数据库提供的锁机制,通常在表添加version字段或者使用业务状态来作。
具体能够参考:https://www.atatech.org/articles/79240

乐观锁直到提交的时候才去锁定,因此不会产生任何锁和死锁。

11、独占锁与共享锁

根据锁可以被单个线程仍是多个线程共同持有,锁又分为独占锁和共享锁。独占锁保证任什么时候候都只有一个线程能读写权限,ReentrantLock就是以独占方式实现的互斥锁。共享锁则能够同时有多个读线程,但最多只能有一个写线程,读和写是互斥的,例如ReadWriteLock读写锁,它容许一个资源能够被多线程同时进行读操做,或者被一个线程 写操做,但二者不能同时进行。

独占锁是一种悲观锁,每次访问资源都先加上互斥锁,这限制了并发性,由于读操做并不会影响数据一致性,而独占锁只容许同时一个线程读取数据,其余线程必须等待当前线程释放锁才能进行读取。

共享锁则是一种乐观锁,它放宽了加锁的条件,容许多个线程同时进行读操做。

12、公平锁与非公平锁

根据线程获取锁的抢占机制锁能够分为公平锁和非公平锁,公平锁表示线程获取锁的顺序是按照线程加锁的时间多少来决定的,也就是最先加锁的线程将最先获取锁,也就是先来先得的FIFO顺序。而非公平锁则运行闯入,也就是先来不必定先得。

ReentrantLock提供了公平和非公平锁的实现:
公平锁ReentrantLock pairLock = new ReentrantLock(true);
非公平锁 ReentrantLock pairLock = new ReentrantLock(false);
若是构造函数不传递参数,则默认是非公平锁。

在没有公平性需求的前提下尽可能使用非公平锁,由于公平锁会带来性能开销。
假设线程A已经持有了锁,这时候线程B请求该锁将会被挂起,当线程A释放锁后,假如当前有线程C也须要获取该锁,若是采用非公平锁方式,则根据线程调度策略线程B和C二者之一可能获取锁,这时候不须要任何其余干涉,若是使用公平锁则须要把C挂起,让B获取当前锁。

十3、 AbstractQueuedSynchronizer介绍

AbstractQueuedSynchronizer提供了一个队列,大多数开发者可能历来不会直接用到AQS,AQS有个变量用来存放状态信息 state,能够经过protected的getState,setState,compareAndSetState函数进行调用。对于ReentrantLock来讲,state能够用来表示该线程获可重入锁的次数,semaphore来讲state用来表示当前可用信号的个数,FutuerTask用来表示任务状态(例如还没开始,运行,完成,取消)。

十4、CountDownLatch原理

14.1 一个例子

public class Test { private static final int ThreadNum = 10; public static void main(String[] args) { //建立一个CountDownLatch实例,管理计数为ThreadNum CountDownLatch countDownLatch = new CountDownLatch(ThreadNum); //建立一个固定大小的线程池 ExecutorService executor = Executors.newFixedThreadPool(ThreadNum); //添加线程到线程池 for(int i =0;i<ThreadNum;++i){ executor.execute(new Person(countDownLatch, i+1)); } System.out.println("开始等待全员签到..."); try { //等待全部线程执行完毕 countDownLatch.await(); System.out.println("签到完毕,开始吃饭"); } catch (InterruptedException e) { e.printStackTrace(); }finally { executor.shutdown(); } } static class Person implements Runnable{ private CountDownLatch countDownLatch; private int index; public Person(CountDownLatch cdl,int index){ this.countDownLatch = cdl; this.index = index; } @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("person " + index +"签到"); //线程执行完毕,计数器减一 countDownLatch.countDown(); } } } 

如上代码,建立一个线程池和CountDownLatch实例,每一个线程经过构造函数传入CountDownLatch的实例,主线程经过await等待线程池里面线程任务所有执行完毕,子线程则执行完毕后调用countDown计数器减一,等全部子线程执行完毕后,主线程的await才会返回。

14.2 原理

先看下类图:
image.png

可知CountDownLatch内部仍是使用AQS实现的。
首先经过构造函数初始化AQS的状态值

public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); } 

而后看下await方法:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //若是线程被中断则抛异常 if (Thread.interrupted()) throw new InterruptedException(); //尝试看当前是否计数值为0,为0则直接返回,否者进入队列等待 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } 

若是tryAcquireShared返回-1则 进入doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //加入队列状态为共享节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //若是多个线程调用了await被放入队列则一个个返回。 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //shouldParkAfterFailedAcquire会把当前节点状态变为SIGNAL类型,而后调用park方法把当先线程挂起, if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 

调用await后,当前线程会被阻塞,直到全部子线程调用了countdown方法,并在计数为0时候调用该线程unpark方法激活线程,而后该线程从新tryAcquireShared会返回1。

而后看下 countDown方法:

委托给sync public void countDown() { sync.releaseShared(1); } 
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } 

首先看下tryReleaseShared

protected boolean tryReleaseShared(int releases) { //循环进行cas,直到当前线程成功完成cas使计数值(状态值state)减一更新到state for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } 

该函数一直返回false直到当前计数器为0时候才返回true。
返回true后会调用doReleaseShared,该函数主要做用是调用uppark方法激活调用await的线程,代码以下:

private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //节点类型为SIGNAL,把类型在经过cas设置回去,而后调用unpark激活调用await的线程 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } 

激活主线程后,主线程会在调用tryAcquireShared获取锁。

十5、ReentrantLock独占锁原理

15.1 ReentrantLock结构

先上类图:

image.png

可知ReentrantLock最终仍是使用AQS来实现,而且根据参数决定内部是公平仍是非公平锁,默认是非公平锁

public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } 

加锁代码:

public void lock() { sync.lock(); } 

15.2 公平锁原理

先看Lock方法:
lock方法最终调用FairSync重写的tryAcquire方法

protected final boolean tryAcquire(int acquires) { //获取当前线程和状态值 final Thread current = Thread.currentThread(); int c = getState(); //状态为0说明该锁未被任何线程持有 if (c == 0) { //为了实现公平,首先看队列里面是否有节点,有的话再看节点所属线程是否是当前线程,是的话hasQueuedPredecessors返回false,而后使用原子操做compareAndSetState保证一个线程更新状态为1,设置排他锁归属为当前线程。其余线程经过cass则返回false. if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //状态不为0说明该锁已经被线程持有,则看是不是当前线程持有,是则重入锁次数+1. else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } 

公平性保证代码:

public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } 

再看看unLock方法,最终调用了Sync的tryRelease方法:

protected final boolean tryRelease(int releases) { //若是不是锁持有者调用UNlock则抛出异常。 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //若是当前可重入次数为0,则清空锁持有线程 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //设置可重入次数为原始值-1 setState(c); return free; } 

15.3 非公平锁原理

final void lock() { //若是当前锁空闲0,则设置状态为1,而且设置当前线程为锁持有者 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1);//调用重写的tryAcquire方法->nonfairTryAcquire方法 } 
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//状态为0说明没有线程持有该锁 if (compareAndSetState(0, acquires)) {//cas原子性操做,保证只有一个线程能够设置状态 setExclusiveOwnerThread(current);//设置锁全部者 return true; } }//若是当前线程是锁持有者则可重入锁计数+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 

15.3 总结

可知公平与非公平都是先执行tryAcquire尝试获取锁,若是成功则直接获取锁,若是不成功则把当前线程放入队列。对于放入队列里面的第一个线程A在unpark后会进行自旋调用tryAcquire尝试获取锁,假如这时候有一个线程B执行了lock操做,那么也会调用tryAcquire方法尝试获取锁,可是线程B并不在队列里面,可是线程B有可能比线程A优先获取到锁,也就是说虽然线程A先请求的锁,可是却有可能没有B先获取锁,这是非公平锁实现。而公平锁要保证线程A要比线程B先获取锁。因此公平锁相比非公平锁在tryAcquire里面添加了hasQueuedPredecessors方法用来保证公平性。

十6、ReentrantReadWriteLock原理

image.png

如图读写锁内部维护了一个ReadLock和WriteLock,而且也提供了公平和非公平的实现,下面只介绍下非公平的读写锁实现。咱们知道AQS里面只维护了一个state状态,而ReentrantReadWriteLock则须要维护读状态和写状态,一个state是没法表示写和读状态的。因此ReentrantReadWriteLock使用state的高16位表示读状态也就是读线程的个数,低16位表示写锁可重入量。

static final int SHARED_SHIFT   = 16;

共享锁(读锁)状态单位值65536 
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
共享锁线程最大个数65535
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

排它锁(写锁)掩码 二进制 15个1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** 返回读锁线程数  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** 返回写锁可重入个数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

16.1 WriteLock

  • lock 获取锁
    对应写锁只须要分析下Sync的tryAcquire和tryRelease
    “`java

protected final boolean tryAcquire(int acquires) {

Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        //c!=0说明读锁或者写锁已经被某线程获取
        if (c != 0) {
            //w=0说明已经有线程获取了读锁或者w!=0而且当前线程不是写锁拥有者,则返回false
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
           //说明某线程获取了写锁,判断可重入个数
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");

           // 设置可重入数量(1)
            setState(c + acquires);
            return true;
        }

       //写线程获取写锁
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }
- unlock 释放锁

```Java
        protected final boolean tryRelease(int releases) {
     // 看是不是写锁拥有者调用的unlock
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
//获取可重入值,这里没有考虑高16位,由于写锁时候读锁状态值确定为0
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
       //若是写锁可重入值为0则释放锁,否者只是简单更新状态值。
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

16.2 ReadLock

对应读锁只须要分析下Sync的tryAcquireShared和tryReleaseShared

  • lock 获取锁

    protected final int tryAcquireShared(int unused) { //获取当前状态值 Thread current = Thread.currentThread(); int c = getState(); //若是写锁计数不为0说明已经有线程获取了写锁,而后看是否是当前线程获取的写锁。 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //获取读锁计数 int r = sharedCount(c); //尝试获取锁,多个读线程只有一个会成功,不成功的进入下面fullTryAcquireShared进行重试 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } 
  • unlock 释放锁

    protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //循环直到本身的读计数-1 cas更新成功 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } } 

十7、 什么是重排序问题

Java内存模型中,容许编译器和处理器对指令进行重排序,可是重排序能够保证最终执行的结果是与程序顺序执行的结果一致,而且只会对不存在数据依赖性的指令进行重排序,这个重排序在单线程下对最终执行结果是没有影响的,可是在多线程下就会存在问题。

  • 一个例子

    int a = 1;(1) int b = 2;(2) int c= a + b;(3) 

    如上c的值依赖a和b的值,因此重排序后可以保证(3)的操做在(2)(1)以后,可是(1)(2)谁先执行就不必定了,这在单线程下不会存在问题,由于并不影响最终结果。

  • 一个多线程例子

public static class ReadThread extends Thread { public void run() { while(!Thread.currentThread().isInterrupted()){ if(ready){(1) System.out.println(num+num);(2) } System.out.println("read thread...."); } } } public static class Writethread extends Thread { public void run() { num = 2;(3) ready = true;(4) System.out.println("writeThread set over..."); } } private static int num =0; private static boolean ready = false; public static void main(String[] args) throws InterruptedException { ReadThread rt = new ReadThread(); rt.start(); Writethread wt = new Writethread(); wt.start(); Thread.sleep(10); rt.interrupt(); System.out.println("main exit"); } 

如代码因为(1)(2)(3)(4) 之间不存在依赖,因此写线程(3)(4)可能被重排序为先执行(4)在执行(3),那么执行(4)后,读线程可能已经执行了(1)操做,而且在(3)执行前开始执行(2)操做,这时候打印结果为0而不是4.

解决:使用volatile 修饰ready能够避免重排序。

十8、 什么是中断

Java中断机制是一种线程间协做模式,经过中断并不能直接终止另外一个线程,而是须要被中断的线程根据中断状态自行处理。

例如当线程A运行时,线程B能够调用A的 interrupt()方法来设置中断标志为true,并当即返回。设置标志仅仅是设置标志,线程A并无实际被中断,会继续往下执行的,而后线程A能够调用isInterrupted方法来看本身是否是被中断了,返回true说明本身被别的线程中断了,而后根据状态来决定是否终止本身活或者干些其余事情。

Interrupted经典使用代码

public void run(){ try{ .... //线程退出条件 while(!Thread.currentThread().isInterrupted()&& more work to do){ // do more work; } }catch(InterruptedException e){ // thread was interrupted during sleep or wait } finally{ // cleanup, if required } } 

使用场景:

  • 故意调用interrupt()设置中断标志,做为线程退出条件
public static class MyThread extends Thread { public void run() { while (!Thread.currentThread().isInterrupted()) { System.out.println("do Someing...."); } } } public static void main(String[] args) throws InterruptedException { MyThread t = new MyThread(); t.start(); Thread.sleep(1000); t.interrupt(); } 
  • 当线程中为了等待一些特定条件的到来时候,通常会调用Thread.sleep(),wait,join方法在阻塞当前线程,好比sleep(3000);那么到3s后才会从阻塞下变为激活状态,可是有可能在在3s内条件已经知足了,这时候能够调用该线程的interrupt方法,sleep方法会抛出InterruptedException异常,线程恢复激活状态。
public static class SleepInterrupt extends Object implements Runnable{ public void run(){ try{ System.out.println("thread-sleep for 2000 seconds"); Thread.sleep(2000000); System.out.println("thread -waked up"); }catch(InterruptedException e){ System.out.println("thread-interrupted while sleeping"); return; } System.out.println("thread-leaving normally"); } } public static void main(String[] args) throws InterruptedException { SleepInterrupt si = new SleepInterrupt(); Thread t = new Thread(si); t.start(); //主线程休眠2秒,从而确保刚才启动的线程有机会执行一段时间 try { Thread.sleep(2000); }catch(InterruptedException e){ e.printStackTrace(); } System.out.println("main() - interrupting other thread"); //中断线程t t.interrupt(); System.out.println("main() - leaving"); } 

InterruptedException的处理
若是抛出 InterruptedException那么就意味着抛出异常的方法是阻塞方法,好比Thread.sleep,wait,join。
那么接受到异常后如何处理的,醉简单的是直接catch掉,不作任何处理,可是中断发生通常是为了取消任务或者退出线程来使用的,因此若是直接catch掉那么就会失去作这些处理的时机,出发你能肯定不须要根据中断条件作其余事情。

  • 第一种方式 catch后作一些清理工做,而后在throw出去
  • 第二种方式 catch后,从新设置中断标示

十9、FutureTask 原理

19.1 一个例子

static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("子线程在进行计算"); Thread.sleep(1000); int sum = 0; for (int i = 0; i < 100; i++) sum += i; return sum; } } public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); executor.submit(futureTask); System.out.println("主线程在执行任务"); try { System.out.println("task运行结果" + futureTask.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("全部任务执行完毕"); executor.shutdown(); } 

如上代码主线程会在futureTask.get()出阻塞直到task任务执行完毕,而且会返回结果。

19.2原理

先看下类图结构

image.png

FutureTask 内部有一个state用来展现任务的状态,而且是volatile修饰的:

/** Possible state transitions: * NEW -> COMPLETING -> NORMAL 正常的状态转移 * NEW -> COMPLETING -> EXCEPTIONAL 异常 * NEW -> CANCELLED 取消 * NEW -> INTERRUPTING -> INTERRUPTED 中断 */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; 

其中构造FutureTask实例时候状态为new

public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } 

把FutureTask提交到线程池或者线程执行start时候会调用run方法:

public void run() { //若是当前不是new状态,或者当前cas设置当前线程失败则返回,只有一个线程能够成功。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { //当前状态为new 则调用任务的call方法执行任务 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex);完成NEW -> COMPLETING -> EXCEPTIONAL 状态转移 } //执行任务成功则保存结果更新状态,unpark全部等待线程。 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { //状态从new->COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //状态从COMPLETING-》NORMAL UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //unpark全部等待线程。 finishCompletion(); } } 

任务提交后,会调用 get方法获取结果,这个get方法是阻塞的。

public V get() throws InterruptedException, ExecutionException { int s = state; //若是当前状态是new或者COMPLETING则等待,由于位normal或者exceptional时候才说明数据计算完成了。 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } 
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //若是被中断,则抛异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } //组建单列表 int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); //超时则返回 if (nanos <= 0L) { removeWaiter(q); return state; } //否者设置park超时时间 LockSupport.parkNanos(this, nanos); } else //直接挂起当前线程 LockSupport.park(this); } } 
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } 

在submit任务后还能够调用futuretask的cancel来取消任务:

public boolean cancel(boolean mayInterruptIfRunning) { //只有任务是new的才能取消 if (state != NEW) return false; //运行时容许中断 if (mayInterruptIfRunning) { //完成new->INTERRUPTING if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); //完成INTERRUPTING->INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } //不容许中断则直接new->CANCELLED else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; } 

二10、ConcurrentHashMap原理简述

翻看ConcurrentHashMap的源码知道ConcurrentHashMap使用分离锁,整个map分段segment,每一个segments是继承了ReentrantLock,使用ReentrantLock的独占锁用来控制同一个段只能有一个线程进行写,可是不一样段能够多个线程同时写。另外不管是段内仍是段外多个线程均可以同时读取,由于他使用了volatile语义的读,并没加锁。而且当前段有写线程时候,该段也容许多个读线程存在。

put的大概逻辑,首先计算key的hash值,而后根据必定算法(位移和与操做)计算出该元素应该放到那个segment,而后调用segment.put方法,该方法里面使用ReentrantLock进行写控制,第一个线程tryLock获取锁进行写入,其余写线程则自旋调用tryLock 循环尝试。

get的大概逻辑,使用UNSAFE.getObjectVolatile 在不加锁状况下获取volatile语义的值。

相关文章
相关标签/搜索