Java 基础(十五)并发工具包 concurrent

本文目录:

  • java.util.concurrent - Java 并发包简介
  • 阻塞队列 BlockingQueue
  • 数组阻塞队列 ArrayBlockingQueue
  • 延迟队列 DelayQueue
  • 链阻塞队列 LinkedBlockingQueue
  • 具备优先级的阻塞队列 PriorityBlockingQueue
  • 同步队列 SynchronousQueue
  • 阻塞双端队列 BlockingDeque
  • 链阻塞双端队列 LinkedBlockingDeque
  • 并发 Map ConcurrentMap
  • 并发导航映射 ConcurrentNavigableMap
  • 闭锁 ConutDownLatch
  • 栅栏 CyclicBarrier
  • 交换机 Exchanger
  • 信号量 Semaphore
  • 执行器服务 ExecutorService
  • 线程池执行者 ThreadPoolExecutor
  • 定时执行者服务 ScheduledExecutorService
  • 使用 ForkJoinPool 进行分叉和合并
  • 锁 Lock
  • 读写锁 ReadWriteLock
  • 原子性布尔 AtomicBoolean
  • 原子性整型 AtomicInteger
  • 原子性长整型 AtomicLong
  • 原子性引用型 AtomicReference

本章内容比较多,我本身也是边学边总结,因此拖到今天才出炉。另外,建议学习本文的小伙伴在学习的过程当中,把代码 copy 出去run 一下,有助于理解。html

1.java.util.concurrent Java 并发工具包

这是 Java5 添加的一个并发工具包。这个包包含了一系列可以让 Java 的并发编程变得更加简单轻松的类。在这以前,你须要本身手动去实现相关的工具类。java

本文将和你们一块儿学习 java.util.concurrent包里的这些类,学完以后咱们能够尝试如何在项目中使用它们。算法

2.阻塞队列 BlockingQueue

java.util.concurrent 包里面的 BlockingQueue 接口表示一个线程安放如和提取实例的队列。编程

这里咱们不会讨论在 Java 中实现一个你本身的 BlockingQueue。api

BlockingQueue 用法

BlockingQueue 一般用于一个线程生产对象,而另外一个线程消费这些对象的场景。下图是对这个原理的阐述:数组

一个线程往里边放,另一个线程从里边取的一个 BlockingQueue.png
一个线程往里边放,另一个线程从里边取的一个 BlockingQueue.png

一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。若是该阻塞队列到达了其临界点,负责生产的线程将会在往里面插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。负责消费的线程将会一直从该阻塞队列中拿出对象。若是消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。浏览器

BlockingQueue 的方法

BlockingQueue 具备4组不一样的方法用于插入、移除以及对队列中的元素进行检查。若是请求的操做不能获得当即执行的话,每一个方法的表现也不一样。这些方法以下:bash

~ 抛异常 特定值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o,timeout,timeUnit)
移除 remove(o) poll(o) take(o) poll(timeout,timeunit)
检查 element(o) peek(o) ~ ~

四组不一样的行为方式解释:多线程

1.抛异常:若是试图的操做没法当即执行,抛一个异常
2.特定值:若是试图的操做没法当即执行,返回一个特定的值(通常是 true/false)
3.阻塞:若是试图的操做没法当即执行,该方法将会发生阻塞,直到能执行
4.超时:若是试图的操做没法当即执行,该方法调用将会发生阻塞,直到可以执行,但等待时间不会超过给定值。返回一个特定的值以告知该操做是否成功。并发

没法向一个 BlockingQueue 中插入 null。若是你试图插入 null,BlockingQueue 会抛出一个 NullPointerException。

能够访问到 BlockingQueue 中的全部元素,而不只仅是开始和结束的元素。好比说你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉,那么你能够调用诸如remove(o)方法啦将队列中的特定对象进行移除。可是这么干相率并不高,所以尽可能不要用这一类方法,除非无可奈何。

BlockingQueue 的实现

BlockingQueue 是个借口,你能够经过它的实现之一来使用 BlockingQueue。concurrent 包里面有以下几个类实现了 BlockingQueue:

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue

Java 中使用 BlockingQueue 的例子

这是一个 Java 钟使用 BlockingQueue 的示例,本示例使用的是 BlockingQueue 借口的 ArrayBlockingQueue 实现。
首先,BlockingQueueExample 类分别在两个独立的线程中启动了一个 Producer 和 Consumer 。Producer 向一个共享的 BlockingQueue 中注入字符串,而 Consumer 则会从中把它们拿出来。

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

class Producer implements Runnable {
    private BlockingQueue<String> queue = null;

    Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException ignored) {
        }
    }
}

class Consumer implements Runnable {
    private BlockingQueue queue = null;

    Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}复制代码

这个例子很简单呐,我就不加文字描述了。

3.数组阻塞队列 ArrayBlockingQueue

ArrayBlockingQueue 类实现了 BlockingQueue 接口。

ArrayBlockingQueue 是衣蛾有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不可以存储无限多数量的原色。它有一个同一时间存储元素数量的上线。你能够在对其初始化的时候设定这个上限,但以后就没法对这个上限进行修改了。

ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在全部元素之中是放入时间最久的那个,而尾元素则是最短的那个。

如下是在使用 ArrayBlockingQueue 的时候对其初始化的一个示例:

BlockingQueue queue = new ArrayBlockingQueue(1024);
try {
    queue.put("1");
    Object object = queue.take();
} catch (InterruptedException e) {
    e.printStackTrace();
}复制代码

4.延迟队列 DelayQueue

DelayQueue 实现了 BlockingQueue 接口
DelayQueue 对元素进行持有知道一个特定的延迟到期。注入其中的元素必须实现 concurrent.Delay 接口,该接口定义:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit var1);
}复制代码

DelayQueue 将会在每一个元素的 getDelay()方法返回的值的时间段以后才释放掉该元素。若是返回的是 0 或者负值,延迟将被认为过时,该元素将会在 DelayQueue 的下一次 take 被调用的时候被释放掉。

传递给 getDelay 方法的 getDelay 实例是一个枚举型,它代表了将要延迟的时间段。

TimeUnit 枚举的取值单位都能顾名思义,这里就带过了。

上面咱们能够看到 Delayed 接口继承了 Comparable 接口,这也就意味着 Delayed 对象之间能够进行对比。这个可能在对 DelayeQueue 队列中的元素进行排序时有用,所以它们能够根据过时时间进行有序释放。

如下是使用 DelayQueue 的例子:

