Java深刻学习并发原理总结

Java 并发多线程基础总结java

线程池

线程池的简介

线程池就是首先建立一些线程,它们的集合称为线程池。使用线程池能够很好地提升性能,线程池在系统启动时即建立大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束之后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。node

为何要使用线程池面试

若是不使用线程池,每个任务都会新开一个线程处理。算法

为了减小建立和销毁线程的次数,让每一个线程能够屡次使用,可根据系统状况调整执行的线程数量,防止消耗过多内存,因此咱们能够使用线程池。数据库

线程池的好处编程

  • 加快响应速度
  • 合理利用 CPU 和内存
  • 统一管理

线程池的工做机制segmentfault

  1. 线程池刚建立的时候没有任何线程,当来了新的请求的时候才会建立核心线程去处理对应的请求。
  2. 当处理完成以后,核心线程并不会回收。
  3. 在核心线程达到指定的数量以前,每个请求都会在线程池中建立一个新的核心线程。
  4. 当核心线程全都被占用的时候,新来的请求会放入工做队列中。工做队列本质上是一个阻塞队列
  5. 当工做队列被占满,再来的新请求会交给临时线程来处理。
  6. 临时线程在使用完成以后会继续存活一段时间,直到没有请求处理才会被销毁。

线程池参数详解

线程池构造函数的参数数组

参数名 类型 含义
corePoolSize int 核心线程数
maxPoolSize int 最大线程数
keepAliveTime long 保持存活时间
workQueue BlockingQueue 任务存储队列
threadFactory ThreadFactory 当线程池须要新的线程时,使用 ThreadFactory 来建立新的线程
Handler RejectedExecutionHandler 因为线程池没法接受所提交的任务所给出的拒绝策略
  • corePoolSize:指的是核心线程数,线程池初始化完成后,默认状况下,线程池并无任何线程,线程池会等待任务到来时,再建立新的线程去执行任务。
  • maxPoolSize:线程池有可能会在核心线程数上,额外增长一些线程,可是这些新增长的线程有一个上限,最大不能超过 maxPoolSize。缓存

    • 若是线程数小于 corePoolSize,即便其余工做线程处于空闲状态,也会建立一个新的线程来运行任务。
    • 若是线程数大于等于 corePoolSize 但少于 maxPoolSize,则将任务放进工做队列中。
    • 若是队列已满,而且线程数小于 maxPoolSize,则建立一个新线程来运行任务。
    • 若是队列已满,而且线程数已经大于等于 maxPoolSize,则使用拒绝策略来拒绝该任务。
  • keepAliveTime:一个线程若是处于空闲状态,而且当前的线程数量大于 corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由 keepAliveTime 来设定。
  • workQueue:新任务被提交后,会先进入到此工做队列中,任务调度时再从队列中取出任务。jdk 中提供了四种工做队列:安全

    • ArrayBlockingQueue:基于数组的有界阻塞队列,按 FIFO 排序。新任务进来后,会放到该队列的队尾,有界的数组能够防止资源耗尽问题。当线程池中线程数量达到 corePoolSize 后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。若是队列已是满的,则建立一个新线程,若是线程数量已经达到 maxPoolSize,则会执行拒绝策略。
    • LinkedBlockingQueue:基于链表的无界阻塞队列(其实最大容量为 Interger.MAX),按照 FIFO 排序。因为该队列的近似无界性,当线程池中线程数量达到 corePoolSize 后,再有新任务进来,会一直存入该队列,而不会去建立新线程直到 maxPoolSize,所以使用该工做队列时,参数 maxPoolSize 实际上是不起做用的。
    • SynchronousQueue:一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,若是没有可用线程,则建立新线程,若是线程数量达到 maxPoolSize,则执行拒绝策略。
    • PriorityBlockingQueue:具备优先级的无界阻塞队列,优先级经过参数 Comparator 实现。
    • delayQueue:具备优先级的延时无界阻塞队列
    • LinkedTransferQueue:基于链表的无界阻塞队列
    • LinkedBlockingDeque:基于链表的双端阻塞队列
  • threadFactory:建立一个新线程时使用的工厂,能够用来设定线程名、是否为 daemon 线程等等
  • handler:当工做队列中的任务已到达最大限制,而且线程池中的线程数量也达到最大限制,这时若是有新任务提交进来,就会执行拒绝策略。

添加线程的流程

线程池用法演示

  • newFixedThreadPool:固定大小线程池
public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);//核心线程数
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ThreadPoolTest());
        }
        executorService.shutdown();
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

运行结果:

pool-1-thread-1
pool-1-thread-3
pool-1-thread-4
pool-1-thread-2
pool-1-thread-5
pool-1-thread-5
...

咱们能够看到,打印出来的最多的线程也就是五个。

咱们看一下源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
  • 第一个参数:corePoolSize,核心线程数:5
  • 第二个参数:maxPoolSize,最大线程数:5
  • 第三个参数:keepAliveTime,最大存活时间:0
  • 第四个参数:存活时间单位,单位毫秒
  • 第五个参数:workQueue,阻塞队列使用的是 LinkedBlockingQueue,也就是无界队列

最后 new ThreadPoolExecutor(),咱们看下这个方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

将咱们的参数传递过去后,线程工厂使用的是默认的线程工厂,和默认的拒绝策略处理器。

因为咱们使用的是无界阻塞队列,因此至关于 maxPoolSize 没有用处。若是任务特别多,核心线程处理不过来的话,就会一直将任务放入到 LinkedBlockingQuene 中,可能会致使 OOM。

演示 OOM:

//-Xms5m -Xmx5m
public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executorService.execute(new ThreadPoolTest());
        }
        executorService.shutdown();
    }

    @Override
    public void run() {
        try {
            TimeUnit.HOURS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at com.thread.ThreadPoolTest.main(ThreadPoolTest.java:13)
  • newFixedThreadPool:单个核心线程的线程池
public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            executorService.execute(new ThreadPoolTest());
        }
        executorService.shutdown();
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

运行结果:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

咱们看下源码:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newSingleThreadExecutor 其实和 newFixedThreadPool 差距不大,只是将核心线程数和最大线程数都设置为了 1,一样也是使用的 LinkedBlockingQueue,也可能会致使 OOM。

  • newCachedThreadPool:可缓存的线程池
public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ThreadPoolTest());
        }
        executorService.shutdown();
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

运行结果:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-5
pool-1-thread-3
pool-1-thread-9
pool-1-thread-6
pool-1-thread-10
pool-1-thread-6
pool-1-thread-11
pool-1-thread-10
pool-1-thread-12
pool-1-thread-12
pool-1-thread-10
pool-1-thread-15
pool-1-thread-13

咱们看下源码:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

能够看出来,它核心线程数为 0,最大线程数量为 int 的最大值,存活时间为 60 秒,使用的是 SynchronousQueue,也就是不存储任务的阻塞队列。

SynchronousQueue 的确不会致使 OOM,可是!咱们的线程池能够存放 2147483647 个线程。在内存不够的状况下依然会报出 OOM!

  • newFixedThreadPool:支持定时及周期性任务执行的线程池
public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        //executorService.schedule(new ThreadPoolTest(),5, TimeUnit.SECONDS); //延时运行
        executorService.scheduleAtFixedRate(new ThreadPoolTest(),1,3,TimeUnit.SECONDS);//重复运行
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

schedule()方法参数:任务,多久后运行、时间单位

scheduleAtFixedRate()方法参数:任务、第一次执行时间:一、每隔多久运行一次:三、时间单位

咱们看一下源码:

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

咱们看到 DelayedWordkQueue 继承了 AbstractCollection 接口,实现了 BlockingQueue,因此和 ArrayBlockingQueue 以及 LinkedBlockingQueue 是兄弟关系。
DelayedWorkQueue 定义了一个 DelayQueue,因此 DelayedWorkQueue 的实现是依赖 DelayQueue 的。

DelayQueue:Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。若是延迟都尚未期满,则队列没有头部,而且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS)方法返回一个小于等于 0 的值时,将发生到期。即便没法使用 take 或 poll 移除未到期的元素,也不会将这些元素做为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不容许使用 null 元素。

BlockingQueue 核心方法

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用
  • 抛出异常

当阻塞队列满时,再往队列里 add 插入元素会抛出 IllegalStateException:Queue full

当阻塞队列空时,再往队列里 remove 移除元素会抛出 NoSuchElementException

  • 特殊值

插入方法,成功 true 失败 false。

移除方法,成功返回元素,没有元素就返回 null。

  • 阻塞

当阻塞队列满时,生产者线程继续往队列里 put 元素,队列就会一直阻塞生产线程直到 put 数据 or 响应退出。

当阻塞队列空时,消费者线程试图从队列里 take 元素,队列就会一直阻塞消费者线程直到队列可用。

  • 超时退出

当阻塞队列满时,队列会阻塞生产者线程必定时间,超出时间后生产者线程就会推出。

正确的建立线程池的方法

Executors 存在什么问题?

在阿里巴巴 Java 开发手册中提到,使用 Executors 建立线程池可能会致使 OOM(OutOfMemory ,内存溢出)。

咱们以前也已经演示了 OOM 的状况,咱们看下如何正确建立线程池。

避免使用 Executors 建立线程池,主要是避免使用其中的默认实现,那么咱们能够本身直接调用 ThreadPoolExecutor 的构造函数来本身建立线程池。在建立的同时,给 BlockQueue 指定容量就能够了。

private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue(10))
具体咱们须要根据不一样的业务场景、本身设置线程池的参数、想使用某种队列、想使用本身的线程工厂、想指定某种拒绝策略等等,来实现更合适的线程池。

中止线程池的正确方法

第一种:shutdown

调用线程池的此方法后,再也不接受新的任务,若是有新的任务增长则会抛出异常,待全部任务都执行关闭后,进行关闭。

public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                executorService.shutdown();
            }
            executorService.execute(new ThreadPoolTest());
        }
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

运行结果:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.thread.ThreadPoolTest@3764951d rejected from java.util.concurrent.ThreadPoolExecutor@4b1210ee[Shutting down, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.thread.ThreadPoolTest.main(ThreadPoolTest.java:16)
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
pool-1-thread-4
pool-1-thread-1

第二种:isShutdown

当调用 shutdown 以后,此值为 true。并非全部任务都执行完毕才是 true。

第三种:isTerminated

线程池全部任务是否已经关闭,包括正在执行和队列中的任务都结束了则返回 true。

public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {
            if (i >= 3) {
                executorService.shutdown();
                System.out.println(executorService.isTerminated());
            }else{
                executorService.execute(new ThreadPoolTest());
            }
        }
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("最后线程池状态是否关闭:"+executorService.isTerminated());
    }

    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

运行结果:

false
false
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
最后线程池状态是否关闭:true

第四种:awaitTermination

检测阻塞等待一段时间后,若是线程池任务都执行完了,返回 true,不然 false。

第五种:shutdownNow

马上关闭全部线程。该方法会返回所未完成方法的集合。

public class ThreadPoolTest implements Runnable {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {
            if (i >= 3) {
                Collection<Runnable> runnables = executorService.shutdownNow();
                runnables.forEach(System.out::println);
            }else{
                executorService.execute(new ThreadPoolTest());
            }
        }
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            System.out.println("我被中断了!");
        }
        System.out.println(Thread.currentThread().getName());
    }
}

运行结果:

我被中断了!
pool-1-thread-1
我被中断了!
pool-1-thread-2
com.thread.ThreadPoolTest@4e50df2e

拒绝策略解析

