高级程序员需知的并发编程知识(二)

说明

本篇是继上一篇并发编程未讨论完的内容的续篇。上一篇传送门:java

Java并发编程一万字总结(吐血整理)编程

活跃性问题

在上一篇咱们讨论并发编程带来的风险的时候,说到其中 一个风险就是活跃性问题。活跃性问题其实就是咱们的程序在某些场景或条件下执行不下去了。在这个话题下咱们会去了解什么是死锁、活锁以及饥饿,该如何避免这些状况的发生。缓存

死锁

咱们通常使用加锁来保证线程安全,可是过分地使用加锁,可能致使死锁发生。安全

哲学家进餐问题bash

“哲学家进餐”问题能很好地描述死锁的场景。5个哲学家去吃火锅,坐在一张圆桌上。它们有5根筷子(不是5双),这5根筷子放在每一个人的中间。哲学家时而思考,时而进餐。每一个人都要取到一双筷子才能吃到东西,而且在吃完后将筷子放回原处。网络

在这里插入图片描述
能够考虑一下这种状况,若是每一个人都当即抓住本身左边的筷子,而后等待本身右边的筷子空出来,但同时都不放手本身已经拿到的筷子。会出现什么状况。能够想到,每一个人都吃不上火锅了,只等凉凉了。多线程

什么是死锁并发

每一个人都拥有其余人须要的资源,同时又等待其余人已经拥有的资源,而且每一个人在得到所需资源以前都不会放弃已经拥有的资源。这就是一种死锁。app

再使用线程的术语描述一下。在线程A持有锁L并想得到锁M的同时,线程B持有锁M并尝试获取锁L,那么这两个线程将永远地等待下去。框架

简单死锁代码示例

public class LeftRightDeadLock {
    private final Object left = new Object();
    private final Object right = new Object();
    
    public void leftRight(){
        synchronized (left){
            synchronized (right){
                doSomething();
            }
        }
    }
    
    public void rightLeft(){
        synchronized (right){
            synchronized (left){
                doSomething();
            }
        }
    }
}

上面的代码中,若是一个线程执行leftRight()方法,另外一个线程调用rightLeft()方法,则会发生死锁。

上面生产死锁的缘由是,两个线程视图以不一样的顺序来得到相同的锁。若是按照相同的顺序请求锁,那么就不会出现循环的加锁依赖性,所以就不会产生死锁。

产生死锁的四个条件

有个叫Coffman的牛人帮咱们总结了产生死锁的四个条件:

  1. 互斥,共享资源X和Y只能被一个线程占用
  2. 占用且等待,线程T1已经得到了共享资源X,在等待共享资源Y的时候,不释放共享资源X;
  3. 不可抢占,其余线程不能强行抢占线程T1占用的资源;
  4. 循环等待,线程T1等待线程T2占有的资源,线程T2等待线程T1占有的资源,就是循环等待。

反过来讲,咱们只要破坏掉四个条件中的一个,就能够避免死锁的发生。

首先第一个互斥条件无法破坏,由于加锁就是互斥的语义。

  1. 对于“占用且等待”的条件,咱们能够一次性申请全部资源;
  2. 对于“不可抢占”这个条件,占用部分资源的线程在申请其余资源时,若是申请不到,能够主动释放它占有的资源。
  3. 对于“循环等待”这个条件,能够按照固定的顺序申请资源,全部线程都按照规定的顺序得到锁,这样就不存在循环等待了。

活锁

活锁是另外一种形式的活跃性问题,该问题尽管不会阻塞线程,但也不能继续执行下去,由于线程将不断重复执行相同的操做,并且总会失败。

当多个相互协做的线程都对彼此进行响应从而修改各自的状态,并使得任何一个线程都没法继续执行时,就发生了活锁。就好比路上两我的相遇,出于礼貌,都给对方让路,结果每次都碰到一块儿。

要解决这种活锁问题,须要在重试机制中加入随机性。好比,在网络上,两台机器使用相同的载波来发送数据包,那么这些数据包就会发生冲突。这两台机器都检查到了冲突,并都在稍后再次重发。若是两者都选择了在1秒后重试,那么又会发生冲突,而且不断地冲突下去,于是即便有大量闲置的带宽,也没法将数据包发送出去。为避免这种状况的发生,须要让他们分别等待一段随机的时间,这样就能避免活锁的发生了。

饥饿