public class DelayQueueExample {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedElement> queue = new DelayQueue<>();
        DelayedElement element1 = new DelayedElement(1000);
        DelayedElement element2 = new DelayedElement(0);
        DelayedElement element3 = new DelayedElement(400);
        queue.put(element1);
        queue.put(element2);
        queue.put(element3);
        DelayedElement e = queue.take();
        System.out.println("e1:" + e.delayTime);
        DelayedElement e2 = queue.take();
        System.out.println("e2:" + e2.delayTime);
        DelayedElement e3 = queue.take();
        System.out.println("e3:" + e3.delayTime);
    }
}

class DelayedElement implements Delayed {
    long delayTime;
    long tamp;

    DelayedElement(long delay) {
        delayTime = delay;
        tamp = delay + System.currentTimeMillis();
    }

    @Override
    public long getDelay(@NonNull TimeUnit unit) {
        return tamp - System.currentTimeMillis();
//        return -1;
    }

    @Override
    public int compareTo(@NonNull Delayed o) {
        return tamp - ((DelayedElement) o).tamp > 0 ? 1 : -1;
    }
}复制代码

运行结果:

e1:0
e2:400
e3:1000复制代码

在 take 取出 e2的时候,会阻塞。
compareTo 决定队列中的取出顺序
getDelay 决定是否能取出元素,若是没法取出则阻塞线程。

具体玩法,你们能够自行思考,我看了一下别人用DelayQueue,能玩出不少花样,在某些特定的需求很方便。

5.链阻塞队列 LinkedBlockingQueue

LinkedBlockingQueue 类也实现了 BlockingQueue接口。
LinkedBlockingQueue 内部以一个链式结构对其元素进行存储。若是须要的话,这一链式结构能够选择一个上线。若是没有定义上线,将使用 Ingeter.MAX_VALUE 做为上线。

LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在全部元素之中是放入时间最久的那个。

使用方式同 ArrayBlockingQueue。

6.具备优先级的阻塞队列 PriorityBlockingQueue

PriorityBlockingQueue 类也实现了 BlockingQueue 接口。

PriorityBlockingQueue 是一个无界的并发队列。它使用了和 PriorityQueue 同样的排序规则。你没法向这个队列中插入 null 值。PriorityQueue 的代码分析在集合中讲了,感兴趣的小伙伴能够回头去阅读。

全部插入到 PriorityBlockingQueue 的元素必须实现 Comparable 接口或者在构造方法中传入Comparator。

注意:PriorityBlockingQueue 对于具备相等优先级的元素并不强制任何特定的行为。

同时注意:若是你从一个 PriorityBlockingQueue 得到一个 Iterator 的话,该 Iterator并不能保证它对元素的遍历是按照优先顺序的。原理在以前的文章中分析过~

使用方法同上。

7.同步队列 SynchronousQueue

SynchronousQueue 类实现了 BlockingQueue 接口。
SynchronousQueue 是一个特殊的队列,它的内部同时只能容纳单个元素。若是该队列已有一个元素的话,试图向队列中插入一个新元素的线程将会阻塞,知道另外一个线程将该元素从队列中抽走。一样,若是该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另外一个线程向队列中插入了一条新的元素。
据此,把这个类称做一个队列显然是夸大其词,它更多像是一个汇合点。

使用方法和 ArrayBlockingQueue 同样吧,区别就是 SynchronousQueue 只能保存一个元素。

能够理解成这个,哈哈哈new ArrayBlockingQueue<>(1);

8.阻塞双端队列 BlockingDeque

BlockingDeque 接口在 concurrent 包里,表示一个线程安放入和提取实例的双端队列。

BlockingDeque 类是一个双端队列,在不可以插入元素时,它将阻塞住试图插入元素的线程;在不可以抽取元素时,它将阻塞住试图抽取的线程。

deque 是“Double Ended Queue”的缩写。所以,双端队列是一个你能够从任意一段插入或者抽取元素的队列。

BlockingDeque 的使用

在线程既是生产者又是这个队列的消费者的时候能够用到 BlockingDeque。若是生产者线程须要在队列的两端均可以插入数据,消费者线程须要在队列的两端均可以移除数据,这时候也能够用 BlockingDeque。BlockingDeque 图解:

一个线程生产元素,并把它们插入到队列的任意一段。若是双端队列已满,插入线程将被阻塞,知道一个移除线程从队列中移除了一个元素。

BlockingDeque 的方法

BlockingDeque 具备4组不一样的方法用于插入、移除以及对双端队列中的元素进行检查。若是请求的操做不能获得当即执行的话,每一个方法的表现也不一样。这些方法以下:

~ 抛异常 特定值 阻塞 超时
插入 addFirst(o) offerFirst(o) putFirst(o) offerFirst(o,timeout,timeUnit)
移除 removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout,timeunit)
检查 getFirst(o) peekFirst(o) ~ ~
~ 抛异常 特定值 阻塞 超时
插入 addLast(o) offerLast(o) putLast(o) offerLast(o,timeout,timeUnit)
移除 removeLast(o) pollLast(o) takeLast(o) pollLast(timeout,timeunit)
检查 getLast(o) peekLast(o) ~ ~

1.抛异常:若是试图的操做没法当即执行,抛一个异常
2.特定值:若是试图的操做没法当即执行,返回一个特定的值(通常是 true/false)
3.阻塞:若是试图的操做没法当即执行,该方法将会发生阻塞,直到能执行
4.超时:若是试图的操做没法当即执行,该方法调用将会发生阻塞,直到可以执行,但等待时间不会超过给定值。返回一个特定的值以告知该操做是否成功。

这一段文字有没有感受特别眼熟,hahah~其实它和 BlockingQueue 同样。

BlockingDeque 继承自 BlockingQueue

BlockingDeque 接口继承自 BlockingQueue 接口。这就意味着你能够像使用一个 BlockingQueue 那样使用 BlockingDeque。若是你这么干的话,各类插入方法将会把新元素添加到双端队列的尾端,而移除方法将会把双端队列的首端元素移除。正如 BlockingQueue 接口的插入和移除方法同样。

BlockingDeque 的实现

既然 BlockingDeque 是一个接口,那么确定有实现类,它的实现类很少,就一个:

  • LinkedBlockingDeque

BlockingDeque 代码示例

这个真没什么好说的。。。

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
deque.addFirst("1");
deque.addLast("2");
String two = deque.takeLast();
String one = deque.takeFirst();复制代码

9.链阻塞双端队列LinkedBlockingDeque

LinkedBlockingDeque 类实现了 BlockingDeque 接口。

不想写描述了,跳过了昂~

10.并发 Map(映射)ConcurrentMap

ConcurrentMap 接口表示了一个可以对别人的访问(插入和提取)进行并发处理的 Map。
ConcurrentMap 除了从其父接口 java.util.Map 继承来的方法以外还有一些额外的原子性方法。

ConcurrentMap 的实现

concurrent 包里面就一个类实现了 ConcurrentMap 接口

  • ConcurrentHashMap