拒接时机

  1. 当 executor 关闭时,提交新任务会被拒绝
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.thread.ThreadPoolTest@2b193f2d rejected from java.util.concurrent.ThreadPoolExecutor@355da254[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.thread.ThreadPoolTest.main(ThreadPoolTest.java:15)
  1. 当 executor 对最大线程和工做队列容量使用有限边界而且已经饱和时。

四种拒绝策略

  • CallerRunsPolicy:在调用者线程中直接执行被拒绝任务的 run 方法,除非线程池已经 shutdown,则直接抛弃任务。
  • AbortPolicy:直接丢弃任务,并抛出 RejectedExecutionException 异常。(默认拒绝策略)
  • DiscardPolicy:直接丢弃任务,什么都不作。
  • DiscardOldestPolicy:该策略下,抛弃进入队列最先的那个任务,而后尝试把此次拒绝的任务放入队列。

Executor 家族解析

Executor、ExecutorService、ThreadPoolExecutor、Executors 之间的关系

  1. Executor
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

Executor 里面只有一个 execute(Runnable command)回调接口。用于执行已提交的 Runnable 任务对象。

  1. ExecutorService
public interface ExecutorService extends Executor {

ExecutorService 接口是继承 Executor 接口,增长了一些关于中断的方法。

方法 invokeAny 和 invokeAll 是批量执行的最经常使用形式,它们执行任务 collection,而后等待至少一个,
或所有任务完成(可以使用 ExecutorCompletionService 类来编写这些方法的自定义变体)。

submit 方法是提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。

  1. ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {

ThreadPoolExecutor 是 ExecutorService 的一个实现类,它使用可能的几个池线程之一执行每一个提交的任务,一般使用 Executors 工厂方法配置。
线程池能够解决两个不一样问题:因为减小了每一个任务调用的开销,它们一般能够在执行大量异步任务时提供加强的性能,而且还能够提供绑定和管理资源(包括执行任务集时使用的线程)的方法。

  1. Executors
public class Executors {

Executors 是一个工具类,能够用于方便的建立线程池。

线程池实现线程复用的原理

咱们直接看 execute 方法源码:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //若是正在运行的核心线程数小于核心线程总数
        if (workerCountOf(c) < corePoolSize) {
            //增长一个核心线程来执行任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

咱们看一下 addWorker 方法:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
}

首先判断当前线程池的状态,若是已经状态不是 shutdown 或者 running,或者已经为 shutdown 可是工做队列已经为空,那么这个时候直接返回添加工做失败。接下来是对线程池线程数量的判断,根据调用时的 core 的值来判断是跟 corePoolSize 仍是 maximumPoolSize 判断。

在确认了线程池状态以及线程池中工做线程数量以后,才真正开始添加工做线程。

新创建一个 worker 类(线程池的内部类,具体的工做线程),将要执行的具体线程作为构造方法中的参数传递进去,接下来将其加入线程池的工做线程容器 workers,而且更新工做线程最大量,最后调用 worker 工做线程的 start()方法,就完成了工做线程的创建与启动。

接下来咱们能够看最重要的,也就是咱们以前创建完 Worker 类以后立马调用的 run()方法了

public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

接下来可见,咱们所须要的任务,直接在工做线程中直接以 run()方式以非线程的方式所调用,这里也就是咱们所须要的任务真正执行的地方。

在执行完毕后,工做线程的使命并无真正宣告段落。在 while 部分 worker 仍旧会经过 getTask()方法试图取得新的任务。

下面是 getTask()的实现:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

首先仍旧会判断线程池的状态是不是 running 仍是 shutdown 以及 stop 状态下队列是否仍旧有须要等待执行的任务。

若是状态没有问题,则会跟据 allowCoreThreadTimeOut 和 corePoolSize 的值经过对前面这两个属性解释的方式来选择从任务队列中得到任务的方式(是否设置 timeout)。

其中的 timedOut 保证了确认前一次试图取任务时超时发生的记录,以确保工做线程的回收。

在 runWorker()方法的最后调用了 processWorkerExit 来执行工做线程的回收。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
}

先确保已经从新更新了线程池中工做线程的数量,以后从线程池中的工做线程容器移去当前工做线程,而且将完成的任务总数加到线程池的任务总数当中。

以后尝试设置线程池状态为 TERMINATED。

若是线程池的线程数量小于核心线程时, 则增长一个线程来继续处理任务队列中任务。

execute 执行流程图

线程池状态

  • RUNNING :接受新的任务并处理排队任务
  • SHUTDOWN:不接受新的任务,但处理排队任务
  • STOP:不接受新任务,也不处理排队任务,并中断正在执行的任务
  • TIDYING:全部任务都已终止并 workerCount 为 0 时,并执行 terminate()方法
  • TERMINATED:terminate()运行完成

源码:

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

使用线程池的注意点

  • 避免任务堆积
  • 避免线程数过分增长
  • 排查线程泄露

ThreadLocal 详解

什么是 ThreadLocal

ThreadLocal 提供一个线程(Thread)局部变量,访问到某个变量的每个线程都拥有本身的局部变量。说白了,ThreadLocal 就是想在多线程环境下去保证成员变量的安全。

ThreadLocal 的用途

  • 用途一:每一个线程须要独享的对象
  • 用途二:每一个线程内须要保存全局变量(例如在拦截器中获取的用户信息),可让不一样方法直接使用,避免参数传递的麻烦

用途一:每一个线程须要一个独享的对象

每一个 Thread 内有本身的实例副本,不共享

好比:教材只有一本,一块儿作笔记有线程安全的问题,复印后就能够解决这个问题。

需求:咱们想打印出两个线程不一样的时间

public class ThreadLocalTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            int finalI = i;
            executorService.execute(() -> System.out.println(getDate(finalI + 100)));
        }
        executorService.shutdown();
    }

    public static String getDate(int seconds) {
        Date date = new Date(1000 * seconds);
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return simpleDateFormat.format(date);
    }
}

运行结果:

1970-01-01 08:01:41
1970-01-01 08:01:40

看起来是咱们想要的结果。

可是若是咱们想打印 1000 条不一样的时间,须要用到不少线程,咱们就会建立销毁 1000 个 SimpleDateFormat 对象,无疑是浪费内存的写法。

既然这样,那咱们就把 SimpleDateFormat 建立为类变量试试看。

public class ThreadLocalTest {

    static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            executorService.execute(() -> System.out.println(getDate(finalI + 100)));
        }
        executorService.shutdown();
    }

    public static String getDate(int seconds) {
        Date date = new Date(1000 * seconds);
        return simpleDateFormat.format(date);
    }
}

运行结果:

能够看到这样会引起线程安全的问题。

固然,咱们也能够进行加锁来解决这个问题,可是会引起效率问题。

正确方案使用 ThreadLocal 来解决这个问题

public class ThreadLocalTest {

    static ThreadLocal<SimpleDateFormat> threadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            executorService.execute(() -> System.out.println(getDate(finalI + 100)));
        }
        executorService.shutdown();
        threadLocal.remove();
    }

    public static String getDate(int seconds) {
        Date date = new Date(1000 * seconds);
        return threadLocal.get().format(date);
    }
}

用途二:当前用户信息须要被线程内全部方法共享

比较繁琐的方案就是做为参数层层专递。

若是使用 map 也须要保证线程安全问题,因此须要加锁或者使用 ConcurrentHashMap,但都对性能有影响

因此咱们使用 ThreadLocal 来实现。

public class ThreadLocalTest {

    public static void main(String[] args) {
        new Service1().precess();
    }
}

class Service1 {
    static ThreadLocal<User> threadLocal = new ThreadLocal<>();

    public void precess() {
        User user = new User("Jack");
        threadLocal.set(user);
        new Service2().precess();
        threadLocal.remove();
    }
}

class Service2 {
    public void precess() {
        System.out.println("Service2拿到" + Service1.threadLocal.get().name);
        new Service3().precess();
    }
}

class Service3 {
    public void precess() {
        System.out.println("Service3拿到" + Service1.threadLocal.get().name);
    }
}

class User {
    public String name;

    public User(String name) {
        this.name = name;
    }
}

运行结果:

Service2拿到Jack
Service3拿到Jack

ThreadLocal 的两个做用

  1. 让某个须要用到的对象在线程间隔离
  2. 在任何方法中均可以轻松获取到该对象。

场景一:initialValue:在 ThreadLocal第一次get的时候把对象给初始化出来,对象的初始化时机能够由咱们控制

场景二:set:若是须要保存到 ThreadLocal 中的对象生成时机不禁咱们控制,咱们能够使用 ThreadLocal.set()直接放到 ThreadLocal 中去,以便后续使用。

使用 ThreadLocal 带来的好处

  • 达到线程安全
  • 不须要加锁,提升执行效率
  • 更高效的利用内存节省开销
  • 免去传参的繁琐

ThreadLocal 原理

Thread、ThreadLocal、ThreadLocalMap 之间的关系

在 Thread 类中包含一个成员变量

/* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;

一个 ThreadLocalMap 又能够包含无数个 ThreadLocal。

图解以下:

ThreadLocal 重要方法介绍

  • T initialValue() :初始化,咱们看一下方法原理

咱们先看下 get 方法:

public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
}

能够看到在 ThreadLocalMap 为 null 的时候咱们调用了 setInitialValue()方法

private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
}

在 initialValue 方法没有被重写的时候返回的是 null,由于咱们已经重写了,因此它会将咱们的 value 放到 ThreadLocalMap 中的 ThreadLocal 对象中。

一般,每一个线程最多调用一次此方法,若是已经调用了 remove()方法后,再调用 get(),就会再次触发 initialValue()方法。

  • set(T t):为这个线程设置一个新值
  • T get():获得这个线程对应的 value,若是是第一次调用 get,则会调用 InitialValue()来获取值。
  • void remove():删除对应这个线程的值

ThreadLocal 重要方法解析

get 方法解析:

咱们先看源码:

public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
}

刚才已经讲过 map 为 null 的状况,咱们看下若是不为 null 是如何获取到值的。

首先在 map.getEntry(this)中咱们从不为 null 的 ThreadLocalMap 中 getEntry 也就是咱们的 key,this 就是咱们当前的 ThreadLocal 对象,获得的 e 也就是咱们的键值对,而后.value 来返回咱们的结果。

set 方法解析:

public void set(T value) {
        //获取当前线程对象
        Thread t = Thread.currentThread();
        //获取当前线程对象的ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        //若是不为null就set进去,k为当前ThreadLocal,v就是咱们传入的对象
        if (map != null)
            map.set(this, value);
        else
        //为null就去建立ThreadLocalMap并set当前k、v
            createMap(t, value);
}

remove 方法解析:

public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
}

remove 方法比较简单,就是拿到 ThreadLocalMap 而后删除掉 k 等于当前对象的 ThreadLocal。

ThreadLocalMap 类

ThreadLocalMap,也就是 Thread.threadlocals

ThreadLocalMap 类是每一个线程 Thread 类里面的变量,里面最重要的一个键值对数组Entry[] table,能够认为是一个 map。

  • k:当前 ThreadLocal
  • v:实际存储的成员变量
/**
         * The table, resized as necessary.
         * table.length MUST always be a power of two.
         */
    private Entry[] table;

若是发生哈希冲突

ThreadLocalMap 和 HashMap 有所不一样,HashMap(jdk8)采用的是链表+红黑树

而 ThreadLocalMap 采用的是线性探测法,若是发生冲突,就继续寻找下一个空位置,而不是使用链表。

ThreadLocal 注意点

  1. 内存泄漏

在 ThreadLocal 中有一个静态内部类也就是 ThreadLocalMap。

ThreadLocalMap 中的 Entry 是继承了 WeakReference 也就是弱引用

弱引用的特色就是在垃圾回收器线程扫描它所管辖的内存区域的过程当中,一旦发现了只具备弱引用的对象,无论当前内存空间足够与否,都会回收它的内存。

可是咱们发现下面一句 value = v;又包含了强引用。

正常状况下,当线程终止,保存在 ThreadLocal 中的 value 就会被垃圾回收,由于没有任何强引用了。

可是若是线程不终止(好比线程须要保持好久),那么 key 对应的 value 就不能被回收,由于有如下调用链:

Thread --> ThreadLocalMap --> Entry(key 为 null) --> Value

由于 value 和 Thread 之间还保存这个强引用链路,因此致使value没法被回收,就可能回出现 OOM。

JDK 已经考虑到这个问题,因此在 set、remove、rehash 方法中会扫描 key 为 null,若是 key 为 null 则会把 value 也设置为 null。

可是若是 ThreadLocal 不被使用,那么 set、remove、rehash 方法也不会被调用,若是同时线程并无中止,则调用链会一直存在,就会致使 value 的内存泄漏。

因此咱们须要在使用完 ThreadLocal 后主动使用 remove()方法来避免内存泄漏。

  1. 若是 set 进去的是一个 static 对象,则仍是会有并发访问的问题
  2. 子线程访问问题

咱们来看一下什么是子线程访问问题。

public class ThreadLocalTest {

    public static void main(String[] args) {
        ThreadLocal<String> threadLocal = new ThreadLocal<>();
        threadLocal.set("Hello");
        new Thread(() -> {
            System.out.println(threadLocal.get());
        }, "Thread01").start();
    }
}

运行结果:

null

咱们看一下为何是 null,咱们直接跟进到 get 方法中:

能够很清楚的看到,咱们在 get 的时候拿到当前线程是 Thead01,而咱们 set 进去的是 main 线程,因此咱们拿到的 ThreadLocalMap 是 null。

而后咱们调用 setInitialValue()方法

private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
}

在第一句调用了 initialValue()方法:

protected T initialValue() {
        return null;
}

这下咱们就明白了,咱们返回了个 null,而且在 Thead01 子线程中建立了一个 ThreadLocalMap,value 为 null 。

咱们看另外一个例子:

public class ThreadLocalTest {

    public static void main(String[] args) {
        ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> "Hello");
        new Thread(() -> {
            System.out.println(threadLocal.get());
            // System.out.println(threadLocal1.get());
        }, "Thread01").start();
    }
}

运行结果:

Hello

我相信你们已经明白为何能获取到 Hello 了。

咱们看源码:

static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {

        private final Supplier<? extends T> supplier;

        SuppliedThreadLocal(Supplier<? extends T> supplier) {
            this.supplier = Objects.requireNonNull(supplier);
        }

        @Override
        protected T initialValue() {
            return supplier.get();
        }
}

由于在 withInitial 里面咱们继承了 ThreadLocal 而且重写了 initialValue 方法,因此咱们得到到了 Hello。

可是,这样作咱们在子线程中,至关因而又建立了一个 ThreadLocalMap 将 value 存了进去。

InheritableThreadLocal 解析

咱们刚才已经看到了在子线程中没法访问到父线程 ThreadLocal 类型变量的值。

咱们试试 InheritableThreadLocal 类

public class ThreadLocalTest {

    public static void main(String[] args) {
        ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
        threadLocal.set("hello");
        new Thread(() -> {
            System.out.println(threadLocal.get());
        }, "Thread01").start();
    }
}

运行结果:

hello

可是,InheritableThreadLocal 为何可以读取出来?

在 Thread 类中,inheritableThreadLocals,他的类型同 Thread 内部的 threadLocals 变量。

咱们看一下这个类源码:

public class InheritableThreadLocal<T> extends ThreadLocal<T> {

    //该函数在父线程建立子线程,向子线程复制InheritableThreadLocal变量时使用
    protected T childValue(T parentValue) {
        return parentValue;
    }

    /**
     * 因为重写了getMap,操做InheritableThreadLocal时,
     * 将只影响Thread类中的inheritableThreadLocals变量,
     * 与threadLocals变量再也不有关系
     */
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

    /**
     * 相似于getMap,操做InheritableThreadLocal时,
     * 将只影响Thread类中的inheritableThreadLocals变量,
     * 与threadLocals变量再也不有关系
     */
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

InheritableThreadLocal 继承了 ThreadLocal 而且重写了三个方法。

咱们这个时候回过头看 Thread 类的初始化 init 方法

private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {

若是 parent 的 inheritableThreadLocals 不是 null,那么就会将当前线程的 inheritableThreadLocals 设置为 parent 的 inheritableThreadLocals

parent 是什么?以前也说过了,就是建立这个线程的线程,也就是平时说的父线程。

因此说借助于 inheritableThreadLocals,能够实现,建立线程向被建立线程之间数据传递

static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
        return new ThreadLocalMap(parentMap);
}

逻辑很清晰,建立了一个 ThreadLocalMap

简单理解:这个建立的 ThreadLocalMap 就是根据入参的 ThreadLocalMap,拷贝建立一份。

总结

其实就是从父线程(当前建立线程)中复制的一份,而后后续的数据读取解析,则是经过 inheritableThreadLocals 变量,和内部的那个 threadLocals 没有什么关系。

Lock 接口

什么是 Lock

锁是一种工具,用于控制对共享资源的访问。

Lock 和 synchronized,这两个都是最多见的锁,它们均可以达到线程安全的目的,可是在使用上和功能上有较大不一样。

Lock 并非用来代替synchronized 的,而是在使用 synchronized 不适合或者不足以知足要求的时候,来提供更高级更灵活的功能。

Lock 接口最多见的实现类是ReentrantLock

一般状况下,Lock 只容许一个线程来访问这个共享资源。不过有的时候,一些特殊的实现也能够容许并发访问,好比 ReadWriteLock 里面的ReadLock

为何须要 Lock

首先咱们先看一下为何 synchronized 不够用?

  1. 效率低:锁的释放状况少,视图获取锁时不能设定超时、不能中断一个正在试图获取锁的线程。
  2. 不够灵活:加锁和释放的时机单一,每一个锁仅有单一的条件(某个对象)。
  3. 没法知道是否成功得到到锁

Lock 主要方法介绍

在 Lock 中声明了四个方法来获取锁

  • lock()
  • tryLock()
  • tryLock(long time,TImeUnit unit)
  • lockInterruptibly()

lock()

lock()就是最普通的获取锁,若是锁已经被其它线程获取,则进行等待。

lock 不会像 synchronized 同样在异常时自动释放锁

所以最佳实践是在finally中释放锁,以保证发生异常时锁必定会被释放。

public class LockTest {

    private static Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        lock.lock();
        try {
            //业务逻辑
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

}

顺便说一下为何不能在 try 内写上 lock.lock();

阿里巴巴规范手册:在使用阻塞等待获取锁的方式中,必须在 try 代码块以外,而且在加锁方法与 try 代码块之间没有任何可能抛出异常的方法调用,避免加锁成功后,在 finally 中没法解锁。

  • 说明一:若是在 lock 方法与 try 代码块之间的方法调用抛出异常,那么没法解锁,形成其它线程没法成功获取锁。
  • 说明二:若是 lock 方法在 try 代码块以内,可能因为其它方法抛出异常,致使在 finally 代码块中,unlock 对未加锁的对象解锁,它会调用 AQS 的 tryRelease 方法(取决于具体实现类),抛出 IllegalMonitorStateException 异常。
  • 说明三:在 Lock 对象的 lock 方法实现中可能抛出 unchecked 异常,产生的后果与说明二相同。

lock 方法不能被中断,这会带来很大隐患:一旦陷入死锁,lock()就会陷入永久等待。

tryLock()

tryLock 用来尝试获取锁,若是当前锁没有被其余线程占用,则获取成功,返回 true,不然返回 false,表明获取锁失败。

相比于 lock,这样的方法显然功能更强大了,咱们能够根据是否能获取到锁来决定后续程序的行为

该方法会当即返回,并不会在拿不到锁时阻塞。

tryLock(long time,TimeUnit unit)

该方法就是在该时间段内尝试获取锁,若是超过期间就放弃。

lockInterruptibly()

至关因而把 tryLock 的时间设为无限,在等待锁的过程当中,线程能够被中断

可见性保证

Lock 一样也是遵循Happens-before原则。

Lock 的加锁解锁和 synchronized 有一样的内存语义,也就是说,下一个线程加锁后能够看到全部前一个线程解锁前发生的全部操做

锁的分类图

这些分类并不是互斥的,也就是多个类型能够并存:有可能一个锁,同时属于两种类型。

乐观锁和悲观锁

为何会诞生非互斥同步锁————互斥同步锁的劣势

  • 阻塞和唤醒带来的性能劣势
  • 永久阻塞:若是持有锁的线程被永久阻塞,好比遇到了无限循环、死锁等活跃性问题,那么等待线该程释放锁的那几个悲催线程,将永远得不到执行。

什么是乐观锁和悲观锁

悲观锁:顾名思义,悲观锁是基于一种悲观的态度类来防止一切数据冲突,它是以一种预防的姿态在修改数据以前把数据锁住,而后再对数据进行读写,在它释放锁以前任何人都不能对其数据进行操做,直到前面一我的把锁释放后下一我的数据加锁才可对数据进行加锁,而后才能够对数据进行操做,通常数据库自己锁的机制都是基于悲观锁的机制实现的。

典型例子:synchronized、Lock 接口

乐观锁:乐观锁是对于数据冲突保持一种乐观态度,操做数据时不会对操做的数据进行加锁(这使得多个任务能够并行的对数据进行操做),只有到数据提交的时候才经过一种机制来验证数据是否存在冲突,通常使用 CAS 算法来实现的。

典型例子:Atomic 原子类、并发容器等

开销对比

悲观锁的原始开销要高于乐观锁,可是特色是一劳永逸,临界区持锁时间就算愈来愈差,也不会对互斥锁的开销产生影响。

相反,虽然乐观锁一开始开销比较小,可是若是自旋时间很长,或者不停重试,那么消耗的资源也会愈来愈多

两种锁各自的使用场景:各有千秋

  • 悲观锁:适用于并发写入多的状况,适用于临界区持锁时间比较长的状况,悲观锁能够避免大量的无用自旋等消耗。
  • 乐观锁:适用于读多写少的场景,不加锁可让读取性能大幅提升。

可重入锁和不可重入锁

可重入锁就是一个类的 A、B 两个方法,A、B 都有拥有同一把锁,当 A 方法调用时,得到锁,在 A 方法的锁尚未被释放时,调用 B 方法时,B 方法也得到该锁。

不可重入锁就是一个类的 A、B 两个方法,A、B 都有拥有同一把锁,当 A 方法调用时,得到锁,在 A 方法的锁尚未被释放时,调用 B 方法时,B 方法也得到不了该锁,必须等 A 方法释放掉这个锁。

synchronized 和 ReentrantLock 都是可重入锁

下面使用 ReentrantLock 证实可重入锁的例子:

public class LockTest {

    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        methodA();
    }

    public static void methodA() {
        System.out.println("未得到锁以前,count为:" + lock.getHoldCount());
        lock.lock();
        try {
            System.out.println("得到A的锁,count为:" + lock.getHoldCount());
            methodB();
        } finally {
            lock.unlock();
            System.out.println("释放A的锁,count为:" + lock.getHoldCount());
        }
    }

    public static void methodB() {
        lock.lock();
        try {
            System.out.println("得到B的锁,count为:" + lock.getHoldCount());
        } finally {
            lock.unlock();
            System.out.println("释放B的锁,count为:" + lock.getHoldCount());
        }
    }

}

运行结果:

未得到锁以前,count为:0
得到A的锁,count为:1
得到B的锁,count为:2
释放B的锁,count为:1
释放A的锁,count为:0

证实了 ReentrantLock 是可重入锁,在 holdCount = 0 的时候就会释放该锁。

public void unlock() {
        sync.release(1);
}

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
}

在 unlock()方法中咱们看到当前状态 - 1,若是 c == 0 就说明释放该锁,否则就只修改锁的状态 state。

可重入锁

不可重入锁

公平锁和非公平锁

什么是公平和非公平?

公平指的是按照线程的请求顺序,来分配锁;非公平指的是,不彻底按照请求的顺序,在必定状况下,能够插队。

为何要有非公平锁

假设线程 A 持有一把锁,线程 B 请求这把锁,因为线程 A 已经持有这把锁了,因此线程 B 会陷入等待,在等待的时候线程 B 会被挂起,也就是进入阻塞状态,那么当线程 A 释放锁的时候,本该轮到线程 B 苏醒获取锁,但若是此时忽然有一个线程 C 插队请求这把锁,那么根据非公平的策略,会把这把锁给线程 C,这是由于唤醒线程 B 是须要很大开销的,颇有可能在唤醒以前,线程 C 已经拿到了这把锁而且执行完任务释放了这把锁。

相比于等待唤醒线程 B 的漫长过程,插队的行为会让线程 C 自己跳过陷入阻塞的过程,若是在锁代码中执行的内容很少的话,线程 C 就能够很快完成任务,而且在线程 B 被彻底唤醒以前,就把这个锁交出去,这样是一个共赢的局面,对于线程 C 而言,不须要等待提升了它的效率,而对于线程 B 而言,它得到锁的时间并无推迟,由于等它被唤醒的时候,线程 C 早就释放锁了,由于线程 C 的执行速度相比于线程 B 的唤醒速度,是很快的,因此 Java 设计非公平锁,是为了提升总体的运行效率避免唤醒带来的空档期

代码案例公平锁

public class LockTest {