“饥饿”就是当线程因为没法访问它所须要的资源而不能继续执行时的场景。所谓“不患寡而患不均”。当某些线程一直获取不到CPU执行资源的时候,就发生了“饥饿”。

一些容易致使饥饿的场景:

  1. 在应用中对Java线程优先级的使用不当。(由于JVM会将Thread API中的10个优先级映射到操做系统的调度优先级上,这就可能存在两个不一样的优先级被映射到了操做系统层的同一个优先级,所以尽可能不要改变线程优先级)
  2. 持有锁的线程,若是执行的时间过程或者存在无限循环,也可能致使“饥饿”问题。

解决“饥饿”问题的通常方案就是使用公平锁(注意synchronized术语非公平锁)。

JUC工具类库

Java并发包给咱们提供了很是丰富的构建并发程序的基础模块,例如线程安全容器类、同步工具类,阻塞队列等。

这些工具类都在java.util.concurrent包下面,因此简称J.U.C工具包。

同步容器和并发容器

同步容器类有哪些

主要有Vector和Hashtable,这两个都是早期JDK的一部分,还有一些封装器类是由Collections.sychronizedXxx等工厂方法建立的。
这些类实现线程安全的方式都是:将他们的状态封装起来,并对每一个公有方法都进行同步,也就是使用synchronized内置锁的方式,使得每次只有一个线程能访问容器的状态。

同步容器类的问题

同步容器类虽然是线程安全的类,可是在某些场景下可能须要额外的客户端加锁来保护复合操做的线程安全性。好比迭代(反复访问元素,直到遍历完容器中的全部元素)、条件运算(若是没有则添加)。下面给出一个示例说明下:

public class GetLastElement implements Runnable{
    private List<Object> list;

    public GetLastElement(List<Object> list) {
        this.list = list;
    }

    public Object getLast(){
        int index = list.size() - 1;
        return list.get(index);
    }

    public void run() {
        getLast();
    }
}

public class RemoveLastElement implements Runnable{
    private List<Object> list;

    public RemoveLastElement(List<Object> list) {
        this.list = list;
    }

    public void deleteLast(){
        int index = list.size() - 1;
        list.remove(index);
    }

    public void run() {
        deleteLast();
        System.out.println(list);
    }

    public static void main(String[] args) {
        List<Object> list = new Vector<Object>();
        list.add(123);
        list.add("hello");

        Thread thread1 = new Thread(new GetLastElement(list));
        Thread thread2 = new Thread(new RemoveLastElement(list));
        thread1.start();
        thread2.start();
    }
}

上面的代码中,可能会出现,线程1在获取get(2)的时候,2位置上的元素已经被线程2给remove(2)掉了。此时线程1会抛出异常,但这并非咱们指望获得的结果。

所以为了保证复合操做的原子性,就须要在get和remove方法上加锁。像下面这样:

public Object getLast(){
        synchronized (list){
            int index = list.size() - 1;
            return list.get(index);
        }        
    }

    public void deleteLast(){
        synchronized (list){
            int index = list.size() - 1;
            list.remove(index);
        }
    }

这样就保证了在调用size()和get()方法之间,不会有机会让另一个线程进来remove掉元素了。

迭代器和ConcurrentModificationException

Java中的集合类库都是实现了Iterator迭代器接口。不管是使用标准的迭代仍是for-each循还语法中,对容器类进行迭代遍历的方式都是Iterator。然而,若是有其余线程并发地修改容器,那么即便是使用迭代器也没法避免在迭代期间对容器加锁。

快速失败(fail-fast)机制

所谓fail-fast,其实就是一种警示机制,就是当它们发现容器在迭代过程当中被修改是,就会抛出一个ConcurrentModificationException异常。

好比下面这段代码:

List<String> list = new Vector<String>();
        list.add("java");
        for(String str : list){
            list.remove(str);
        }

运行时就会抛出ConcurrentModificationException异常,事实上咱们换成ArrayLlist也同样存在这种fail-fast机制。

Exception in thread "main" java.util.ConcurrentModificationException
	at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
	at java.util.Vector$Itr.next(Vector.java:1137)
	at thread.syncontainer.FailFast.main(FailFast.java:10)

另一点,同步容器类将全部对容器状态的访问都串行化,以此来实现线程安全性,但这种方法严重下降了并发性,当多个线程竞争容器的锁时,吞吐量将严重下降。

并发容器