ConcurrentHashMap

ConcurrentHashMap 和 HashTable 类很类似,但 ConcurrentHashMap 能提供比 HashTable 更好的并发性能。在你从中读取对象的时候,ConcurrentHashMap 并不会把整个 Map 锁住。此外,在你向其写入对象的时候,ConcurrentHashMap 也不会锁住整个 Map,它的内部只是把 Map 中正在被写入的部分锁定。
其实就是把 synchronized 同步整个方法改成了同步方法里面的部分代码。

另一个不一样点是,在被遍历的时候,即便是 ConcurrentHashMap 被改动,它也不会抛 ConcurrentModificationException。尽管 Iterator 的设计不是为多个线程同时使用。

使用例子:

public class ConcurrentHashMapExample {

    public static void main(String[] args) {
//        HashMap<String, String> map = new HashMap<>();
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("1", "a");
        map.put("2", "b");
        map.put("3", "c");
        map.put("4", "d");
        map.put("5", "e");
        map.put("6", "f");
        map.put("7", "g");
        map.put("8", "h");
        new Thread1(map).start();
        new Thread2(map).start();

    }

}

class Thread1 extends Thread {

    private final Map map;

    Thread1(Map map) {
        this.map = map;
    }

    @Override
    public void run() {
        super.run();
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        map.remove("6");
    }
}

class Thread2 extends Thread {

    private final Map map;

    Thread2(Map map) {
        this.map = map;
    }

    @Override
    public void run() {
        super.run();
        Set set = map.keySet();
        for (Object next : set) {
            System.out.println(next + ":" + map.get(next));
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}复制代码

打印结果:

1:a
2:b
3:c
4:d
5:e
7:g
8:h复制代码

思考题:用这个 Map 行不行?Map map = Collections.synchronizedMap(new HashMap());

哈哈哈~答案很简单,思考一下就好了。

11.并发导航映射 ConcurrentNaviagbleMap

ConcurrentNavigableMap 是一个支持并发的 NavigableMap,它还能让他的子 Map 具有并发访问的能力,所谓的“子 map”指的是诸如 headMap(),subMap(),tailMap()之类的方法返回的 map.

NavigableMap

NavigableMap 这个接口以前集合中遗漏了。
这里稍微补充一下吧,首先继承关系是 NavigableMap继承自 SortedMap 继承自 Map。

SortedMap从名字就能够看出,在Map的基础上增长了排序的功能。它要求key与key之间是能够相互比较的,从而达到排序的目的。

而NavigableMap是继承于SortedMap,目前只有TreeMap和ConcurrentNavigableMap两种实现方式。它本质上添加了搜索选项到接口,主要为红黑树服务。先来了解下它新增的几个方法

主要方法

  • lowerEntry(K key)返回小于 key 的最大值的节点
  • lowerKey(K key)返回小于 key 的最大值节点的 key
  • floorEntry(K key)返回小于等于 key 的最大值节点
  • floorKey(K key)返回小于等于 key 的最大值节点 key
  • ceilingEntry(K key)返回大于等于 key 的最小节点
  • ceilingkey(K key)返回大于等于 key 的最小节点的 key
  • higherEntry(K key)返回大于 key 的最小节点
  • higherKey(K key)返回大于 key 的最小节点 key
  • firstEntry()返回最小key 节点
  • lastEntry()返回最大 key 节点
  • descendingMap()获取反序的 map
  • navigableKeySet()获取升序迭代器
  • decendingKeySet()获取降序的迭代器
  • subMap(K from,K to)截取 map
  • headMap(K toKey)截取小于等于 toKey 的 map
  • tailMao(K fromKey)截取大于等于 key 的 map

额,讲完了。。。。。就不举🌰了吧~

12.闭锁 CountDownLatch

java.util.concurrent.CountDownLatch 是一个并发构造,它容许一个或多个线程等待一系列指定操做的完成。

CountDownLatch 以一个给定的数量初始化。countDown()每被调用一次,这一数量就建议。经过调用 await()方法之一,线程能够阻塞等待这一数量到达零。

下面是一个简单的示例,Decrementer 三次调用 countDown()以后,等待中的 Waiter 才会从 await()调用中释放出来。

public class CountDownLatchExample {

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(3);

        Waiter waiter = new Waiter(latch);
        Decrementer decrementer = new Decrementer(latch);

        new Thread(waiter).start();
        new Thread(decrementer).start();

    }

}

class Waiter implements Runnable {

    CountDownLatch latch = null;

    public Waiter(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Waiter Released");
    }
}

class Decrementer implements Runnable {

    CountDownLatch latch = null;

    Decrementer(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {

        try {
            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}复制代码

运行结果

Waiter Released复制代码

嗯,用法大概就是酱紫。我再给你们举个实际的例子吧~

有时候会有这样的需求,多个线程同时工做,而后其中几个能够随意并发执行,但有一个线程须要等其余线程工做结束后,才能开始。举个例子,开启多个线程分块下载一个大文件,每一个线程只下载固定的一截,最后由另一个线程来拼接全部的分段,那么这时候咱们能够考虑使用CountDownLatch来控制并发。

13.栅栏 CyclicBarrier

CyclicBarrier 类是一种同步机制,它能对处理一些算法的线程实现同步。换句话说,它就是一个全部线程必须等待的一个栅栏,直到全部线程都到达这里,而后全部线程才能够继续作其余事情。

这个文字很好理解吧,没理解的把上面这段话再读一遍。

图示以下:

两个线程在栅栏旁等待对方
两个线程在栅栏旁等待对方

经过调用 CuclicBarrier 对象的 await()方法,两个线程能够实现互相等待。一旦 N 个线程在等待 CyclicBarrier 达成,全部线程将被释放掉去继续执行。

建立一个 CyclicBarrier

在建立一个 CyclicBarrier 的时候你须要定义有多少线程在被释放以前等待栅栏。建立 CyclicBarrier 示例:

CyclicBarrier barrier = new CyclicBarrier(2);复制代码

等待一个 CyclicBarrier

如下演示了如何让一个线程等待一个 CyclicBarrier:
barrier.await();
固然,你也能够为等待线程设定一个超时时间。等待超过了超时时间以后,即使尚未达成 N 个线程等待 CyclicBarrier 的条件,该线程也会被释放出来。如下是定义超时时间示例:
barrier.await(10,TimeUnit.SECONDS);

固然,知足如下条件也可让等待 CyclicBarrier 的线程释放:

  • 最后一个线程也到达 CyclicBarrier(调用 await()方法)
  • 当前线程被其余线程打断(其余线程调用了这个线程的 interrupt()方法)
  • 其余等待栅栏的线程被打断
  • 其余等待栅栏的线程因超时而被释放
  • 外部线程调用了栅栏的 CyclicBarrier.reset()方法

CyclicBarrier 行动

CyclicBarrier 支持一个栅栏行动,栅栏行动是一个 Runnable 实例,一旦最后等待栅栏的线程抵达,该实例将被执行。你能够在 CyclicBarrier 的构造方法中将 Runnable 栅栏行动传给它:

CyclicBarrier barrier = new CyclicBarrier(2,barrierAction);

CyclicBarrier 示例代码

public class CyclicBarrierExample {

    public static void main(String[] args) {
        Runnable barrier1Action = new Runnable() {
            public void run() {
                System.out.println("BarrierAction 1 executed ");
            }
        };
        Runnable barrier2Action = new Runnable() {
            public void run() {
                System.out.println("BarrierAction 2 executed ");
            }
        };

        CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);
        CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);

        CyclicBarrierRunnable barrierRunnable1 =
                new CyclicBarrierRunnable(barrier1, barrier2);

        new Thread(barrierRunnable1).start();
        new Thread(barrierRunnable1).start();


    }

}

class CyclicBarrierRunnable implements Runnable {