    public static void main(String[] args) {
        PrintQueue printQueue = new PrintQueue();
        Thread thread[] = new Thread[10];
        for (int i = 0; i < 10; i++) {
            thread[i] = new Thread(new Job(printQueue));
        }
        for (int i = 0; i < 10; i++) {
            thread[i].start();
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Job implements Runnable {

    PrintQueue printQueue;

    public Job(PrintQueue printQueue) {
        this.printQueue = printQueue;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "开始打印");
        printQueue.printJob(new Object());
        System.out.println(Thread.currentThread().getName() + "打印完毕");
    }
}

class PrintQueue {

    private Lock queueLock = new ReentrantLock(true);

    public void printJob(Object document) {
        queueLock.lock();
        try {
            int duration = new Random().nextInt(10) + 1;
            System.out.println(Thread.currentThread().getName() + "正在打印,须要" + duration);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            queueLock.unlock();
        }

        queueLock.lock();
        try {
            int duration = new Random().nextInt(10) + 1;
            System.out.println(Thread.currentThread().getName() + "正在打印,须要" + duration + "秒");
            Thread.sleep(duration * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            queueLock.unlock();
        }
    }
}

运行结果:

Thread-0开始打印
Thread-0正在打印,须要3
Thread-1开始打印
Thread-2开始打印
Thread-3开始打印
Thread-4开始打印
Thread-5开始打印
Thread-6开始打印
Thread-1正在打印,须要4
Thread-2正在打印,须要7
Thread-3正在打印,须要7
Thread-4正在打印,须要6
Thread-5正在打印,须要5
Thread-6正在打印,须要5
Thread-0正在打印,须要8秒
Thread-0打印完毕
Thread-1正在打印,须要2秒
Thread-1打印完毕
Thread-2正在打印,须要3秒
Thread-2打印完毕
Thread-3正在打印,须要4秒
Thread-3打印完毕
Thread-4正在打印,须要2秒
Thread-4打印完毕
...

测试非公平锁只须要将参数改成 false 便可。true 表明公平锁

private Lock queueLock = new ReentrantLock(true);

源码分析

公平锁:

protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //  和非公平锁相比,这里多了一个判断:队列中是否有线程在等待
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
}

非公平锁:

static final class NonfairSync extends Sync {
    final void lock() {
        //  和公平锁相比,这里会直接先进行一次CAS,成功就返回了
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

注意

若是使用 tryLock()方法,它是不遵照设定的公平原则,若是有线程执行 tryLock()的时候,一旦有线程释放了锁,那么这个正在执行 tryLock()的线程会当即得到锁,即便以前已经有人在排队了。

总结

优点 劣势
公平锁 在平等状况下,每一个线程在等待一段时间后都会得到执行的机会 更慢,吞吐量小
不公平锁 更快,吞吐量大 可能会致使在阻塞队列中的线程长期处于饥饿状态

非公平锁和公平锁的两处不一样:

非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,若是这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。

非公平锁在 CAS 失败后,和公平锁同样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,若是发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,可是公平锁会判断等待队列是否有线程处于等待状态,若是有则不去抢锁,乖乖排到后面。

公平锁和非公平锁就这两点区别,若是这两次 CAS 都不成功,那么后面非公平锁和公平锁是同样的,都要进入到阻塞队列等待唤醒。

相对来讲,非公平锁会有更好的性能,由于它的吞吐量比较大。固然,非公平锁让获取锁的时间变得更加不肯定,可能会致使在阻塞队列中的线程长期处于饥饿状态。

共享锁和排它锁

什么是共享锁和排它锁

排它锁:又称为独占锁、共享锁

共享锁:又称为读锁,得到共享锁以后,能够查看但没法修改和删除数据,其余线程此时也能够得到到共享锁,也能够查看但没法修改和删除数据。

共享锁和排它锁的典型是读写锁ReentrantReadWriteLock,其中读锁是共享锁,写锁是排它锁

在没有读写锁以前,咱们假设使用 ReentrantLock,虽然保证了线程安全,可是也浪费了必定的资源多个读操做同时进行,并无线程安全问题

在读的地方使用读锁,写的地方使用写锁,灵活控制,若是没有写锁的状况下,读是无阻塞的,大大提升效率。

读写锁的规则

  • 多个线程只申请读锁,均可以申请到。
  • 若是有一个线程占用了读锁,则此时其余线程若是申请写锁,则申请写锁的线程会等待释放读锁。
  • 若是有一个线程占用了写锁,则此时其余线程若是申请读锁,则申请读锁的线程会等待释放写锁。

ReentrantReadWriteLock 具体用法

public class LockTest {

    private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    private static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    public static void main(String[] args) {
        new Thread(() -> write()).start();
        new Thread(() -> read()).start();
        new Thread(() -> read()).start();
        new Thread(() -> write()).start();
        new Thread(() -> read()).start();
    }

    private static void read() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "开始学习《Thinking in Java》");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + "太难了!我不学了!");
            readLock.unlock();
        }
    }

    private static void write() {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "开始印刷《Thinking in Java》");
        } finally {
            System.out.println(Thread.currentThread().getName() + "印刷完成");
            writeLock.unlock();
        }
    }
}

运行结果:

Thread-0开始印刷《Thinking in Java》
Thread-0印刷完成
Thread-1开始学习《Thinking in Java》
Thread-1太难了!我不学了!
Thread-2开始学习《Thinking in Java》
Thread-2太难了!我不学了!
Thread-3开始印刷《Thinking in Java》
Thread-3印刷完成
Thread-4开始学习《Thinking in Java》
Thread-4太难了!我不学了!

读锁插队策略

假设线程 1 和线程 2 在读取,线程 3 想要写入,可是拿不到锁,因而进入等待队列,线程 4 不在队列中,如今想要读取。

此时有两种策略

  1. 读能够插队,效率高

可是这样可能会致使后面一堆读线程过来,一直轮不到线程 3 来写。致使写入饥饿。

  1. 避免饥饿

一个个排队,这样就不会致使饥饿,ReentrantReadWriteLock 就是采用第二种策略。

更确切的说就是:在非公平锁状况下,容许写锁插队,也容许读锁插队,可是读锁插队的前提是队列中的头节点不能是想获取写锁的线程。

公平锁源码:

非公平锁源码:

锁的升降级

升降级是指读锁升级为写锁,写锁降级为度锁。在 ReentrantReadWriteLock 读写锁中,只支持写锁降级为读锁,而不支持读锁升级为写锁。

代码演示:

public class LockTest {

    private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
    private static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    private static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    public static void main(String[] args) {
        new Thread(() -> write()).start();
        new Thread(() -> read()).start();
    }

    private static void read() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "开始学习《Thinking in Java》");
            writeLock.lock();
            System.out.println(Thread.currentThread().getName() + "得到到了写锁");
        } finally {
            writeLock.unlock();
            System.out.println(Thread.currentThread().getName() + "太难了!我不学了!");
            readLock.unlock();
        }
    }

    private static void write() {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "开始印刷《Thinking in Java》");
            readLock.lock();
            System.out.println(Thread.currentThread().getName() + "在写锁中获取到了读锁");
        } finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName() + "印刷完成");
            writeLock.unlock();
        }
    }
}

运行结果:

Thread-0开始印刷《Thinking in Java》
Thread-0在写锁中获取到了读锁
Thread-0印刷完成
Thread-1开始学习《Thinking in Java》

咱们能够看到在写锁中成功得到到了读锁,而在读锁中被一直阻塞。说明不支持锁升级!

为何 ReentrantReadWriteLock 不支持锁升级

主要是避免死锁,例如两个线程 A 和 B 都在读, A 升级要求 B 释放读锁,B 升级要求 A 释放读锁,互相等待造成死循环。若是能严格保证每次都只有一个线程升级那也是能够的。

总结

  1. 读写锁特色特色:读锁是共享锁,写锁是排他锁,读锁和写锁不能同时存在
  2. 插队策略:为了防止线程饥饿,读锁不能插队
  3. 升级策略:只能降级,不能升级
  4. ReentrantReadWriteLock 适合于读多写少的场合,能够提升并发效率,而 ReentrantLock 适合普通场合

自旋锁和阻塞锁

阻塞或者唤醒一个 Java 线程须要操做系统切换 CPU 状态来完成,这种状态转换须要耗费处理器时间。

若是同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长

在许多场景中,同步资源的锁定时间很短,为了这一小段时间去切换线程,线程挂起和恢复现场的话费可能会让系统得不偿失

若是物理机器有多个处理器,可以让两个或以上的线程同时并行,咱们就可让后面那个请求锁的线程不放弃 CPU 的执行时间,看看持有锁是否会在短期内释放锁。

而为了让当前线程"稍等一下",咱们须要让当前线程进行自旋,若是在自旋过程当中前面锁定的线程释放了锁,那么当前线程就能够直接获取同步资源,避免了资源消耗,这就是自旋锁

阻塞锁就是若是没拿到锁,会直接阻塞当前线程,直到被唤醒。

自旋锁的缺点

若是锁被占用时间很长,那么自旋的线程就会白白浪费处理器资源。

代码演示

public class LockTest {

    private AtomicReference<Thread> sign = new AtomicReference<>();

    public void lock() {
        Thread current = Thread.currentThread();
        while (!sign.compareAndSet(null, current)) {
            System.out.println("自旋获取失败,再次尝试");
        }
    }

    public void unlock() {
        Thread current = Thread.currentThread();
        sign.compareAndSet(current, null);
    }

    public static void main(String[] args) {
        LockTest spinLock = new LockTest();
        Runnable runnable = () -> {
            System.out.println(Thread.currentThread().getName() + "开始尝试获取自旋锁");
            spinLock.lock();
            System.out.println(Thread.currentThread().getName() + "获取到了自旋锁");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                spinLock.unlock();
                System.out.println(Thread.currentThread().getName() + "释放了自旋锁");
            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();
    }

}

运行结果:

Thread-0开始尝试获取自旋锁
Thread-1开始尝试获取自旋锁
Thread-0获取到了自旋锁
Thread-0释放了自旋锁
Thread-1获取到了自旋锁
Thread-1释放了自旋锁

在 while 中会进行大量的循环判断,能够尝试打印语句看看。

后续会讲述 Atomic 原子类是如何使用 CAS 算法来实现自旋。

自旋锁的使用场景