Java5提供了多种并发容器用来改进同步容器的性能问题。

  • 增长了ConcurrentHashMap,用来代替同步且基于散列的Map;
  • CopyOnWriteArrayList,用于在遍历操做为主要操做的状况下代替同步的List。
  • 新增了Queue和BlockingQueue;
  • Java6又引入了ConcurrentSkipListMap和ConcurrentSkipListSet,分别做为同步的SortedMap和SortedSet的并发替代品。

ConcurrentHashMap

与HashMap同样,ConcurrentHashMap也是一个基于散列的Map,可是它使用了一种彻底不一样的加锁策略来提供更高的并发性和伸缩性。ConcurrentHashMap并非将每一个方法都在同一把锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制成为分段锁。在这种机制中,多个读线程能够并发访问Map,读线程和写线程也能够并发地访问Map。而且必定数量的写入线程能够并发地修改Map。ConcurrentHashMap带来的结果就是,在并发访问环境下实现更高的吞吐量。

同时ConcurrentHashMap提供的迭代器不会抛出ConcurrentModificationException异常。另一点是实现了ConcurrentMap,该接口提供了一些诸如“如没有则添加”,“若相等则移除”,“若相等则替换”等复合操做,而且能保证是原子操做。

CopyOnWriteArrayList

“写入时复制(copy-on-write)”容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象那个时就再也不须要进一步的同步。在每次修改时,都会建立并从新发布一个新的容器副本,从而实现可变性。

阻塞队列

阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。若是队列已经满了,那么put方法将阻塞知道有空间可用;若是队列为空,那么take方法将会阻塞直到有元素可用。队列可使有界的,也能够是无界的,无界队列永远都不会充满,所以无界队列上的put方法也永远不会阻塞。

Java类库中阻塞队列BlockingQueue是一个接口,它有多种实现,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO(先进先出)队列,两者分别和LinkedList和ArrayList相似,但比同步List拥有更好的并发性能。PriorityBlockingQueue是一个按优先级排序的队列,当你但愿按照某种顺序而不是FIFO来处理元素时,可使用这个队列。

基于阻塞队列,能够很是容易地实现生产者-消费者模型。当数据生成时,生产者只须要把数据放入阻塞队列中,而消费者从队列中拿数据消费就能够了。相比wait/notify实现起来要简单许多。

CountDownLatch

CountDownLatch是一种闭锁的实现。那什么是闭锁呢?闭锁就是一个同步工具类,能够延迟线程的进度直到其到达终止状态。闭锁就至关于一扇门:在闭锁到达结束状态以前,这扇门一直是关闭的,而且没有任何线程能经过,当到达结束状态时(通常就是计数器为0),这扇门会打开并容许全部线程经过。

你能够向CountDownLatch对象设置一个初始的计数值,任何在这个对象上调用await()方法都将阻塞,知道这个计数值为0。而其余任务(或者说线程)在结束其工做时,能够在该对象上调用countDown()方法来减少这个计数器值。

另外须要注意的一点是,CountDownLatch被设计为只触发一次,意思就是计数器减到0以后不会再重置了。

看个图能更清楚点:

在这里插入图片描述

能够看到Thread-0在CountDownLatch实体对象上调用了await()方法后阻塞,直到其余两个线程的运行将计数器count的值减为0,Thread-0线程才继续执行。

CountDownLatch的典型用法是将一个任务分红N个独立的子任务,N个子任务分别由N个线程执行,建立一个初始值为N的CountDownLatch闭锁,主线程调用await()方法阻塞等待子任务的执行结果,子线程在执行完任务是调用countDown递减计数器值。

public class TaskPortion implements Runnable{
    private static int counter = 0;
    private final int id = counter++;
    private final CountDownLatch countDownLatch;

    public TaskPortion(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    public void run() {
        doWork();
        countDownLatch.countDown();
    }

    public void doWork(){
        System.out.println(this + "完成");
    }

    @Override
    public String toString() {
        return "TaskPortion{" +
                "id=" + id +
                '}';
    }
}

public class MainTask implements Runnable{
    private CountDownLatch countDownLatch;