    CyclicBarrier barrier1 = null;
    CyclicBarrier barrier2 = null;

    CyclicBarrierRunnable(
            CyclicBarrier barrier1,
            CyclicBarrier barrier2) {

        this.barrier1 = barrier1;
        this.barrier2 = barrier2;
    }

    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() +
                    " waiting at barrier 1");
            this.barrier1.await();

            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() +
                    " waiting at barrier 2");
            this.barrier2.await();

            System.out.println(Thread.currentThread().getName() +
                    " done!");

        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}复制代码

思考一下程序的运行结果~

Thread-0 waiting at barrier 1
Thread-1 waiting at barrier 1
BarrierAction 1 executed 
Thread-1 waiting at barrier 2
Thread-0 waiting at barrier 2
BarrierAction 2 executed 
Thread-0 done!
Thread-1 done!复制代码

14.交换机 Exchanger

Exchanger 类表示一种两个线程能够进行互相交换对象的会和点。这种机制图以下:

两个线程经过一个 Exchanger 交换对象
两个线程经过一个 Exchanger 交换对象

交换对象的动做由Exchanger 的两个 exchange()方法中的其中一个完成。如下是一个示例:

public class ExchangerExample {

    public static void main(String[]args){
        Exchanger exchanger = new Exchanger();

        ExchangerRunnable exchangerRunnable1 =
                new ExchangerRunnable(exchanger, "Thread-0数据");

        ExchangerRunnable exchangerRunnable2 =
                new ExchangerRunnable(exchanger, "Thread-1数据");

        new Thread(exchangerRunnable1).start();
        new Thread(exchangerRunnable2).start();

    }

}
 class ExchangerRunnable implements Runnable{

    Exchanger exchanger = null;
    Object    object    = null;

    ExchangerRunnable(Exchanger exchanger, Object object) {
        this.exchanger = exchanger;
        this.object = object;
    }

    public void run() {
        try {
            Object previous = this.object;

            this.object = exchanger.exchange(this.object);

            System.out.println(
                    Thread.currentThread().getName() +
                            " exchanged " + previous + " for " + this.object
            );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}复制代码

输出结果:

Thread-1 exchanged Thread-1数据 for Thread-0数据
Thread-0 exchanged Thread-0数据 for Thread-1数据复制代码

当一个线程到达exchange调用点时,若是它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行对象交换,而后各自返回。

在常见的 生产者-消费者 模型中用于同步数据。

15.信号量 Semaphore

Semaphore类是一个计数信号量。这就意味着它具有两个主要方法:

  • acquire()得到
  • release()释放

计数信号量由一个指定数量的“许可”初始化。每调用一次 acquire(),一个许可会被调用线程取走。没调用一次 release(),一个许可会被还给信号量。所以,在没有任何 release()调用时,最多有 N 个线程可以经过 acquire()方法,N 是该信号量初始化时的许可的指定数量。这些许可只是一个简单的计数器。没有啥奇特的地方。

Semaphore 用途

信号量主要有两种用途:

1.保护一个重要(代码)部分防止一次超过 N 个线程进入
2.在两个线程之间发送信号

Semaphore 用法

若是你将信号量用于保护一个重要部分,试图进入这一部分的代码一般会首先尝试得到一个许可,而后才能进入重要部分(代码块),执行完以后,再把许可释放掉:

Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
//重要部分代码块
semaphore.release();复制代码

在线程之间发生信号

若是你将一个信号量用于在两个线程之间发送信号,一般你应该用一个线程调用 acquire()方法,而另外一个线程调用 release()方法。

若是没有可用的许可,acquire()调用将会阻塞,知道一个许可被另外一个线程释放出来。同理,若是没法往信号量释放更多许可时,一个 release()方法调用也会阻塞。

经过这个能够对多个线程进行协调。好比,若是线程1将一个对象插入到了一个共享列表(list)以后调用了 acquire(),而线程2则从该列表中获取一个对象以前调用了release(),这时你其实已经建立了一个阻塞队列。信号量中可用的许可的数量也就等同于该则是队列可以持有的元素个数。

公平

没有办法保证线程可以公平地从信号量中得到许可。也就是说,没法担保第一个调用 acquire()的线程会是第一个得到许可的线程。若是第一个线程在等待一个许可时发生阻塞,而第二个线程来索要一个许可的时候恰好有一个许可被释放出来,那么它就可能在第一个线程以前得到许可。
若是须要强制公平,Semaphore 类有一个具备一个布尔类型的参数的构造子,经过这个参数以告知 Semaphore 是否要强制公平。强制公平会影响到并发性能,因此除非你确实须要它,不然不要启动它。

如下是如何在公平模式建立一个 Semaphore 的示例:

Semaphore semaphore = new Semaphore(1,ture);

更多方法

  • acquire()获取一个许可
  • availablePermits()返回当前可用许可数
  • drainPermits()获取并返回当即可用的全部许可
  • getQueueThreads()返回一个集合,包含可能等待获取的数量
  • hasQueueThreads()返回正在等待获取的线程的估计数目
  • isFair()若是此信号量的公平设置为 true,则返回 true
  • reducePermits(int)根据指定的缩减量减少可用许可的数量
  • relaese()释放一个许可

具体参考 JDK 文档吧

使用案例

public class SemaphoreExample {

    public static void main(String[]args){
        Semaphore semaphore = new Semaphore(3);

        new Thread(new ThreadSemaphore(semaphore)).start();
        new Thread(new ThreadSemaphore(semaphore)).start();
        new Thread(new ThreadSemaphore(semaphore)).start();
        new Thread(new ThreadSemaphore(semaphore)).start();
        new Thread(new ThreadSemaphore(semaphore)).start();

    }

}

 class ThreadSemaphore implements Runnable{

    private final Semaphore semaphore;

     ThreadSemaphore(Semaphore semaphore){
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+"获取到锁进来");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        semaphore.release();

    }
}复制代码

打印结果是先打印三条线程,1秒后再打印两条线程。

脑补一下,咱们当年抢第一代红米手机的时候,抢购界面是否是一直在排队,是否是用信号量能够实现呢,10w 台手机,同时只容许1000个用户购买(Semaphore 的许可为1000个),而后运气好忽然排队进去了(有人购买成功,释放了许可,运气好分配给了你),而后红米抢购到手。。。

16.执行器服务 ExecutorService

ExecutorService 接口表示一个异步执行机制,使咱们可以在后台执行任务。所以一个 ExecutorService 很相似一个线程池。实际上,存在于 concurrent 包里的 ExecutorService 实现就是一个线程池实现。

ExecutorService 例子

如下是一个简单的 ExecutorService 例子:

ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});
executorService.shutdown();复制代码