  • 自选锁通常用于多核的服务器,在并发度不是特别高的状况下,比阻塞锁的效率高。
  • 另外,自旋锁适用于临界区比较短小的状况,不然若是临界区很大(一旦拿到锁,好久才释放)那也是不合适的。

可中断锁和不可中断锁

在 Java 中,synchronized 是不可中断锁,而 Lock 是可中断锁,由于 tryLock(time)和 lockinterruptibly 都能响应中断。

synchronized 原理以及锁优化

同步代码块:

monitorenter 指令插入到同步代码块的开始位置,monitorexit 指令插入到同步代码块的结束位置,JVM 须要保证每个 monitorenter 都有一个 monitorexit 与之相对应。任何对象都有一个 monitor 与之相关联,当且一个 monitor 被持有以后,他将处于锁定状态。线程执行到 monitorenter 指令时,将会尝试获取对象所对应的 monitor 全部权,即尝试获取对象的锁。

同步方法:

synchronized 方法则会被翻译成普通的方法调用和返回指令如:invokevirtual、areturn 指令,在 JVM 字节码层面并无任何特别的指令来实现被 synchronized 修饰的方法,而是在 Class 文件的方法表中将该方法的 accessflags 字段中的 synchronized 标志位置 1,表示该方法是同步方法并使用调用该方法的对象或该方法所属的 Class 在 JVM 的内部对象表示 Klass 作为锁对象。

synchronized 锁 和 对象头息息相关。因此咱们先了解一下对象在堆中的结构:

咱们须要先了解两个重要的概念:Java 对象头、Monitor。

Java 对象头

synchronized 用的锁是存在 Java 对象头里的,那么什么是 Java 对象头呢?

Hotspot 虚拟机的对象头主要包括两部分数据:Mark Word(标记字段)Klass Pointer(类型指针)。其中 Klass Point 是是对象指向它的类元数据的指针,虚拟机经过这个指针来肯定这个对象是哪一个类的实例,Mark Word 用于存储对象自身的运行时数据,它是实现轻量级锁和偏向锁的关键。可是若是对象是数组类型,则须要三个机器码,由于 JVM 虚拟机能够经过 Java 对象的元数据信息肯定 Java 对象的大小,可是没法从数组的元数据来确认数组的大小,因此用一块来记录数组长度。

Mark Word

Mark Word 用于存储对象自身的运行时数据,如哈希码(HashCode)、GC 分代年龄、锁状态标志、线程持有的锁、偏向线程 ID、偏向时间戳等等,占用内存大小与虚拟机位长一致。

Klass Word

存储指向对象所属类(元数据)的指针,JVM 经过这个肯定这个对象属于哪一个类。

Monitor

什么是 Monitor?

咱们能够把它理解为一个同步工具,也能够描述为一种同步机制,它一般被描述为一个对象。

与一切皆对象同样,全部的 Java 对象是天生的 Monitor,每个 Java 对象都有成为 Monitor 的潜质,由于在 Java 的设计中 ,每个 Java 对象都带了一把看不见的锁,它叫作内部锁或者 Monitor 锁。

Monitor 是线程私有的数据结构,每个线程都有一个可用 Monitor Record 列表,同时还有一个全局的可用列表。每个被锁住的对象都会和一个 Monitor 关联(对象头的 Mark Word 中的 Lock Word 指向 Monitor 的起始地址),同时 Monitor 中有一个 Owner 字段存放拥有该锁的线程的惟一标识,表示该锁被这个线程占用。

Monitor 是由 ObjectMonitor 实现的,源码是 C++来实现的。主要结构以下:

ObjectMonitor() {
        _header       = NULL;
        _count        = 0;   // 记录个数
        _waiters      = 0,   // 等待线程数
        _recursions   = 0;  //  重入次数
        _object       = NULL;
        _owner        = NULL;  // 当前持有锁的线程
        _WaitSet      = NULL;  // 调用了 wait 方法的线程被阻塞 放在这里
        _WaitSetLock  = 0 ;    // 保护等待队列,简单的自旋
        _Responsible  = NULL ;
        _succ         = NULL ;
        _cxq          = NULL ;
        FreeNext      = NULL ;
        _EntryList    = NULL ; // 等待锁 处于block的线程 有资格成为候选资源的线程
        _SpinFreq     = 0 ;
        _SpinClock    = 0 ;
        OwnerIsThread = 0 ;
      }

咱们知道 synchronized 是重量级锁,效率很低。不过在 JDK 1.6 中对 synchronized 的实现进行了各类优化,使得它显得不是那么重了。

锁优化

JDK1.6 对锁的实现引入了大量的优化,如自旋锁适应性自旋锁锁消除锁粗化偏向锁轻量级锁等技术来减小锁操做的开销。

锁主要存在四中状态,依次是:无锁状态偏向锁状态轻量级锁状态重量级锁状态。他们会随着竞争的激烈而逐渐升级。注意锁能够升级不可降级,这种策略是为了提升得到锁和释放锁的效率。

适应自旋锁

所谓自适应就意味着自旋的次数再也不是固定的,它是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。

线程若是自旋成功了,那么下次自旋的次数会更加多,由于虚拟机认为既然上次成功了,那么这次自旋也颇有可能会再次成功,那么它就会容许自旋等待持续的次数更多。反之,若是对于某个锁,不多有自旋可以成功的,那么在之后要锁的时候自旋的次数会减小甚至省略掉自旋过程,以避免浪费处理器资源。

锁消除

锁消除是发生在编译器级别的一种锁优化方式。

有时候咱们写的代码彻底不须要加锁,却执行了加锁操做。

好比 StringBuffer 的 append()方法,Vector 的 add()方法。

若是 JVM 明显检测到没有发生方法逃逸,就会将内部的锁消除。

锁粗化

一般状况下,为了保证多线程间的有效并发,会要求每一个线程持有锁的时间尽量短,可是在某些状况下,一个程序对同一个锁不间断、高频地请求、同步与释放,会消耗掉必定的系统资源,由于锁的请求、同步与释放自己会带来性能损耗,这样高频的锁请求就反而不利于系统性能的优化了,虽然单次同步操做的时间可能很短。锁粗化就是告诉咱们任何事情都有个度,有些状况下咱们反而但愿把不少次锁的请求合并成一个请求,以下降短期内大量锁请求、同步、释放带来的性能损耗。

public void doSomethingMethod(){
    synchronized(lock){
        //do some thing
    }
    synchronized(lock){
        //do other thing
    }
}

偏向锁

若是使用锁的线程都只有一个,那么,维护轻量级锁都是浪费的。偏向锁的目标是,减小无竞争且只有一个线程使用锁的状况下,使用轻量级锁产生的性能消耗。轻量级锁每次申请、释放锁都至少须要一次 CAS,但偏向锁只有初始化时须要一次 CAS。

“偏向”的意思是,偏向锁假定未来只有第一个申请锁的线程会使用锁(不会有任何线程再来申请锁),所以,只须要在 Mark Word 中 CAS 记录 owner,若是记录成功,则偏向锁获取成功,记录锁状态为偏向锁,之后当前线程等于 owner 就能够零成本的直接得到锁;不然,说明有其余线程竞争,膨胀为轻量级锁

偏向锁没法使用自旋锁优化,由于一旦有其余线程申请锁,就破坏了偏向锁的假定。

轻量级锁

轻量级锁的目标是,减小无实际竞争状况下,使用重量级锁产生的性能消耗,包括系统调用引发的内核态与用户态切换、线程阻塞形成的线程切换等。

顾名思义,轻量级锁是相对于重量级锁而言的。使用轻量级锁时,不须要申请互斥量,仅仅将 Mark Word 中的部分字节 CAS 更新指向线程栈中的 Lock Record,若是更新成功,则轻量级锁获取成功,记录锁状态为轻量级锁;不然,说明已经有线程得到了轻量级锁,目前发生了锁竞争(不适合继续使用轻量级锁),接下来膨胀为重量级锁

固然,因为轻量级锁自然瞄准不存在锁竞争的场景,若是存在锁竞争但不激烈,仍然能够用自旋锁优化,自旋失败后再膨胀为重量级锁。

重量级锁

内置锁在 Java 中被抽象为监视器锁(monitor)。在 JDK 1.6 以前,监视器锁能够认为直接对应底层操做系统中的互斥量(mutex)。这种同步方式的成本很是高,包括系统调用引发的内核态与用户态切换、线程阻塞形成的线程切换等。所以,后来称这种锁为“重量级锁”。

synchronized 锁的升级过程

锁升级是单向的: 无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁


图片引用 blog.dreamtobe.cn

原子类

什么是原子类,有什么用

  • 首先原子,就是不可分割。一个操做是不可中断的,即便在多线程环境下也能够保证。
  • java.util.concurrent.atomic 包。
  • 原子类的做用和锁类似,是为了保证并发状况下线程安全。不过原子类相比于锁,有必定的优点。
  • 粒度更细:原子变量能够把竞争范围缩小到变量级别,这是咱们能够得到到最细粒度的状况,一般锁的粒度都要大于变量的粒度。
  • 效率更高:一般,使用原子类的效率比使用锁要高。

六类原子类纵览

Atomic 基本类型原子类

以 AtomicInteger 为例,经常使用方法:

  • int get() : 获取当前值
  • int getAndSet(int i): 获取当前值,并设置新值
  • int getAndIncrement() : 获取当前的值,并自增
  • int getAndDecrement() :获取当前值,并自减
  • int getAndAdd(int delta): 获取当前值,并在当前值上增长预期值
  • boolean compareAndSet(int expect,int update):若是输入的数值等于预期值,则以原子的方式将该值设置为输入值(update)。

代码演示:

public class AtomicIntegerTest implements Runnable {

    private static AtomicInteger atomicInteger = new AtomicInteger();
    private static int i = 0;

    public static void main(String[] args) throws InterruptedException {
        AtomicIntegerTest test = new AtomicIntegerTest();
        Thread thread = new Thread(test);
        Thread thread1 = new Thread(test);
        thread.start();
        thread1.start();
        thread.join();
        thread1.join();
        System.out.println("原子类结果为:" + atomicInteger.get());
        System.out.println("普通int结果为:" + i);
    }

    @Override
    public void run() {
        for (int j = 0; j < 10000; j++) {
            atomicInteger.getAndIncrement();
            i++;
        }
    }
}

运行结果:

原子类结果为:20000
普通int结果为:18647

Atomic 数组原子类

直接代码演示:

public class AtomicArrTest {

    public static AtomicIntegerArray integerArray = new AtomicIntegerArray(1000);

    public static void main(String[] args) throws InterruptedException {

        //自减
        Runnable runnable1 = () -> {
            for (int i = 0; i < integerArray.length(); i++) {
                integerArray.getAndDecrement(i);
            }
        };

        //自加
        Runnable runnable2 = () -> {
            for (int i = 0; i < integerArray.length(); i++) {
                integerArray.getAndIncrement(i);
            }
        };

        Thread[] threads1 = new Thread[100];
        Thread[] threads2 = new Thread[100];
        for (int i = 0; i < 100; i++) {
            threads1[i] = new Thread(runnable1);
            threads2[i] = new Thread(runnable2);
            threads1[i].start();
            threads2[i].start();
        }

        //等待线程运行结束
        for (int i = 0; i < 100; i++) {
            threads1[i].join();
            threads2[i].join();
        }

        for (int i = 0; i < integerArray.length(); i++) {
            if (integerArray.get(i) != 0) {
                System.out.println("原子类型不安全!发生不等于0的错误" + i);
            }
        }
        System.out.println("运行结束");
    }

}

运行结果:

运行结束

能够发现结果并无一加一减或者一减一加不等于 0 的错误。

Atomic Reference 引用类型原子类

AtomicReference 和 AtomicInteger 很是相似,不一样之处就在于 AtomicInteger 是对整数的封装,而 AtomicReference 则对应普通的对象引用。也就是它能够保证你在修改对象引用时的线程安全性。

AtomicReference 是做用是对”对象”进行原子操做。 提供了一种读和写都是原子性的对象引用变量。

代码演示:

public class AtomicReferenceTest {

    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Integer> ref = new AtomicReference<>(new Integer(1000));
        Runnable runnable = () -> {
            for (; ; ) {
                Integer num = ref.get();
                if (ref.compareAndSet(num, num + 1)) {//cas
                    break;
                }
            }
        };
        List<Thread> list = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            Thread t = new Thread(runnable, "Thread-" + i);
            list.add(t);
            t.start();
        }
        for (Thread t : list) {
            t.join();
        }
        System.out.println(ref.get()); //输出结果:2000
    }

}

把普通变量升级为具备原子功能

能够使用 AtomicIntegerFieldUpdater 对普通变量进行升级

那为何不直接在一开始就进行声明为原子变量呢?

由于在有的时候,好比咱们只有在某一时刻须要原子操做,存在大量并发的状况。而在大部分时候都没有并发问题的话,就没有必要一直都进行原子操做。

代码演示

public class AtomicIntegerFieldUpdaterTest implements Runnable {

    private static Candidate tom = new Candidate();
    private static Candidate peter = new Candidate();
    private static AtomicIntegerFieldUpdater<Candidate> candidateUpdater;

    public static class Candidate {
        volatile int score;
    }

    public static void main(String[] args) throws InterruptedException {
        candidateUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
        AtomicIntegerFieldUpdaterTest test = new AtomicIntegerFieldUpdaterTest();
        Thread thread1 = new Thread(test);
        Thread thread2 = new Thread(test);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println("普通变量:" + tom.score);
        System.out.println("原子变量:" + peter.score);
    }

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            tom.score++;
            candidateUpdater.getAndIncrement(peter);
        }
    }
}

注意点

AtomicIntegerFieldUpdater 不支持 static,以及修饰符不可见范围。

Adder 累加器

Adder 是 Java 8 中引入的一个类。

高并发下 LongAdder 比 AtomicLong效率高,不过本质仍是空间换时间

竞争激烈的时候,LongAdder 把不一样线程对应到不一样的 Cell 上进行修改,下降了冲突几率,是多段锁的理念,提升了并发效率。

代码演示 AtomicLong 耗时

public class AtomicLongTest {

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong(0);
        ExecutorService service = Executors.newFixedThreadPool(20);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            service.submit(new Task(counter));
        }
        service.shutdown();
        while (!service.isTerminated()) {

        }
        long end = System.currentTimeMillis();
        System.out.println(counter.get());
        System.out.println("AtomicLong耗时:" + (end - start));
    }

    private static class Task implements Runnable {

        private AtomicLong counter;

        public Task(AtomicLong counter) {
            this.counter = counter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                counter.incrementAndGet();
            }
        }
    }

}

运行结果:

100000000
AtomicLong耗时:1624

代码演示 AtomicLong 耗时

public class LongAdderTest {