    public MainTask(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    public void run() {
        try {
            countDownLatch.await();
            System.out.println("主任务完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for(int i = 0; i < 10; i++){
            exec.execute(new TaskPortion(countDownLatch));
        }
        exec.execute(new MainTask(countDownLatch));
        exec.shutdown();
    }
}

运行结果:

TaskPortion{id=1}完成
TaskPortion{id=3}完成
TaskPortion{id=0}完成
TaskPortion{id=5}完成
TaskPortion{id=9}完成
TaskPortion{id=4}完成
TaskPortion{id=8}完成
TaskPortion{id=6}完成
TaskPortion{id=2}完成
TaskPortion{id=7}完成
主任务完成

能够看到当全部子任务都完成了,主任务才能完成。

CyclicBarrier

栅栏(Barrier)相似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏和闭锁的关键区别在于,全部线程必须同时到达栅栏位置,才能继续执行。

CyclicBarrier是栅栏的一种实现,并且从名字中Cycilc能够判断是能够循环利用的栅栏(Barrier)。它适合这种一种场景:你但愿建立一组任务,它们并行地执行工做,而后在进行下一个步骤前等待,直至全部任务都完成。它使得全部的并行任务都将在栅栏处列队,所以能够一致地向前移动。

下面引用《Java编程思想》中的赛马游戏中的例子作为示例:

public class Horse implements Runnable{
    private static int counter = 0;
    private final int id = counter++;
    private int strides = 0;
    private static Random rand = new Random(7);
    private static CyclicBarrier barrier;
    public Horse(CyclicBarrier b){ barrier = b; }
    public synchronized int getStrides(){  return strides; }

    public void run() {
        try {
            while (!Thread.interrupted()){
                synchronized (this){
                    strides += rand.nextInt(3);
                }
                barrier.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    public String toString(){ return "Horse " + id + " "; }

    public String tracks(){
        StringBuilder s = new StringBuilder();
        for(int i = 0; i < getStrides(); i++ ){
            s.append("*");
        }
        s.append(id);
        return s.toString();
    }
}

public class HorseRace {
    static final int FINISH_LINE = 15;
    private List<Horse> horses = new ArrayList<Horse>();
    private ExecutorService exec = Executors.newCachedThreadPool();
    private CyclicBarrier barrier;

    public HorseRace(int nHorses, final int pause){
        barrier = new CyclicBarrier(nHorses, new Runnable() {
            public void run() {
                StringBuilder s = new StringBuilder();
                for(int i = 0; i< FINISH_LINE; i++){
                    s.append("=");
                }
                System.out.println(s);

                for(Horse horse : horses){
                    System.out.println(horse.tracks());
                }
                for(Horse horse : horses){
                    if(horse.getStrides() >= FINISH_LINE){
                        System.out.println(horse + "won!");
                        exec.shutdownNow();
                        return;
                    }
                }

                try {
                    TimeUnit.MILLISECONDS.sleep(pause);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        for (int i = 0; i < nHorses; i++){
            Horse horse = new Horse(barrier);
            horses.add(horse);
            exec.execute(horse);
        }
    }

    public static void main(String[] args) {
        int nHorses = 7;
        int pause = 200;
        new HorseRace(nHorses,pause);
    }
}

这个程序挺有意思,这里我把马的匹数调小到3匹,FINISH_LINE 的值调小一些,运行结果以下:

===============
*0
1
**2
===============
**0
*1
***2
===============
****0
*1
****2
===============
****0
*1
****2
===============
****0
**1
*****2
===============
****0
**1
******2
===============
*****0
**1
******2
===============
*******0
***1
******2
===============
*********0
*****1
******2
===============
***********0
*****1
*******2
===============
***********0
******1
*******2
===============
***********0
******1
*********2
===============
***********0
******1
**********2
===============
*************0
********1
************2
===============
*************0
********1
*************2
===============
*************0
*********1
**************2
===============
*************0
*********1
***************2
Horse 2 won!

CyclicBarrier的构造器中有两个参数,第一个是并行任务数量,第二个是一个Runnable类型的栅栏任务,就是当咱们的计数器减为0的时候,会自动执行这个栅栏任务。

这里的计数器递减的动做不是咱们作的(代码中咱们只调用了CyclicBarrier的await()方法),而是CyclicBarrier自动帮咱们作了,在CyclicBarrier中有个count域,经过看下面注释咱们知道当全部并行的线程都到达栅栏处了,就会减为0,而后在新一轮开始时又会被重置。

/**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

Semaphore

正常的锁(Lock显示锁或Jvm内建锁synchronized)在任什么时候刻都只容许一个任务访问一项资源,而计数信号量(Semaphore)容许n个任务同时访问这个资源。

Semaphore中管理着一组虚拟的许可(permit),许可的初始数量能够经过构造函数指定。在执行操做时能够首先得到许可(只要还有剩余的许可),并在使用之后释放许可。若是没有许可,那么acquire()方法将阻塞直到有许可(或者直到被中断或操做超时)。release()方法将返回一个许可给信号量。

Semaphore通常能够用来实现“对象池”,它管理着数量有限的对象,当要使用对象时能够签出它们,而在用户使用完毕时,能够将它们签回。

public class Pool<T> {
    private int size;
    private List<T> items = new ArrayList<T>();
    private volatile boolean [] checkedOut;
    private Semaphore available;
    public Pool(Class<T> classObject, int size){
        this.size = size;
        checkedOut = new boolean[size];
        available = new Semaphore(size,true);
        for (int i = 0; i < size; i++){
            try {
                items.add(classObject.newInstance());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
    
    public T checkOut() throws InterruptedException {
        available.acquire();
        return getItem();
    }
    public void checkIn(T x){
        if(releaseItem(x)){
            available.release();
        }
    }
    
    private synchronized T getItem(){
        for (int i = 0; i < size; i++){
            if(!checkedOut[i]){
                checkedOut[i] = true;
                return items.get(i);
            }
        }
        return null;
    }
    
    private synchronized boolean releaseItem(T item){
        int index = items.indexOf(item);
        if(index == -1) return false;
        if(checkedOut[index]){ 
            checkedOut[index] = false;
            return true;
        }
        return false;
    }
}

线程池

不要在建立野生线程了

像下面这样建立的线程,咱们叫作“野生线程”,做为测试demo没有问题,若是要上生产的系统,最好不要这么作。

Thread t = new Thread(r)//r是runnable对象

由于这样去建立线程,很容易形成无限制的线程建立,并且线程管理起来很是不便。

咱们知道建立线程的开销是比较高的,所以线程须要复用,须要池化管理。

Executor框架

Executor框架实际上是Java类库提供给咱们的任务管理框架,线程池是其中的一部分。

在Java类库中,任务执行的主要抽象不是Thread,而是Executor。Executor是一个接口,以下:

public interface Executor {
    void execute(Runnable command);
}

Executor它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现类还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视机制。

将任务的提交和执行解耦开来以后,就能够灵活地为某种类型的任务指定和修改执行策略。在执行策略中定义了任务执行的方方面面,这包括:

  • 在什么线程中执行任务?
  • 任务按照什么顺序执行(FIFO、LIFO、优先级)?
  • 有多少个任务能并发执行?
  • 在队列中有多少个任务在等待执行?
  • 若是系统因为过载而须要拒绝一个任务,那么该选择哪个任务?另外,如何通知应用程序有任务被拒绝?
  • 在执行一个任务以前或以后,应该进行哪些动做?

经过Executors工具类中的静态工厂方法建立线程池

首先要知道Executors是一个工具类,是帮助咱们建立一些经常使用的线程池的,这些线程池提供了一些默认的配置。

好比咱们能够经过Executors的静态工厂方法建立下面几种经常使用线程池:

newFixedThreadPool

建立一个固定长度的线程池,每当提交一个任务时就建立一个线程,直到达到线程池的最大数量,这时线程池的规模将再也不变化。

newCachedThreadPool

建立一个能够缓存的线程池,若是线程池的当前规模超过了吹需求时,那么将会回收空闲的线程,而当需求增长时,则能够添加新的线程,线程池的规模不存在任何限制。

newSingleThreadExecutor

是一个单线程的Executor,它建立单个工做线程来执行任务,若是这个线程异常结束,会建立另外一个线程来代替。

ExecutorService接口提供生命周期管理方法

ExecutorService是一个接口,继承自Executor,扩展了Executor接口的功能,提供了服务的生命周期管理的一些方法:

public interface ExecutorService extends Executor {

    /**
     * 关闭执行器, 主要有如下特色:
     * 1. 已经提交给该执行器的任务将会继续执行, 可是再也不接受新任务的提交;
     * 2. 若是执行器已经关闭了, 则再次调用没有反作用.
     */
    void shutdown();

    /**
     * 当即关闭执行器, 主要有如下特色:
     * 1. 尝试中止全部正在执行的任务, 没法保证可以中止成功, 但会尽力尝试(例如, 经过 Thread.interrupt中断任务, 可是不响应中断的任务可能没法终止);
     * 2. 暂停处理已经提交但未执行的任务;
     *
     * @return 返回已经提交但未执行的任务列表
     */
    List<Runnable> shutdownNow();

    /**
     * 若是该执行器已经关闭, 则返回true.
     */
    boolean isShutdown();

    /**
     * 判断执行器是否已经【终止】.
     * <p>
     * 仅当执行器已关闭且全部任务都已经执行完成, 才返回true.
     * 注意: 除非首先调用 shutdown 或 shutdownNow, 不然该方法永远返回false.
     */
    boolean isTerminated();

    /**
     * 阻塞调用线程, 等待执行器到达【终止】状态.
     *
     * @return {@code true} 若是执行器最终到达终止状态, 则返回true; 不然返回false
     * @throws InterruptedException if interrupted while waiting
     */
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * 提交一个具备返回值的任务用于执行.
     * 注意: Future的get方法在成功完成时将会返回task的返回值.
     *
     * @param task 待提交的任务
     * @param <T>  任务的返回值类型
     * @return 返回该任务的Future对象
     * @throws RejectedExecutionException 若是任务没法安排执行
     * @throws NullPointerException       if the task is null
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一个 Runnable 任务用于执行.
     * 注意: Future的get方法在成功完成时将会返回给定的结果(入参时指定).
     *
     * @param task   待提交的任务
     * @param result 返回的结果
     * @param <T>    返回的结果类型
     * @return 返回该任务的Future对象
     * @throws RejectedExecutionException 若是任务没法安排执行
     * @throws NullPointerException       if the task is null
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个 Runnable 任务用于执行.
     * 注意: Future的get方法在成功完成时将会返回null.
     *
     * @param task 待提交的任务
     * @return 返回该任务的Future对象
     * @throws RejectedExecutionException 若是任务没法安排执行
     * @throws NullPointerException       if the task is null
     */
    Future<?> submit(Runnable task);

    /**
     * 执行给定集合中的全部任务, 当全部任务都执行完成后, 返回保持任务状态和结果的 Future 列表.
     * <p>
     * 注意: 该方法为同步方法. 返回列表中的全部元素的Future.isDone() 为 true.
     *
     * @param tasks 任务集合
     * @param <T>   任务的返回结果类型
     * @return 任务的Future对象列表,列表顺序与集合中的迭代器所生成的顺序相同,
     * @throws InterruptedException       若是等待时发生中断, 会将全部未完成的任务取消.
     * @throws NullPointerException       任一任务为 null
     * @throws RejectedExecutionException 若是任一任务没法安排执行
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    /**
     * 执行给定集合中的全部任务, 当全部任务都执行完成后或超时期满时(不管哪一个首先发生), 返回保持任务状态和结果的 Future 列表.
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * 执行给定集合中的任务, 只有其中某个任务率先成功完成(未抛出异常), 则返回其结果.
     * 一旦正常或异常返回后, 则取消还没有完成的任务.
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    /**
     * 执行给定集合中的任务, 若是在给定的超时期满前, 某个任务已成功完成(未抛出异常), 则返回其结果.
     * 一旦正常或异常返回后, 则取消还没有完成的任务.
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

详解线程池ThreadPoolExecutor

计算线程池的大小

线程池的理想大小取决于被提交的任务类型以及所部署系统的特性。在代码中一般不会硬编码写死一个值,而应该是经过某种配置机制提供。

要避免过大或者太小。设置过大,那么大量的线程将在相对不多的CPU和内存资源上发生竞争,这不只会致使更高的内存使用量,并且还可能耗尽资源。若是设置太小,那么将致使许多空闲的处理器没法执行工做,形成资源浪费,从而下降吞吐率。

《并发编程实践》中给我提供了一个经验公式:

要是处理器达到指望的使用率,线程池的最优大小等于:

  Nthreads = Ncpu * Ucpu * (1 + W/C),其中

  Ncpu = CPU核心数

  Ucpu = CPU使用率,0~1

  W/C = 等待时间与计算时间的比率

能够经过Runtime来得到CPU的数目:

int N_CPUS = Runtime.getRuntime().availableProcessors();

配置ThreadPoolExecutor

前面咱们经过Executors工具类中的工厂方法建立的newFixedThreadPool、newCachedThreadPool这些线程池,其池返回的就是ThreadPoolExecutor对象。

在这里插入图片描述

当默认的执行策略不能知足需求时,咱们就能够经过ThreadPoolExecutor来定制本身的线程池。

咱们来看下ThreadPoolExecutor的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

总共有4个重载的构造函数,咱们选择参数最多的进行分析,其余几个都调用了上面这个。

先看下参数:

corePoolSize:表示线程池的基本大小,也叫核心线程数,而且当工做队列满了的时候,若是在有任务提交上来,会建立超过这个数量的线程。

maximumPoolSize:最大线程数,表示该线程池最多能够建立这么多线程。

keepAliveTime:空闲线程的存活时间,若是某个线程的空闲时间超过了存活时间,将会被回收。

unit:存活时间的时间单位。

workQueue:阻塞队列,提交上来的任务若是没有可用的线程执行时,会被放入该队列中等待执行。

threadFactory:线程工厂,用于指定如何建立一个线程,好比能够指定一个有意义的名字,指定一个UncaughtExceptionHandler等。

RejectedExecutionHandler :拒绝策略或者饱和策略处理器。当有界队列被填满后,该如何处理提交上来的任务。JDK提供了四种RejectedExecutionHandler的实现。AbortPolicy、DiscardPolicy、DiscardOldestPolicy和CallerRunsPolicy。

AbortPolicy策略是默认的饱和策略,该策略会抛出未检查的RejectedExecutionExcetion异常。调用者能够捕获异常进行相应地处理。

DiscardPolicy策略会抛弃任务,DiscardOldestPolicy会抛弃下一个将要被回字形的任务,而后尝试从新提交新的任务。

CallerRunsPolicy不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而下降新任务的流量。

下图描述了线程池的整体工做流程:

在这里插入图片描述

JAVA内存模型

Java内存模型是个很复杂的规范,这里简单介绍下。

Java内存模型规定了全部的变量都存储在主内存中,每条线程还有本身的工做内存,线程的工做内存中保存了该线程中是用到的变量的主内存副本拷贝,线程对变量的全部操做都必须在工做内存中进行,而不能直接读写主内存。不一样的线程之间也没法直接访问对方工做内存中的变量,线程间变量的传递均须要本身的工做内存和主存之间进行数据同步进行。

JMM是一种规范,目的是解决因为多线程经过共享内存进行通讯时,存在的本地内存数据不一致、编译器会对代码指令重排序、处理器会对代码乱序执行等带来的问题。目的是保证并发编程场景中的原子性、可见性和有序性。

happen-before规则

happen-before规则是JMM中限制指令重排序的一些规则。意思是前面一个操做的结果对后续操做是可见的。这里总结6条happen-before规则:

一、程序的顺序性规则。

意思就是程序前面对某个变量的修改必定是对后续操做可见的。

public void read(){
        int x = 20;
        boolean b = true;
    }

x = 20 happen-before b = true,这也符合咱们的直觉。

二、volatile变量规则

对volatile白你两的写操做 happen-before 于对该变量的读操做。这也是volatile类型的变量能保证可见性的缘由。

三、传递性规则

若是A happen-before B,B happen-before C。则A happen-before C。

四、监视器锁规则

监视器锁指的就是synchronized。对一个锁的解锁 happen-before 于后续对这个锁的加锁。

public void read(){
        int x = 20;
        boolean b = true;
        synchronized (this){// 这里会自动加锁
            x = 10;
        }//执行结束,自动释放锁
    }

这条规则说的就是,若是线程A执行完释放锁后将x改成10了,线程B在得到锁进入临界区后是能够看到线程A对x的写操做的,也就是B能看到x=10。

五、线程启动规则

它是指主线程A启动子线程B后,子线程B可以看到主线程在启动子线程B前的操做。

int x = 10;
        Thread B = new Thread(()->{
            // 此处是能够看到主线程修改的x=30的
        });
        x = 30;
        B.start();

六、线程join规则

这条是关于线程等待的。它是指主线程A等待子线程B完成(主线程A经过调用子线程B的join()方法实现),当子线程B完成后(主线程A中join()方法返回),主线程可以看到子线程的操做。

int x = 10;
        Thread B = new Thread(()->{
            // 对共享变量的修改
            x = 15;
        });
        B.start();
        B.join();
        
        // 主线程在调用了join以后是能
        // 看到x = 15的

后记

CAS原理,AQS和显式锁ReentrantLock原理性的东西后期单独总结。

相关文章
相关标签/搜索