首先使用 newFixedThreadPool 工厂方法建立一个 ExecutorService。这里建立了一个是个线程执行任务的线程池。
而后,将一个 Runnable 接口的匿名实现类传给 execute()方法。这将致使 ExecutorService 中的某个线程执行该 Runnable。

任务委派

下图说明了一个线程是如歌将一个任务委托给一个 ExecutorService 去异步执行的:

一个线程将一个任务委派给一个 ExecutorService 去异步执行
一个线程将一个任务委派给一个 ExecutorService 去异步执行

一旦该线程将任务委派给 ExecutorService,该线程将继续它本身的执行,独立于该任务的执行。

ExecutorService 实现

既然 ExecutorService 是个接口,若是你想用它的话,还得去使用它的实现类。

  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

建立一个 ExecutorService

ExecutorService 的建立依赖于你使用的具体实现。可是你也可使用 Executors 工厂类来建立 ExecutorService 实例。如下是几个建立 ExecutorService 实例的例子:

ExecutorService executorService1 = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
ExecutorService executorService3 = Executors.newScheduledThreadPool(10);复制代码

ExecutorService 使用

有几种不一样的方式来将任务委托给 ExecutorService 去执行:

  • execute(Runnable)
  • submit(Runnable)
  • submit(Callable)
  • invokeAny(Collection)
  • invokeAll(Collection)

接下来咱们来一个一个看这些方法

  • execute(Runnable)

execute 方法要求一个 Runnable 对象,而后对它进行异步执行。如下是使用 ExecutorService 执行一个 Runnable 的示例:

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("Asynchronous task");
    }
});
System.out.println("Asynchronous task");
executorService.shutdown();复制代码

没有办法得知被执行的 Runnable 的执行结果。若是须要的话,得使用 Callable

  • submit(Runnable)

sunmit 方法也要求一个 Runnable 实现类,但它返回一个 Future 对象。这个 Future 对象能够用来检查 Runnable 是否已经执行完毕。

如下是 ExecutorService submit 示例:

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future future = executorService.submit(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});
future.get(); //returns null if the task has finished correctly.复制代码
  • submit(Callable)
    submit(Callable)方法相似于 submit(Runnable)方法,除了它所要求的参数类型以外。Callable 实例除了它的 call()方法可以返回一个结果以外和一个 Runnable 很像。Runnable.run()不能返回一个结果。

Callable 的结果能够经过 submit(Callable)方法返回的 Future 对象进行获取。如下是一个 ExecutorService Callable 示例:

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Object> future = executorService.submit(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        System.out.println("Asynchronous Callable");
        return "Callable Result";
        }
    });
System.out.println("future.get() = " + future.get());复制代码

以上代码输出:

Asynchronous Callable
future.get() = Callable Result复制代码

注意:future.get()会阻塞线程直到 Callable 执行结束。你能够把这个当成是一个有返回值的线程。

  • invokeAny()

invokeAny()方法要求一系列的 Callable 或者其子接口的实例对象。调用这个方法并不会返回一个 Future,但它返回其中一个 Callable 对象的结果。没法保证返回的是哪一个 Callable 的结果,只能代表其中一个已经执行结束。

ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> set = new HashSet<>();
set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});

set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});
String result = executorService.invokeAny(set);
System.out.println("result = " + result);
executorService.shutdown();复制代码

执行结果就不看了,自行测试吧

ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> set = new HashSet<>();
set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});

set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});
List<Future<String>> list = executorService.invokeAll(set);
for (Future<String> future : list)
    System.out.println("result = " + future.get());
executorService.shutdown();复制代码

执行结果自行测试。。。

ExecutorService 关闭

使用完 ExecutorService 以后,应该将其关闭,以使其中的线程再也不容许。
好比,若是你的应用是经过一个 main 方法启动的,以后 main 方法退出了你的应用,若是你的应用有一个活动的 ExecutorService,它将还会保持运行。ExecutorService 里的活动线程阻止了 JVM 的关闭。
要终止 ExecutorService 里的线程,你须要调用 ExecutorService 的 shutdown 方法。

ExecutorService 并不会当即关闭,但它将再也不接受新的任务,并且一旦全部线程都完成了当前任务的时候,ExecutorService 将会关闭。在 shutdown 被调用以前全部提交给ExecutorService 的任务都被执行。
若是你想当即关闭 ExecutorService,你能够调用 shutdownNow 方法,这样会当即尝试中止全部执行中的任务,并忽略掉那些已提交但还没有开始处理的任务。没法保证执行任务的正确执行。可能它们被中止了,也可能已经执行结束。

17.线程池执行者 ThreadPoolExecutor

ThreadPoolExecutor 是 ExecutorService 接口的一个实现。
ThreadPoolExecutor 使用其内部池中的线程执行给定任务(Callable 或者 Runnable)。ThreadPoolExecutor 包含的线程池可以包含不一样数量的线程,池中线程的数量由如下变量决定:

  • corePoolSize
  • maximumPoolSize
    当一个任务委托给线程池时,若是池中线程数量低于 corePoolSize,一个新的线程将被建立,即便池中可能还没有有空闲线程。
    若是内部任务队列已满,并且有至少 corePoolSize 正在运行,可是运行线程的数量低于 maximumPoolSize,一个新的线程将被建立去执行该任务。

ThreadPoolExecutor 图解:

一个ThreadPoolExecutor
一个ThreadPoolExecutor

建立一个 ThreadPoolExecutor

ThreadPoolExecutor 有若干个可用构造方法。好比:

int corePoolSize = 5;
int maxPoolSize = 10;
long keepAliveTime = 5000;
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
            maxPoolSize,
            keepAliveTime,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());复制代码