    public static void main(String[] args) {
        LongAdder counter = new LongAdder();
        ExecutorService service = Executors.newFixedThreadPool(20);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            service.submit(new Task(counter));
        }
        service.shutdown();
        while (!service.isTerminated()) {

        }
        long end = System.currentTimeMillis();
        System.out.println(counter.sum());
        System.out.println("LongAdder耗时:" + (end - start));
    }

    private static class Task implements Runnable {

        private LongAdder counter;

        public Task(LongAdder counter) {
            this.counter = counter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        }
    }

}

运行结果:

100000000
LongAdder耗时:464

能够看到差距很是大,咱们看一下为何 AtomicLong 在并发下执行时间这么长。

AtomicLong 的弊端

由于每一次加法,都要进行 flush 和 refresh 致使耗费资源。

在线程 1 进行了修改操做后,就要当即刷新到主存,而后其余线程再去进行更新。

LongAdder 的改进

LongAdder 的实现原理是,在每一个线程内部都有一个本身的计数器,仅在本身内部计数,这样就不会被其余线程的计数器干扰。

如图示,第一个线程计数器的值也就是 ctr‘ 为 1 的时候,可能线程 2 的 str‘’已是 3 了,它们之间并不存在竞争关系,因此在加和的过程当中,不须要同步,也不须要 flush 和 refresh。

LongAdder 引入了分段累加的概念,内部有一个 base 变量和一个 Cell[]数组共同参与计数:

base 变量:竞争不激烈,直接累加到变量上。

Cell[] 数组:竞争激烈,各个线程分散累加到本身的槽 Cell[i] 中。

sum 方法源码

public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {//若是没有用到cell直接返回
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;//逐步累加
            }
        }
        return sum;
}

总结

  1. 在低争用的状况下,二者差距不大,可是在竞争激烈的状况下,LongAdder 吞吐量要高,可是要消耗更多的空间。
  2. LongAdder 适合的场景是统计求和的场景,并且 LongAdder 只提供了 add 方法,而 AtomicLong 还具备 CAS 方法。

Accumulator 累加器

Accumulator 和 Adder 相似,就是一个更通用版本的 Adder。

代码演示

public class LongAccumulatorTest {

     public static void main(String[] args) {
        LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 100);
        ExecutorService executor = Executors.newFixedThreadPool(8);
        IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

        executor.shutdown();
        while (!executor.isTerminated()) {

        }
        System.out.println(accumulator.getThenReset());
    }
}

运行结果:

145

CAS

CAS 是 compare and swap 的缩写,也就是咱们所说的比较并交换。cas 是一种基于锁的操做,并且是乐观锁。

举例就是我认为 V 的值应该是张三,若是是的话我就把它改成李四,若是不是张三,就说明被人就改过了,那我就不修改了。

CAS 中有三个操做数:内存值 V,预期值 A,要修改的值 B,当V == A 时,则修改成B。不然什么都不作,返回如今的 V 值。

CAS 源码解析

例如 AtomicInteger 原子类加载了 Unsafe 工具,用来直接操做内存数据

用 volatile 修饰 value 字段,保证可见性。

就以 getAndAdd 方法举例,咱们看下源码:

public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }

底层调用了 unsafe 类的方法:

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
}

里面使用了 do while 循环,若是 a,b 线程同时执行这个方法,a 线程拿到值 1 后 cpu 执行时间到了挂起,b 开始执行,也拿到 1,可是没有挂起,接着将值变成了 2。

这个时候 a 线程恢复执行,去比较的时候发现手上的 1 和内存里面的值 2 不等,这个时候就要进行下一个循环。

compareAndSwapInt 是 native 方法,Unsafe 类提供了硬件级别的原子操做,而咱们传入的 valueOffset 就是根据内存偏移地址获取数据原值,这样就能够经过 unsafe 来实现 CAS。

总结

  • CAS 存在 ABA 问题。

ABA 问题就是在主内存中本来是 A 后来有另一个线程修改成了 B 后又改回了 A,第一个线程回来看后仍是 A 觉得没有变化,实际上已经有了变化。

如何解决 ABA 问题?

AtomicStampedReference 增长版本号,进行版本号判断。

  • 自旋时间可能过长。

并发容器

并发容器概览

  • ConcurrentHashMap:线程安全的 HashMap
  • CopyOnWriteArrayList:线程安全的 List
  • BlockingQueue:这是一个接口,表示阻塞队列
  • ConcurrentLinkedQueue:高效的非阻塞并发队列,使用链表实现,是一个线程安全的 LinkedList
  • ConcurrentSkipListMap:是一个 Map,使用跳表的数据结构进行快速查找

古老的同步容器

Vector 和 HashTable

并发性能较差,关键方法都是使用 synchronized 修饰的方法级别。

public synchronized V put(K key, V value) {
        // Make sure the value is not null
        if (value == null) {
            throw new NullPointerException();
        }

        // Makes sure the key is not already in the hashtable.
        Entry<?,?> tab[] = table;
        int hash = key.hashCode();
        int index = (hash & 0x7FFFFFFF) % tab.length;
        @SuppressWarnings("unchecked")
        Entry<K,V> entry = (Entry<K,V>)tab[index];
        for(; entry != null ; entry = entry.next) {
            if ((entry.hash == hash) && entry.key.equals(key)) {
                V old = entry.value;
                entry.value = value;
                return old;
            }
        }

        addEntry(hash, key, value, index);
        return null;
    }

HashMap 和 ArrayList

虽然这两个类不是线程安全的,可是咱们能够使用 Collections.synchronizedList()和 Collections.synchronizedMap()使其变为线程安全的。

打开源码能够看到是使用的同步代码块的方式:

ConcurrentHashMap

Map 家族概览:

HashMap 关于并发的特色

  1. 非线程安全
  2. 迭代时不容许修改
  3. 只读的并发是安全的
  4. 若是要用在并发的话,使用 Collections.synchronizedMap(new HashMap())

Java1.7 中 ConcurrentHashMap 结构

java 1.7 中 ConcurrentHashMap 最外层是多个segment每一个 segment 的底层数据结构和 HashMap 相似,仍然是数组和链表组成的拉链法

每一个 segment 中包含独立的ReentrantLock锁,每一个 segment 之间互不影响,提升了并发效率。

ConcurrentHashMap 默认有 16 个 segment,因此最多支持 16 个线程同时并发写入。这个值能够在初始化时填入,一旦初始化后,是不能扩容的。

Java8 中 ConcurrentHashMap 结构

put 方法解析

public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 获得 hash 值
    int hash = spread(key.hashCode());
    // 用于记录相应链表的长度
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 若是数组为空,进行数组初始化
        if (tab == null || (n = tab.length) == 0)
            // 初始化数组
            tab = initTable();

        // 找该 hash 值对应的数组下标,获得第一个节点 f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 若是数组该位置为空,
            // 用一次 CAS 操做将这个新值放入其中便可,这个 put 操做差很少就结束了
            // 若是 CAS 失败,那就是有并发操做,进入下一次循环
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;
        }
        //说明正在扩容
        else if ((fh = f.hash) == MOVED)
            // 数据迁移
            tab = helpTransfer(tab, f);

        else { // f 是该位置的头结点,并且不为空

            V oldVal = null;
            // 获取数组该位置的头结点的监视器锁
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表
                        // 用于累加,记录链表的长度
                        binCount = 1;
                        // 遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 若是发现了"相等"的 key,判断是否要进行值覆盖,而后也就能够 break 了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 到了链表的最末端,将这个新值放到链表的最后面
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 红黑树
                        Node<K,V> p;
                        binCount = 2;
                        // 调用红黑树的插值方法插入新节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            if (binCount != 0) {
                // 判断是否要将链表转换为红黑树,临界值和 HashMap 同样,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 这个方法和 HashMap 中稍微有一点点不一样,那就是它不是必定会进行红黑树转换,
                    // 若是当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }

    addCount(1L, binCount);
    return null;
}

get 方法分析

  1. 计算 hash 值
  2. 根据 hash 值找到数组对应位置: (n - 1) & h
  3. 根据该位置处结点性质进行相应查找

若是该位置为 null,那么直接返回 null

若是该位置处的节点恰好就是咱们须要的,返回该节点的值便可

若是该位置节点的 hash 值小于 0,说明正在扩容,或者是红黑树,而后经过 find 方法去寻找

若是以上 3 条都不知足,那就是链表,进行遍历比对

CopyOnWriteArrayList

Vector 和 SynchronizedList 的锁粒度太大,并发效率较低,而且迭代时没法编辑

另外 CopyOnWriteSet 是用来代替同步 Set。

适用场景

读多写少

读操做能够尽量的快,而写即便慢一些也不要紧。

在不少应用场景中,读操做可能会远远多于写操做。好比,有些系统级别的信息,每每只须要加载或者修改不多的次数,可是会被系统内全部模块频繁的访问。对于这种场景,咱们最但愿看到的就是读操做能够尽量的快,而写即便慢一些也不要紧。

读写规则

以前的读写锁:读读共享、写写互斥、读写互斥。

读写锁规则的升级:读取是彻底不须要加锁的,而且更强的是,写入也不会阻塞读取操做,只有写入和写入之间须要同步等待。

代码演示

首先咱们看一下使用 ArrayList 带来的修改问题。

对 Vector、ArrayList 在迭代的时候若是同时对其进行修改就会抛出 java.util.ConcurrentModificationException 异常

public class Test {

    public static void main(String[] args) {
        List<String> list = new CopyOnWriteArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Iterator<String> iterator = list.iterator();
        while (iterator.hasNext()) {
            String next = iterator.next();
            System.out.println(list);
            if (next.equals("3")) {
                list.remove("5");
            }
            if (next.equals("4")) {
                list.add("new");
            }
        }
    }

}

运行结果:

[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
Exception in thread "main" java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
    at java.util.ArrayList$Itr.next(ArrayList.java:859)
    at test.Test.main(Test.java:25)

咱们看一下源码:

final void checkForComodification() {
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
}

在建立迭代器的时候会把对象的 modCount 的值传递给迭代器的 expectedModCount。

每次 next 的时候判断是否一致若是不一致则抛出异常。

使用 CopyOnWriteArrayList

public class Test {

    public static void main(String[] args) {
        List<String> list = new CopyOnWriteArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Iterator<String> iterator = list.iterator();
        while (iterator.hasNext()) {
            String next = iterator.next();
            System.out.println(list);
            if (next.equals("3")) {
                list.remove("5");
            }
            if (next.equals("4")) {
                list.add("new");
            }
        }
    }

}

运行结果:

[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[1, 2, 3, 4]
[1, 2, 3, 4, new]

源码解析

先看一下 add 方法

public boolean add(E e) {
        //1.得到独占锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //2.得到Object[]数组
            Object[] elements = getArray();
            //3.得到elements的长度
            int len = elements.length;
            //4.复制到新的数组
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            //5.将add的元素添加到新元素
            newElements[len] = e;
            //6.替换以前的数据
            setArray(newElements);
            return true;
        } finally {
            //7.释放独占锁
            lock.unlock();
        }
}

CopyOnWriteArrayList 使用了 ReentrantLock 独占锁,保证同时只有一个线程对集合进行修改操做。

数据是存储在 CopyOnWriteArrayList 中的 array 数组中的。

在添加元素的时候,并非直接往 array 里面 add 元素,而是复制出来了一个新的数组,而且复制出来的数组的长度是 【旧数组的长度+1】,再把旧的数组替换成新的数组。

get方法

public E get(int index) {
        return get(getArray(), index);
}

get 方法没有加锁,很简单,直接获取元素。 可是不保证数据是最新的,也就是弱一致性

set方法

public E set(int index, E element) {
        //得到独占锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //得到Object数组
            Object[] elements = getArray();
            //根据下标,得到旧的元素
            E oldValue = get(elements, index);
            //若是旧的元素不等于新的元素
            if (oldValue != element) {
                // 得到旧数组的长度
                int len = elements.length;
                // 复制出新的数组
                Object[] newElements = Arrays.copyOf(elements, len);
                // 修改
                newElements[index] = element;
                //替换
                setArray(newElements);
            } else {
                //为了保证volatile 语义,即便没有修改,也要替换成新的数组
                setArray(elements);
            }
            return oldValue;
        } finally {
            //释放独占锁
            lock.unlock();
        }
    }

仍是使用 lock 加锁,而后复制一个 arr 副本进行修改,以后覆盖。

remove 方法

public E remove(int index) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            E oldValue = get(elements, index);
            int numMoved = len - index - 1;
            if (numMoved == 0)
                setArray(Arrays.copyOf(elements, len - 1));
            else {
                Object[] newElements = new Object[len - 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index + 1, newElements, index,
                                 numMoved);
                setArray(newElements);
            }
            return oldValue;
        } finally {
            lock.unlock();
        }
    }