可是,除非你确实须要显式为 ThreadPoolExecutor 定义全部参数,使用 Executors 类中的额工厂方法之一会更加方便。

18.定时执行者服务 ScheduleExecutorService

ScheduleExecutorService 是一个 ExecutorService,它可以将任务延后执行,或者间隔固定时间屡次执行。任务由一个工做者线程异步执行,而不是由提交任务给 ScheduleExecutorService 的那个线程执行。

ScheduleExecutorService 例子

如下是一个简单的 ScheduleExecutorService 示例:

ScheduledExecutorService scheduledExecutorService =
                Executors.newScheduledThreadPool(5);
scheduledExecutorService.schedule(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        System.out.println("Executed!");
        return "Called!";
    }
}, 5, TimeUnit.SECONDS);
scheduledExecutorService.shutdown();复制代码

首先一个内置5个线程的 ScheduleExecutorService 被建立,以后一个 Callable 接口的匿名类示例被建立而后传递给 schedule()方法。后边的两参数定义了 Callable 将在5秒钟以后被执行。

ScheduleExecutorService 实现

既然是一个接口,要使用它的话就得使用 concurrent 包下的实现类

  • ScheduleThreadPoolExecutor

建立一个 ScheduleExecutorService

如何建立一个 ScheduleExecutorService,取决于你采用它的实现类。可是你也可使用 Executors 工厂类来建立一个 ScheduleExecutorService 实例。好比:

ScheduledExecutorService scheduledExecutorService =
                Executors.newScheduledThreadPool(5);复制代码

ScheduleExecutorService 使用

一旦你建立了一个 ScheduleExecutorService,你能够经过调用它的如下方法:

  • shcedule(Callable task,long delay,TimeUnit timeunit)
  • shcedule(Runnable task,long delay,TimeUnit timeunit)
  • shceduleAtFixedRate(Runnable task,long initialDelay,long period,TimeUtil timeutil)
  • shceduleWithFixedDelay(Runnable task,long initialDelay,long period,TimeUtil timeutil)

下面咱们就简单看一下这些方法。

  • schedule(Callable task,long delay,TimeUnit timeUnit)
    这个方法计划指定的 Callable 在给定的延迟以后执行。
    这个方法返回一个 ScheduleFuture,经过它你能够在它被执行前对它进行取消,或者在它执行以后获取结果。

如下是一个示例:

ScheduledExecutorService scheduledExecutorService =
                Executors.newScheduledThreadPool(5);
ScheduledFuture<Object> schedule = scheduledExecutorService.schedule(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        System.out.println("Executed!");
        return "Called!";
    }
}, 5, TimeUnit.SECONDS);
System.out.println(schedule.get());
scheduledExecutorService.shutdown();复制代码

输出结果:

Executed!
Called!复制代码
  • shcedule(Runnable task,long delay,TimeUnit timeUnit)

除了 Runnable 没法返回一个结果以外,这一方法工做起来就像一个 Callable 做为一个参数的那个版本的方法同样,所以 ScheduleFuture.get()在任务执行结束以后返回 null。

  • scheduleAtFixedRate(Runnable,long initialDelay,long period,TimeUnit tomeUnit)
    这一方法规划一个任务将被按期执行。该任务将会在某个 initialDelay 以后获得执行,而后每一个 period 时间以后重复执行。
    若是给的任务的执行抛出了异常,该任务将再也不执行。若是没有任何异常的话,这个任务将会持续循环执行到 ScheduleExecutorService 被关闭。
    若是一个任务占用了比计划的时间间隔更长的时候,下一次执行将在当前执行结束执行才开始。计划任务在同一时间不会有多个线程同时执行。

  • scheduleWithFixedDelay(Runnable r,long initalDelay,long period,TimeUnit timeUnit)

除了 period 有不一样的解释以外这个方法和 scheduleAtFixedRate()很是像。
scheduleAtFixedRate()方法中,period 被解释为前一个执行的开始和下一个执行的开始之间的间隔时间。
而在本方法中,period 则被解释为前一个执行的结束和下一个执行开始之间的间隔。

ShceduleExecutorService 关闭

正如 ExecutorService,在你使用结束以后,你须要吧 ScheduleExecutorService 关闭掉。不然他将致使 JVM 继续运行,即便全部其余线程已经所有被关闭。
你能够从 ExecutorService 接口继承来的 shutdown()或 shutdownNow()方法将 ScheduleExecutorService 关闭。

19.使用 ForkJoinPool 进行分叉和合并

ForkJoinPool 在 Java7中被引入。它和 ExecutorService 很类似,除了一点不一样。
ForkJoinPool 让咱们能够很方便把任务分裂成几个更小的任务,这些分裂出来的任务也将会提交给 ForkJoinPool。任务能够继续分割成更小的子任务,只要它还能分割。可能听起来有点抽象,所以本节中咱们将会解释 ForkJoinPool 是如何工做的,还有任务分割是如何进行的。

分叉和合并解释

在咱们开始看 ForkJoinPool 以前,咱们先来简要解释一下分叉和合并的原理。
分叉和合并原理包含两个递归进行的步骤。两个步骤分别是分叉步骤和合并步骤。

  • 分叉

一个使用了分叉和合并原理的任务能够将本身分叉(分割)为更小的子任务,这些子任务能够被并发执行。以下图所示:

分叉
分叉

经过把本身分割成多个子任务,每一个子任务能够由不一样的 CPU 并发执行,或者被同一个 CPU 上的不一样线程执行。

只有当给的任务过大,把它分割成几个子任务才有意义。把任务分割成子任务有一点的开销,所以对于小型任务,这个分割的消耗可能比每一个子任务并发执行的消耗还要大。

何时把一个任务分割成子任务是有意义的,这个界限也称做一个阈值。折腰看每一个任务对有意义阈值的决定。很大程度取决于它要作的工做的种类。

  • 合并

当一个任务将本身分割成若干子任务以后,该任务将进入等待全部子任务的结束之中。
一旦子任务执行结束,该任务能够把全部结果合并到同一结果。图示以下:

合并
合并

固然,并不是全部类型的任务都会返回一个结果。若是这个任务并不返还一个结果,它只需等待全部子线程执行完毕。也就不须要结果合并。

ForkJoinPool

ForkJoinPool 是一个特殊的线程池,她的设计是为了更好的配合 分叉-合并 任务分割的工做。ForkJoinPool 也在 concurrent 包中。

能够经过其构造方法建立一个 ForkJoinPool。 ForkJoinPool 构造函数的参数定义了 ForkJoinPool 的并行级别,并行级别表示分叉的线程或 CPU 数量。

建立示例:
ForkJoinPool forkJoinPool = new ForkJoinPool(4);

提交任务到 ForkJoinPool

就像提交任务到 ExecutorService那样,把任务提交到 ForkJoinPool。你能够提交两种类型的任务。一种是没有任何返回值的,另外一种是有返回值的。这两周任务分别有 RecursiveAction 和 RecursiveTask 表示。接下来介绍如何使用这两种类型的任务,以及如何对它们进行提交。

RecursiveAction

RecursiveAction 是一种没有返回值的任务。它只是作一些工做,好比写数据到磁盘,而后就退出了。
一个 RecursiveAction 能够把本身的工做分割成更小的几块,这样它们能够由独立的线程或者 CPU 执行。
你能够经过集成来实现一个 RecursiveAction。示例以下:

public class RecursiveActionExample {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(40);
        MyRecursiveAction myRecursiveAction = new MyRecursiveAction(240);
        forkJoinPool.invoke(myRecursiveAction);

    }
}

class MyRecursiveAction extends RecursiveAction {
    private long workLoad = 0;

    public MyRecursiveAction(long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected void compute() {
        //if work is above threshold, break tasks up into smaller tasks
        if (this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);
            List<MyRecursiveAction> subtasks =
                    new ArrayList<>();
            subtasks.addAll(createSubtasks());
            for (RecursiveAction subtask : subtasks) {
                subtask.fork();
            }
        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
        }
    }

    private List<MyRecursiveAction> createSubtasks() {
        List<MyRecursiveAction> subtasks =
                new ArrayList<>();
        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
        subtasks.add(subtask1);
        subtasks.add(subtask2);
        return subtasks;
    }
}复制代码

例子跟简单。MyRecursiveAction 将一个虚构的 workLoad 做为参数传给本身的构造方法。若是 wrokLoad 高于一个特定的阈值,该工做将分割为几个子工做,子工做继续分割。若是 workLoad 高于一个特定阈值,该工做将被分割为几个子工做,子工做继续分割。若是 workLoad 低于特定阈值,该工做将有 MyRecursiveAction 本身执行。

运行结果:

Splitting workLoad : 240
Splitting workLoad : 120
Splitting workLoad : 120
Splitting workLoad : 60
Splitting workLoad : 60
Splitting workLoad : 60
Splitting workLoad : 30
Splitting workLoad : 30
Splitting workLoad : 30
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 60
Splitting workLoad : 30
Doing workLoad myself: 15复制代码

RecursiveTask

RecursiveTask 是一种会返回结果的任务。它能够将本身的工做分割为若干更小任务,并将这些子任务的执行结果合并到一个集体结果。能够有几个水平的分割和合并。如下是一个 RecursiveTask 示例:

public class RecursiveTaskExample {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(40);
        MyRecursiveTask myRecursiveAction = new MyRecursiveTask(240);
        Object invoke = forkJoinPool.invoke(myRecursiveAction);
        System.out.println("mergedResult = " + invoke);
    }
}

class MyRecursiveTask extends RecursiveTask<Long> {

    private long workLoad = 0;

    public MyRecursiveTask(long workLoad) {
        this.workLoad = workLoad;
    }

    protected Long compute() {

        //if work is above threshold, break tasks up into smaller tasks
        if (this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);

            List<MyRecursiveTask> subtasks =
                    new ArrayList<>();
            subtasks.addAll(createSubtasks());

            for (MyRecursiveTask subtask : subtasks) {
                subtask.fork();
            }

            long result = 0;
            for (MyRecursiveTask subtask : subtasks) {
                result += subtask.join();
            }
            return result;

        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
            return workLoad * 3;
        }
    }

    private List<MyRecursiveTask> createSubtasks() {
        List<MyRecursiveTask> subtasks =
                new ArrayList<>();

        MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
        MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }
}复制代码

注意是如何经过 ForkJoinPool.invoke()方法的调用来获取最终执行结果的。

运行结果:

Splitting workLoad : 240
Splitting workLoad : 120
Splitting workLoad : 120
Splitting workLoad : 60
Splitting workLoad : 30
Doing workLoad myself: 15
Splitting workLoad : 60
Splitting workLoad : 30
Doing workLoad myself: 15
Splitting workLoad : 30
Splitting workLoad : 60
Splitting workLoad : 30
Splitting workLoad : 60
Doing workLoad myself: 15
Splitting workLoad : 30
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
mergedResult = 720复制代码

ForkJoinPool 评论

貌似并不是每一个人都对 Java7里面的 ForkJoinPool 满意,也就是说,这里面会有坑,在你计划在本身的项目里使用 ForkJoinPool 以前最好阅读一下这篇文章《一个 Java 分叉-合并 带来的灾祸》

haha...文章是英文版本的,能够用浏览器插件翻译,或者自行百度吧。

20.锁 Lock

Lock 是一个相似于 Synchronized 块的线程同步机制。可是 Lock 比 Synchronized 块更加灵活、精细。

Lock 例子

既然 Lock 是一个接口,在程序中总须要使用它的实现类之一来使用它。如下是一个简单示例:

Lock lock = new ReentrantLock();
lock.lock();
//同步代码
lock.unLock();复制代码

首先建立了一个 Lock 对象。以后调用了它的 lock()方法。这时候这个 lock 实例就被锁住啦。任何其余再过来调用 lock()方法的线程将会被锁阻塞住,直到锁定 lock 线程的实例的线程调用了 unlock()方法。最后 unlock()被调用了,lock 对象解锁了,其余线程能够对它进行锁定了。

Lock 实现

concurrent 包下 Lock 的实现类以下:

  • ReentrantLock

Lock 和 Synchronized 代码块的主要不一样点

  • Synchronized 代码块不可以保证进入访问等待的线程的前后顺序
  • 你不能传递任何参数给一个 Synchronized 代码块的入口。所以,对于 Synchronized 代码块的访问等待设置超时时间是不可能的事情。
  • Synchronized 块必须被完整地包含在单个方法里。而一个 Lock 对象能够把它的 lock()和 unLock()方法的调用放在不一样的方法里。

Lock 的方法

Lock 接口主要有如下几个方法

  • lock()
  • lockInterruptibly()
  • tryLock()
  • tryLock(long timeout,TimeUnit unit)
  • unLock()

lock()将 Lock 实例锁定。若是该 Lock 实例已被锁定,调用lock()方法的线程将会被阻塞,直到 Lock 实例解锁。
lockInterruptibly()方法将会被调用线程锁定,除非该线程将被打断。此外,若是一个线程在经过这个方法来锁定 Lock 对象时进入阻塞等待,而它被打断了的话,该线程将会退出这个方法调用。
tryLock()方法视图当即锁定 Lock 实例。若是锁定成功,它将返回 true,若是 Lock 实例已经被锁定,则返回 false。这一方法用不阻塞。
tryLock(long timeout,TimeUnit unit)的工做相似于 tryLock()方法,除了它在放弃锁定 Lock 以前等待一个给定的超时时间以外。
unlock()方法对 Lock 实例解锁。一个 Lock 实现将只容许锁定了该对象的线程来调用此方法。其余线程对 unlock()方法调用将会抛出异常。