能够看到,remove 方法和 add,set 方法是同样的,第一步仍是先获取独占锁,来保证线程安全性,若是要删除的元素是最后一个,则复制出一个长度为【旧数组的长度-1】的新数组,随之替换,这样就巧妙的把最后一个元素给删除了,若是要删除的元素不是最后一个,则分两次复制,随之替换。

CopyOnWrite 的缺点

CopyOnWrite 容器有不少优势,可是同时也存在两个问题,即内存占用问题和数据一致性问题。因此在开发的时候须要注意一下。

内存占用问题:由于 CopyOnWrite 的写时复制机制,因此在进行写操做的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象。

数据一致性问题:CopyOnWrite 容器只能保证数据的最终一致性,不能保证数据的实时一致性。

阻塞队列简介

什么是阻塞队列?

阻塞队列(BlockingQueue) 是一个支持两个附加操做的队列。这两个附加的操做是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列经常使用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

经常使用的队列主要有如下两种:

先进先出(FIFO):先插入队列的元素也最早出队列,相似于排队的功能。

后进先出(LIFO):后插入队列的元素最早出队列,这种队列优先处理最近发生的事件。

核心方法

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用

ArrayBlockingQueue

ArrayBlockingQueue 是一个阻塞式的队列,继承自 AbstractBlockingQueue,间接的实现了 Queue 接口和 Collection 接口。底层以数组的形式保存数据(实际上可看做一个循环数组)。而且是一个基于数组的阻塞队列。

ArrayBlockingQueue 是一个有界队列,有界也就意味着,它不可以存储无限多数量的对象。因此在建立 ArrayBlockingQueue 时,必需要给它指定一个队列的大小。

而且还能够指定是否公平,若是保证公平的话,那么等待了最长时间的线程会被优先处理,不过会带来性能损耗。

代码示例

有 10 个面试者,一共只有一个面试官,大厅里有 3 个位置,每一个面试时间是 10 秒,模拟面试场景。

public class ArrayBlockingQueueTest {
    public static void main(String[] args) {

        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        Interviewer r1 = new Interviewer(queue);
        Consumer r2 = new Consumer(queue);
        new Thread(r1).start();
        new Thread(r2).start();
    }
}

class Interviewer implements Runnable {

    BlockingQueue<String> queue;

    public Interviewer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("10个候选人都来啦");
        for (int i = 0; i < 10; i++) {
            String candidate = "Candidate" + i;
            try {
                queue.put(candidate);
                System.out.println("安排好了" + candidate);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            queue.put("stop");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {

    BlockingQueue<String> queue;

    public Consumer(BlockingQueue queue) {

        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String msg;
        try {
            while (!(msg = queue.take()).equals("stop")) {
                System.out.println(msg + "到了");
            }
            System.out.println("全部候选人都结束了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

10个候选人都来啦
安排好了Candidate0
安排好了Candidate1
安排好了Candidate2
Candidate0到了
安排好了Candidate3
Candidate1到了
Candidate2到了
安排好了Candidate4
Candidate3到了
Candidate4到了
Candidate5到了
安排好了Candidate5
安排好了Candidate6
Candidate6到了
安排好了Candidate7
Candidate7到了
安排好了Candidate8
Candidate8到了
安排好了Candidate9
Candidate9到了
全部候选人都结束了

源码解析

ArrayBlockingQueue 进队操做采用了加锁的方式保证并发安全。

public void put(E e) throws InterruptedException {
    // 非空判断
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            // 一直阻塞,知道队列非满时,被唤醒
            notFull.await();
        }
        // 进队
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

LinkedBlockingQueue

LinkedBlockingQueue 不一样于 ArrayBlockingQueue,它若是不指定容量,默认为 Integer.MAX_VALUE,也就是无界队列。因此为了不队列过大形成机器负载或者内存爆满的状况出现,咱们在使用的时候建议手动传入一个队列的大小。

源码分析

/**
 * 节点类,用于存储数据
 */
static class Node<E> {
    E item;
    Node<E> next;

    Node(E x) { item = x; }
}

/** 阻塞队列的大小,默认为Integer.MAX_VALUE */
private final int capacity;

/** 当前阻塞队列中的元素个数 */
private final AtomicInteger count = new AtomicInteger();

/**
 * 阻塞队列的头结点
 */
transient Node<E> head;

/**
 * 阻塞队列的尾节点
 */
private transient Node<E> last;

/** 获取并移除元素时使用的锁,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
private final Condition notEmpty = takeLock.newCondition();

/** 添加元素时使用的锁如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
private final Condition notFull = putLock.newCondition();

从上面的属性咱们知道,每一个添加到 LinkedBlockingQueue 队列中的数据都将被封装成 Node 节点,添加的链表队列中,其中 head 和 last 分别指向队列的头结点和尾结点。与 ArrayBlockingQueue 不一样的是,LinkedBlockingQueue 内部分别使用了 takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操做并非互斥操做,能够同时进行,这样也就能够大大提升吞吐量。

put方法

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 获取锁
    putLock.lockInterruptibly();
    try {
        //判断队列是否已满,若是已满阻塞等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 把node放入队列中
        enqueue(node);
        c = count.getAndIncrement();
        // 再次判断队列是否有可用空间,若是有唤醒下一个线程进行添加操做
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 若是队列中有一条数据,唤醒消费线程进行消费
    if (c == 0)
        signalNotEmpty();
}
  • 队列已满,阻塞等待。
  • 队列未满,建立一个 node 节点放入队列中,若是放完之后队列还有剩余空间,继续唤醒下一个添加线程进行添加。若是放以前队列中没有元素,放完之后要唤醒消费线程进行消费。

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,直到系统资源耗尽。默认状况下元素采用天然顺序升序排列。也能够自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。但须要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue 也是基于最小二叉堆实现,使用基于 CAS 实现的自旋锁来控制队列的动态扩容,保证了扩容操做不会阻塞 take 操做的执行。

SynchronousQueue

SynchronousQueue 是一个内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另外一个线程从队列中获取了队列中存储的元素。一样,若是线程尝试获取元素而且当前不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。

SynchronousQueue 没有 peek 等函数,由于 peek 的含义是取出头结点,可是 SynchronousQueue 容量是 0,因此没有头结点。

SynchronousQueue 是线程池 Executors.newCachedThreadPool()使用的阻塞队列。

DelayQueue

DelayQueue 是一个没有边界 BlockingQueue 实现,加入其中的元素必需实现 Delayed 接口。当生产者线程调用 put 之类的方法加入元素时,会触发 Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最先到期的,越日后到期时间赿晚。底层基于前面说过的 PriorityBlockingQueue 实现的。

ConcurrentLikedQueue

是一个适用于高并发场景下的队列,经过无锁的方式,底层使用 CAS,实现了高并发状态下的高性能,一般 ConcurrentLikedQueue 性能好于 BlockingQueue。

它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最早加入的,尾是最近加入的,该队列不容许 null 元素。

控制并发流程

什么是控制并发流程

控制并发流程的工具类,做用就是帮助咱们更容易的让线程之间合做,相互配合,来知足业务逻辑。

好比线程 A 等待线程 B 执行完成后再执行某段代码。

经常使用的控制并发流程的工具类

做用 说明
Semaphore 信号量,能够经过控制"许可证"的数量,来保证线程之间的配合 线程只有在拿到"许可证"后才能继续运行,更加灵活
CyclicBarrier 线程会等待,直到足够多线程达到了事先规定的数目,一旦到达触发条件,就能够进行下一步操做 是用于线程间相互等待处理结果就绪的状况
Phaser 和 CyclicBarrier 相似,但计数可变 java1.7 中加入
CountDownLatch 和 CyclicBarrier 相似,数量递减到 0 时候触发 不能够重复使用
Exchanger 让两个对象在合适时候交换对象 适用于在两个线程工做同一个类的不一样实例时,交换数据
Condition 能够控制线程的等待和唤醒 是 Object.wait()升级版

CountDownLatch

什么是 CountDownLatch

CountDownLatch 这个类使一个线程等待其余线程各自执行完毕后再执行。
是经过一个计数器来实现的,传入须要倒数的值。每当一个线程执行完毕后,计数器的值就减 1,当计数器的值为 0 时,表示全部线程都执行完毕,而后在闭锁上等待的线程就能够恢复工做了。

主要方法介绍

  • CountDownLatch(int count):仅有一个构造函数,参数为 count 须要倒数的值。
  • await():调用 await()方法的线程会被挂起,他会等待直到 count 为 0 才会继续执行。
  • countDown():将 count 值减 1.直到为 0 时,其余等待的线程就会被唤醒。

用法一:一个线程等待多个线程都执行完毕,再继续本身的工做

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Runnable r = () -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 已经上车!");
            countDownLatch.countDown();
        };
        for (int i = 0; i < 5; i++) {
            executorService.execute(r);
        }
        System.out.println("等待你们上车");
        countDownLatch.await();
        System.out.println("5我的都已经上车,能够出发咯");
        executorService.shutdown();
    }
}

运行结果:

等待你们上车
pool-1-thread-2 已经上车!
pool-1-thread-1 已经上车!
pool-1-thread-3 已经上车!
pool-1-thread-4 已经上车!
pool-1-thread-5 已经上车!
5我的都已经上车,能够出发咯

用途二:多个线程同时等待结束后一块儿工做

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Runnable r = () -> {
            try {
                System.out.println(Thread.currentThread().getName() + " 已经就绪!");
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " 开始跑步!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        for (int i = 0; i < 5; i++) {
            executorService.execute(r);
        }
        TimeUnit.SECONDS.sleep(1);
        System.out.println("信号枪!biu!");
        countDownLatch.countDown();
        executorService.shutdown();
    }
}

运行结果:

pool-1-thread-1 已经就绪!
pool-1-thread-2 已经就绪!
pool-1-thread-3 已经就绪!
pool-1-thread-4 已经就绪!
pool-1-thread-5 已经就绪!
信号枪!biu!
pool-1-thread-1 开始跑步!
pool-1-thread-2 开始跑步!
pool-1-thread-5 开始跑步!
pool-1-thread-4 开始跑步!
pool-1-thread-3 开始跑步!

注意点

CountDownLatch 是不可以重用的,若是须要从新计数能够使用 CyclicBarrier,或者建立新的 CountDownLatch 实例。

Semaphore 信号量

Semaphore 能够用来限制和管理数量优先资源的使用状况。

信号量的做用是维护一个许可证的计数,线程能够获取许可证,那信号量剩余的许可证就减一,线程也能够释放一个许可证,那就会加一,当信号量所拥有的许可证为 0 的时候,则须要等待,直到有线程释放了许可证。

主要方法介绍

  • new Semaphore(int permits,boolean fair):第一个参数为许可证数量,第二个是否为公平策略,即等待的线程放到 FIFO 队列中。
  • acquire():获取一个许可证,若是没有则等待,容许被中断。
  • acquireUninterruptibly():取一个许可证,若是没有则等待,不容许被中断。
  • tryAcquire():看看目前有没有空闲的许可证,有就获取,无则干别的事,不阻塞。
  • tryAcquire(long timeout):若是在 timeout 时间段内拿不到,就作别的事。
  • release():归还许可证。

代码演示:每次只有三我的的作任务

public class SemaphoreTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Semaphore semaphore = new Semaphore(3, true);
        Runnable r = () -> {
            try {
                semaphore.acquire(); //acquire里面能够传入数值,好比传入3 也就是一下能够拿三个许可,同时释放时候也要传入对应的值
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("拿到许可证!开始作任务!");
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务结束!释放许可证!");
            semaphore.release();
        };
        for (int i = 0; i < 1000; i++) {
            executorService.submit(r);
        }
        executorService.shutdown();
    }

}

Condition

Condition 做用

当线程 1 须要等待某个条件的时候,它就去执行 condition.await()方法,一旦执行了 await()方法,线程就进入阻塞状态。

而后假设线程 2 执行condition.signal()方法,这时 JVM 就会从被阻塞的线程中找到那些被 condition.await()中的线程,这样线程 1 就会受到可执行信号,状态就变成Runnable

signalAll()和 signal()的区别

signalAll()会唤起全部的正在等待的线程。

可是 signal()是公平的,只会唤起等待时间最长的线程。

Condition 基本使用

public class ConditionTest {

    private static Lock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();

    public static void main(String[] args) {
        ConditionTest test = new ConditionTest();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                test.methodB();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        test.methodA();
    }

    private void methodA() {
        lock.lock();
        try {
            System.out.println("开始阻塞");
            condition.await();
            System.out.println("我被唤醒了!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private void methodB() {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

运行结果:

开始阻塞
我被唤醒了!

使用 Condition 实现生产者消费者

public class ConditionTest {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionTest test = new ConditionTest();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列空,等待数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread {

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满,等待有空余");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signalAll();
                    System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

Condition 注意点

实际上,Condition 就是用来代替 Object.wait/nofity 的,因此用法上和性质上几乎同样。

await()方法会自动释放 Lock 锁,和 Object.wait 同样,不须要本身手动释放锁。

调用 await()的时候,必须持有锁,不然会抛出异常。

CyclicBarrier 循环栅栏

CyclicBarrier 和 CountDownLatch 很像,都能阻塞一组线程。

当有大量的线程相互配合,分别计算不一样任务,最后统一汇总时候,咱们能够使用 CyclicBarrier,CyclicBarrier 能够构造一个集结点,当某一个线程完毕后,就会到达集结点等待,等全部线程都到了以后,栅栏就会被撤销,而后全部线程统一出发,继续执行剩下的任务。

代码演示

public class CyclicBarrierTest {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("全部人都到场了, 你们统一出发!"));
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i, cyclicBarrier)).start();
        }
    }

    static class Task implements Runnable {
        private int id;
        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程" + id + "如今前往集合地点");
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("线程" + id + "到了集合地点,开始等待其余人到达");
                cyclicBarrier.await();
                System.out.println("线程" + id + "出发了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

线程0如今前往集合地点
线程1如今前往集合地点
线程2如今前往集合地点
线程3如今前往集合地点
线程4如今前往集合地点
线程5如今前往集合地点
线程6如今前往集合地点
线程7如今前往集合地点
线程8如今前往集合地点
线程9如今前往集合地点
线程9到了集合地点,开始等待其余人到达
线程8到了集合地点,开始等待其余人到达
线程6到了集合地点,开始等待其余人到达
线程5到了集合地点,开始等待其余人到达
线程0到了集合地点,开始等待其余人到达
全部人都到场了, 你们统一出发!
线程0出发了
线程9出发了
线程6出发了
线程8出发了
线程5出发了
线程1到了集合地点,开始等待其余人到达
线程4到了集合地点,开始等待其余人到达
线程2到了集合地点,开始等待其余人到达
线程3到了集合地点,开始等待其余人到达
线程7到了集合地点,开始等待其余人到达
全部人都到场了, 你们统一出发!
线程7出发了
线程1出发了
线程2出发了
线程3出发了
线程4出发了

CyclicBarrier 和 CountDownLatch 的区别

做用不一样:CyclicBarrier 要等待固定线程数量都到了栅栏位置才能继续执行;而 CountDownLatch 只须要等待数字为 0,也就是说 CountDownLatch 用于事件,可是 CyclicBarrier 用于线程。

可重用性不一样:CountDownLatch 在到达 0 后打开门闩,就不能在使用了,除非用新的实例,而 CyclicBarrier 能够重复使用。

AQS

AQS 全名:AbstractQueuedSynchronizer,是并发容器 java.lang.concurrent 下 locks 包内的一个类。它实现了一个 FIFO 的队列。底层实现的数据结构是一个双向链表

AQS 核心思想是,若是被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工做线程,而且将共享资源设置为锁定状态。若是被请求的共享资源被占用,那么就须要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

AQS 内部维护了一个 CLH 队列来管理锁。线程会首先尝试获取锁,若是失败就将当前线程及等待状态等信息包装成一个node节点加入到同步队列sync queue里。接着会不断的循环尝试获取锁,条件是当前节点为 head 的直接后继才会尝试。若是失败就会阻塞本身直到本身被唤醒。而当持有锁的线程释放锁的时候,会唤醒队列中的后继线程。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。

AQS 内部核心部分

AQS 最核心的三大部分:

  • state
  • 控制线程抢锁和配合的 FIFO 队列
  • 指望协做工具类去实现的获取/释放等重要方法

state 状态

/**
* The synchronization state.
*/
private volatile int state;

这个 state 具体含义,会根据具体实现类不一样而不一样,好比在 Semaphore 里,它表示剩余的许可证数量,而在 CountDownLatch 中,表示还须要倒数的数量

state 是 volatile 修饰的,会被并发的修改,因此全部修改 state 的方法都须要保证线程安全,好比 getState、setState 以及 compareAndSetState 操做来读取和更新这个状态。这些方法都依赖与 atomic 包的支持。

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

控制线程抢锁和配合的 FIFO 队列

这个队列用来存放等待的线程,AQS 就是排队管理器,当多个线程争用同一把锁时,必须有排队机制将没有拿到线程的锁串在一块儿,当锁释放的时候,管理器就会挑选一个合适的线程来占有释放的锁。

AQS 会维护一个等待的线程队列,把线程都放到队列中。

指望协做工具类去实现的获取/释放等重要方法

这里的获取和释放方法,是利用 AQS 的写做工具类中最重要的方法,是由协做类本身去实现的,而且含义各不相同

获取方法

  • 会依赖 state 变量,常常会阻塞
  • 在 Semaphore 中,获取就是 acquire 方法,做用就是获取一个许可证
  • 在 CountDownLatch 中,获取就是 await 方法,做用就是等待直到 0 结束

释放方法

  • 释放操做不会阻塞
  • 在 Semaphore 中,释放就是 release 方法,做用就是释放一个许可证
  • 在 CountDownLatch 中,获取就是 CountDown 方法,做用就是减小一个数

而且子类还须要重写 tryAcquire 和 tryRelease 方法。

AQS 源码分析

AQS 用法

第一步:写一个类,想好协做的逻辑,实现获取/释放方法。

第二步:内部写一个Sync类继承AbstractQueuedSynchronizer

第三步:根据是否独占来重写 tryAcquire/tryRelease 或 tryAcquireShared(int acquires)和 tryReleaseShared(int releases)等方法,在以前写的获取/释放方法中调用 AQS 的 acquire 或者 shared 方法。

AQS 在 CountDownLatch 中的应用

  • 内部类 Sync 继承了 AQS

首先咱们看一下构造方法

底层建立了一个 Sync 对象。

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
}

getCount 方法中也只是返回了 state 的值。

public long getCount() {
        return sync.getCount();
}

int getCount() {
    return getState();
}

await 方法解析:

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())//判断当前线程是否中断
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)  //tryAcquireShared主要判断当前状态是否==0,若是返回1 能够直接放行,不然返回-1 进入队列
            doAcquireSharedInterruptibly(arg);
    }
protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); //加入到node节点中,SHARED表示共享模式
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())  //阻塞当前线程
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

countDown 方法解析:

public void countDown() {
        sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared(); //若是返回true,则会调用此方法唤醒全部等待中的线程。
            return true;
        }
        return false;
    }
protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0) // == 0 说明已经释放
                    return false;
                int nextc = c-1; // 将state - 1
                if (compareAndSetState(c, nextc)) //经过CAS更新state
                    return nextc == 0; //若是== 0说明门闸打开
            }
}

Future 和 Callable

基本用法

首先看一下 Runnable 的缺陷

  • 没有返回值
  • 没法抛出异常

Callable 接口

  • 相似于 Runnable,被其余线程执行的任务
  • 实现 call 方法
  • 有返回值
  • 能够抛出异常
@FunctionalInterface
public interface Callable<V> {

    V call() throws Exception;
}

Future 类

在并发编程中,咱们常常用到非阻塞的模型,在以前的多线程的三种实现中,无论是继承 Thread 类仍是实现 Runnable 接口,都没法保证获取到以前的执行结果。经过实现 Callable 接口,并用 Future 能够来接收多线程的执行结果。

Future 表示一个可能尚未完成的异步任务的结果,针对这个结果能够添加 Callable 以便在任务执行成功或失败后做出相应的
操做。

Future 接口定义了主要的 5 个接口方法,有 RunnableFuture 和 SchedualFuture 继承这个接口,以及 CompleteFuture 和 ForkJoinTask 继承这个接口。

Callable 和 Future 的关系

  • 咱们能够用 Future.get()方法来获取 Callable 接口返回的执行结果,还能够经过 Future.isDone()来判断任务是否以及执行完了,以及取消这个任务,限时获取任务的结果等。
  • 在 call()未执行完毕以前,调用 get()的线程会被阻塞,知道 call()方法返回告终果后,才会获得结果,而后线程切换至 Runnable 状态。

因此 Future 是一个存储器,它存储了call()这个任务的结果,而这个任务的执行时间是没法提早肯定的,由于这彻底取决于 call()方法执行的状况。

主要方法介绍

  • get():获取结果,get 方法的行为取决于 Callable 任务的状态,只有如下五种状况:
  1. 任务正常完成,get 方法当即返回结果
  2. 任务没有完成,get 方法会阻塞到任务完成
  3. 任务执行中抛出异常,get 方法就会抛出 ExecutionException:这里抛出的异常是 call()执行时产生的异常,不论里面 call()抛出的是什么异常
  4. 任务被取消,get 方法抛出 CancellationException
  5. 任务超时,get 方法能够传入超时时间,若是时间到了还没获取到结果,get 方法就会抛出 TimeoutException
  • get(long timeout,TimeUnit unit):有超时的获取
  • cancel():取消任务的执行
  • isDone():判断线程是否执行完毕
  • isCancelled():判断是否被取消

基本使用

在阻塞一秒后获取到返回值。

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(10);
        Callable<Integer> callable = () -> {
            TimeUnit.SECONDS.sleep(1);
            return 10;
        };
        Future<Integer> future = service.submit(callable);
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        service.shutdown();
    }
}

运行结果:

10

异常捕获演示

无论里面发生什么异常,咱们只能捕获到 ExecutionException 异常。

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(20);
        Future<Integer> future = service.submit(new CallableTask());
        try {
            System.out.println(future.isDone()); //并不关心是否抛出异常
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("InterruptedException异常");
        } catch (ExecutionException e) {
            e.printStackTrace();
            System.out.println("ExecutionException异常");
        }finally {
            service.shutdown();
        }
    }

    static class CallableTask implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            throw new IllegalArgumentException("Callable抛出异常");
        }
    }
}

运行结果:

true
ExecutionException异常
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Callable抛出异常
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.concurrent.FutureTest.main(FutureTest.java:12)
Caused by: java.lang.IllegalArgumentException: Callable抛出异常
    at com.concurrent.FutureTest$CallableTask.call(FutureTest.java:27)
    at com.concurrent.FutureTest$CallableTask.call(FutureTest.java:24)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

cancel 方法:取消任务的执行

  1. 若是这个任务还没开始执行,那么这种状况最简单,任务被正常的取消,将来也不会被执行,方法返回 true。
  2. 若是任务已经完成,或者已经取消,返回 false。
  3. 若是已经开始了,那么不会取消该任务,而是根据咱们填入的参数 MayInterruptIfRunning 作判断。若是传入 true 则发出中断信号,false 则不发送。

FutureTask

咱们也能够使用 FutureTask 来获取 Future 的任务结果,FutureTask 能够把 Callable 转化成 Future 和 Runnable,它同时实现了两者的接口。

把 Callable 实例当作参数,生成 FutureTask 对象,而后把这个对象当作一个 Runnable 对象,用线程池或另起线程去执行 Runnable 对象,最后经过 FutureTask 获取刚才执行的结果。

代码演示

public class FutureTest {

    public static void main(String[] args) {
        Task task = new Task();
        FutureTask<Integer> integerFutureTask = new FutureTask<>(task);
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(integerFutureTask);
        try {
            System.out.println("task运行结果:"+integerFutureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            service.shutdown();
        }
    }
}

class Task implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("子线程正在计算");
        Thread.sleep(3000);
        int sum = 0;
        for (int i = 0; i <= 100; i++) {
            sum += i;
        }
        return sum;
    }
}

运行结果:

子线程正在计算
task运行结果:5050

FutureTask 注意点

  • Future 的生命周期不能后退,一旦完成后,就停留在完成状态。
  • 当 for 循环批量获取 future 的结果时,容易发生一部分线程慢的状况,get 方法调用时应使用 timeout 限制。也能够使用 CompletableFuture 工具类,它的做用是哪一个线程先完成就先获取哪一个结果。