21.读写锁 ReadWriteLock

读写锁是一种先进的线程锁机制。它可以容许多个线程在同一时间对某特定资源进行读取,但同一时间内只能有一个线程对其进行写入。
读写锁的理念在于多个线程可以对一个共享资源进行读取,而不会致使并发问题。并发问题的发生场景在于对一个共享资源的读和写操做的同时进行,或者多个读写操做并发进行。

ReadWrite Lock 锁规则

一个线程在对受保护资源在读或者写以前对 ReadWriteLock 锁定的规则以下:

  • 读锁:若是没有任何写操做线程锁定 ReadWriteLock,而且没有任何写操做线程要求一个写锁(但尚未得到该锁)。所以,能够有多个读操做线程对该锁进行锁定。
  • 写锁:若是没有任何读操做或者写操做。所以,在写操做的时候,只能有一个线程对该锁进行锁定。

ReadWriteLock 实现

ReadWriteLock 是个接口,若是你想使用它的话就得去使用它的实现类之一。concurrent 包提供了一个实现类:

  • ReentrantReadWriteLock

ReadWriteLock 代码示例
如下是 ReadWriteLock 的建立以及如何使用它进行读、写锁定的简单示例代码:

ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readWriteLock.readLock().locl();
//干点事情
readWriteLock.readLock().unlock();

ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readWriteLock.writeLock().locl();
//干点事情
readWriteLock.writeLock().unlock();复制代码

注意如何使用 ReadWriteLock 对两种锁示例的持有。一个对读访问进行保护,一个对写访问进行保护。

固然,这里的“读写”你能够根据需求灵活变化。

22.原子性布尔 AtomicBoolean

AtomicBoolean 类为咱们提供了一个能够用原子方式进行读和谐的布尔值,它还拥有一些先进的原子性操做,好比 compareAndSet()。AtomicBoolean 类位于 concurrent.atomic 包。

建立一个 AtomicBoolean

你能够这样建立一个 AtomicBoolean。
AtomicBoolean atomicBoolean = new AtomicBoolean();
以上示例新建了一个默认值为 false 的 AtomicBoolean。
若是你想要为 AtomicBoolean 示例设置一个显示的初始值,那么你能够将初始值传给 AtomicBoolean 的构造参数。
AtomicBoolean atomicBoolean = new AtomicBoolean(true);

获取 AtomicBoolean 的值

你能够经过使用 get()方法来获取一个 AtomicBoolean 的值。示例以下:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean value = atomicBoolean.get();复制代码

以上代码执行后 value 的值为 true。

设置 AtomicBoolean 的值

你能够经过 set() 方法来设置 AtomicBoolean 的值:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
atomicBoolean.set(false);复制代码

交互 AtomicBoolean 的值

你能够经过 getAndSet()方法来交换一个 AtomicBoolean 实例的值。getAndSet()方法将返回 AtomicBoolean 当前的值,并将为 AtomicBoolean 设置一个新值,示例以下:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean oldValue = atomicBoolean.getAndSet(false);复制代码

以上代码执行后 oldValue 变量的值为 true,atomicBoolean 实例将持有 false 值,代码成功将 AtomicBoolean 当前值 true 交换为 false。

比较并设置 AtomicBoolean 的值

compareAndSet()方法容许你对 AtomicBoolean 的当前值与与一个指望值进行比较,若是当前值等于指望值的话,将会对 AtomicBoolean 设定一个新值。compareAndSet()方法是原子性质的,所以在同一时间以内有耽搁线程执行她。所以 compareAndSet()方法可被用于一些相似于锁的同步的简单实现。
如下是一个 compareAndSet()示例:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean expectedValue = true;
boolean newVaule = false;
boolean wasNewValueSet = atomicBoolean.compareAndSet(expectedValue,newValue);复制代码

本示例针对 AtomicBoolean 的当前值与 true 值进行比较,若是相等,将 AtomicBoolean 的值更新为 false。

有什么用?

可能有些小伙伴到这里仍是有点懵逼,这个原子布尔到底有什么用,给你们看一个示例代码:

class XxxService {

    private static AtomicBoolean initState = new AtomicBoolean(false);
    private static AtomicBoolean initFinish = new AtomicBoolean(false);
    private static XxxService instance;

    private XxxService() {
    }

    public static XxxService getInstance() {
        if (initState.compareAndSet(false, true)) {
            //TODO 写初始化代码
            initFinish.set(true);
        }
        while(!initFinish.get()){
            Thread.yield();
        }

        return instance;
    }
}复制代码

假如程序须要在多线程的状况下初始化一个类,而且保证只初始化一次,完美解决并发问题。

23.原子性整形 AtomicIngteger

同22,略

24.原子性长整型 AtomicBooleanLong

同22,略

25.原子性引用型 AtomicReference

AtomicReference 提供了一个能够被原子性读和写的对象引用变量。原子性的意思是多个想要改变同一个 AtomicReference 的线程不会致使 AtomicReference 处于不一致的状态。AtomicReference 还有一个 compareAndSet()方法,经过它你能够将当前引用于一个指望值(引用)进行比较,若是相等,在该 AtomicReference 对象内部设置一个新的引用。

建立一个 AtomicReference

建立 AtomicReference 以下:
AtomicReference atomicReference = new AtomicReference();
若是你须要使用一个指定引用建立 AtomicReference,能够:
String initialReference = "the initialyl reference string"; AtomicReference atomicReference = new AtomicReference(initialReference);

建立泛型 AtomicReference

你可使用 Java 泛型来建立一个泛型 AtomicReference。示例:
AtomicReference<String> atomicReference = new AtomicReference();
你也能够为泛型 AtomicReference 设置一个初始值。示例:
String initialReference = "the initialyl reference string"; AtomicReference<String> atomicReference = new AtomicReference<>(initialReference);

获取 AtomicReference 引用

你能够经过 AtomicReference 的 get()方法来获取保存在 AtomicReference 里的引用.

设置 AtomicReference 引用

AtomicReference.set(V newValue);

比较并设置 AtomicReference 引用

使用 compareAndSet()

和 volatile 关键字的区别

敲黑板!!!

Atomic 和 volatile的区别很简单,Atomic 保证读写操做同步,可是 volatile 只保证写的操做,并无保证读的操做同步。

具体原理牵涉到虚拟机的层次了,感兴趣的小伙伴可自行学习。

参考资料

本文主要参考了Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf, 点击可下载资源。

相关文章
相关标签/搜索