从0学习java并发编程实战-读书笔记-基础构建模块(4)

同步容器类

同步容器类包括Vector和Hashtable,这两个是早期JDK的一部分。此外还包括在JDK1.2中添加Collections.synchronizedXxx等工厂方法,这些类实现线程安全的方式是:将它们的状态封装起来,并对公有方法进行同步,使得每次只有一个线程能访问容器的状态。java

同步容器类的问题

同步容器类都是线程安全的,可是某些复合操做须要额外的客户端加锁来保护,常见的复合操做:算法

  • 迭代:反复访问元素,直到遍历完容器中的全部元素
  • 跳转:根据指定顺序找到当前元素的下一个元素
  • 条件运算:例如“若没有就添加”

因为同步容器类要遵照同步策略,即支持客户端加锁,所以可能会创造一些新的操做,只要咱们知道应该使用哪一锁,那么这些新操做就与其余操做同样都是原子操做。数组

for (int i = 0; i < vector.size();i++){
    doSomething(vector.get(i))
 }

一个线程访问,一个线程删除,在这两个线程交替执行的时候,代码就可能出现问题,将抛出ArrayIndexOutOfBoundsException。
解决方式是将怎么for循环加锁,锁为vector对象。安全

迭代器与ConcurrentModificationException

不管是直接迭代仍是jdk5引入的for-each语法,对容器的标准访问方式就是使用Iterator(迭代器),可是Iterator并无考虑到并发修改的问题,迭代器使用的策略是快速失败(fail-fast)。当迭代器发现容器在迭代过程被修改了,就会抛出一个ConcurrentModificationException异常。
这种快速失败机制并不算是完备的处理机制,只是捕获了可能会出现并发错误,只能做为一个并发问题预警指示器。要想避免ConcurrentModificationException异常,就必须在迭代过程持有容器的锁。多线程

隐藏迭代器

在某些状况下,迭代器会隐藏起来。例如`javaSystem.out.println("iterm:"+set);,编译器将字符串的链接操做转换为调用StringBuilder.append(Object),而这个方法将会调用容器的toString方法,标准容器的toString方法将迭代容器。并发

正如封装对象的状态有助于维持不变性条件同样,封装对象的同步机制一样有助于确保实施同步策略。

并发容器

同步容器经过对全部容器状态的访问都串行化,以实现它们的线程安全性。固然这种办法的代价是严重下降并发性,当多个线程竞争容器的锁时,吞吐量将严重下降。app

经过并发容器来代替同步容器,能够极大的提升伸缩性并下降风险。
  • JDK5中增长了ConcurrentHashMap,用来代替同步且基于散列的Map。
  • 以及CopyOnWriteArrayList,用于在遍历操做为主要操做的状况下代替同步的List。
  • 在新的ConCurrentMap接口中增长了一些经常使用的复合条件,如“若没有就添加”,替换,以及有条件删除等。
  • JDK5中增长了两种新容器,QueueBlockingQueue,用来临时保存一组等待处理的元素。它提供了几种实现,包括:函数

    • ConcurrentLinkedQueue,这是一个传统的先进先出队列。
    • PriorityQueue,这是一个(非并发的)优先队列。
  • Queue上的操做不会阻塞,若是队列为空,那么获取元素的操做将会返回空值。
  • 能够用List来模拟Queue的行为(Queue自己就是由LinkedList实现的),可是仍是须要一个Queue类,由于它能去掉List的随机访问需求,实现更加高效的并发。
  • BlockingQueue拓展了Queue,增长了可阻塞的插入和获取等操做。工具

    • 若是队列为空,那么获取元素的操做将一直阻塞。
    • 若是队列已满,那么插入元素的操做将一直阻塞,直到队列中出现可用的空间。
  • JDK6引入了:性能

    • ConcurrentSkipListMap:同步的SortedMap的并发替代品
    • ConcurrentSkipListSet:同步的SortedSet的并发替代品

ConcurrentHashMap

同步容器类在执行每一个操做期间都持有一个锁。与HashMap同样,ConcurrentHashMap也是一个基于散列的Map,使用了一种不一样的策略来提供更高的并发性和伸缩性:

  • ConcurrentHashMap并非将每一个方法都在同一个锁上同步并使得同时只能由一个线程访问容器,而是利用分段锁(Locking Striping)作更细粒度的加锁机制来实现更大程度的共享。这样的好处是,在多线程并发访问下将实现更高的吞吐量,而单线程环境只损失很是小的性能。
  • ConcurrentHashMap与其余容器加强了同步容器类:他们提供的迭代器不会抛出ConcurrentModificationException异常,ConcurrentHashMap返回的迭代器具备弱一致性(Weakly Consistent)弱一致性的迭代器能够容忍并发的修改,当建立迭代器的时候会遍历已有的元素,能够(并不保证)在迭代器在被构造后将修改操做反映给容器。
与Hashtable和synconizedMap相比,ConcurrentHashMap有着更多的优点和更少的劣势,在大多数并发状况下,用ConcurrentHashMap来代替同步Map能提升代码的可伸缩性,只有当须要加锁Map进行访问时,才应该放弃使用concurrentHashMap。

额外的原子Map操做

因为ConcurrentMap并非经过持有锁来控制对象的独占访问,因此咱们没法靠加锁新建原子操做。可是常见的如:“若没有则添加”,“若相等则移除”,“若相等则替换” 等复合操做都已经实现(具体能够看接口描述)。

CopyOnWriteArraryList

CopyOnWriteArraryList 用于替代同步List,在某些状况下它能提供更好的并发性能,在迭代期间并不须要对容器进行复制或者加锁。

  • CopyOnWrite(写入时复制)容器的线程安全性在于,只要正确发布一个事实不变的对象,那么在访问该对象的时候就不须要进一步的同步。
  • 在每次修改的时候,都会建立并从新发布一个新的容器副本,从而实现可变性。 容器的迭代器保留一个指向底层数组的引用,这个数组当且位于迭代器的起始位置,因为每次都会建立新对象,因此和读进程互不干扰。
  • 因为每次修改都须要进行复制,因此不适合须要常常修改或者容器规模很大的状况。

阻塞队列和生产者-消费者模式

阻塞队列提供了:

  • 可阻塞的puttake方法:若是队列满了,那么put队列将阻塞至有空间可用;若是队列为空,take方法将会阻塞至有元素可用。
  • 定时的offerpoll方法

队列能够是有界的也能够是无界的,无界队列永远也不会充满,因此put方法永远也不会阻塞。

阻塞队列支持生产者-消费者模式,把 找出须要完成的工做执行工做这两个过程分开,并将工做放入一个 待完成列表,以便后续处理,而不是找出当即处理。
  • 生产者-消费者模式能简化开发过程,它消除了生产者和消费者直接的代码依赖性,该模式还将生产数据的过程与使用数据的过程解耦来简化工做负载的管理,由于这两个过程在处理数据的速率不一样。
  • 若是生产者生成工做的速率比消费者处理工做的速度快,那么工做就会在队列中累计起来,直到耗尽内存。put的阻塞特性极大的简化了生产者的编码。若是使用有界队列,那么当队列充满时,生产者将阻塞而且不能继续生成工做,而消费者就有时间来遇上工做处理进度。
在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抑制并防止产生过多的工做项,使应用程序在负荷过载的状况下变得更加健壮。

类库中的BlockingQueue实现

  • LinkedBlockingQueueArrayBlockingQueueFirst In,First Out(FIFO)队列,分别和LinkedList和ArrayList类似。但比同步List有更好的并发性能。
  • PriorityBlockingQueue是一个优先队列,能够经过元素的天然顺序比较,也可使用Comparator方法。
  • SynchronousQueue,实际上并不算是一个真正的队列,由于它不会为队列中的元素维护存储空间。与其余队列不一样的是,它维护一组线程,这些线程在等着把元素加入或者移出队列。这种实现队列的方式看起来很奇怪,等于直接将工做交付给消费者线程,从而下降了将数据从生产者移动到消费者的延迟(避免串行入列和出列)。而且一旦工做被交付,生产者能够当即获得反馈。

串行线程封闭

  • 对于可变对象,生产者-消费者这种设计与阻塞队列一块儿,促进了串行线程封闭,从而将对象全部权从生产者交付给消费者。
  • 线程封闭对象只能由单个线程拥有,可是能够经过安全发布该对象来“转移”全部权,而且转移后,发布对象的线程不会再访问它,对象被封闭在新的线程里。
  • 对象池利用了串行线程封闭,将对象“借给”一个请求线程,重要对象池保护足够的内部同步来安全发布池中的对象,而且客户代码自己不会发布池中的对象,且在将对象返回给对象池以后不会再使用它,那么就能够安全地在线程之间传递全部权。

双端队列与工做密取

jdk6增长了两种容器类型:

  • Deque:对Queue进行了拓展
  • BlockingDeque:对BlockingQueue进行了拓展
Deque是一个 双端队列,实现了在队列头和队列尾的高效插入和移除,具体实现有ArrayDeque和LinkedBlockingDeque。
正如阻塞队列适用于生产者-消费者模式,双端队列适用于工做密取模式。
在生产者-消费者模式中,全部消费者都有一个共享的工做队列。而工做密取中,每一个消费者都有一个本身的双端队列,若是一个消费者完成了本身双端队列中的所有工做,那么它能够从其余消费者队列的尾部来秘密获取工做(G1中的处理DCQ就是使用工做密取模式)。

工做密取比生产者消费者模式具备更好的伸缩性,消费者基本不会在单个共享的任务队列上发生竞争。当一个消费者线程要访问另外一个队列时,是从尾部而不是头部获取,进一步下降了队列的竞争程度。

阻塞方法与中断方法

线程可能会阻塞或暂停执行,缘由有多种:

  • 等待I/O操做结束
  • 等待得到一个锁
  • 等待从Thread.sleep方法中醒来
  • 等待另外一个线程的计算结果

当线程阻塞时,它一般被挂起,并处于某种阻塞状态:

  • BLOCK
  • WAITING
  • TIMED_WAITING

被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行,例如等待的I/O操做已经完成,锁可用了等等。当某个外部事件发生时,线程被置回RUNNABLE状态,而且能够执行调度。

Thread提供了interrupt方法,用于中断线程或者查询线程是否已经被中断,每一个线程都有个boolean来表示线程的中断状态。

中断是一种协做机制,一个线程不能强制其余线程中止正在执行的操做而去执行其余操做。
当线程A中断B,A仅仅是要求B在执行到某个能够暂停的地方中止正在执行的操做(前提是若是B愿意停下)。最常使用的中断的状况就是取消某个操做。
当在代码中调用了一个将抛出InterruptedException异常的方法时,你的方法就变成了阻塞方法,而且必须处理中断的响应:

  • 传递InterruptedExcption:避开这个异常一般是最好的选择,只须要将这个异常传递给调用者,并不捕获或者恢复该异常,而后在执行某种简单的清理工做后再次抛出这个异常。
  • 恢复中断:有时候不能抛出这个异常,例如代码是Runnable的一部分时,必须捕获该异常,并经过当前线程上的interrupt方法恢复中断。

同步工具类

闭锁

闭锁是一种同步工具类,能够延迟线程的进度直到其到达终止状态。闭锁能够用来确保某些活动直到其余活动都完成后才继续执行。
CountDownLatch是一种灵活的闭锁实现,它可使一个或多个线程等待一组事件发生。闭锁状态有一个计数器,这个计数器被初始化为一个正数,表示等待的事件数量。countDown方法递减计数器,表示已经有一个事件发生了,而await方法会一直阻塞,直到计数器为0,或者等待的线程中断或者超时。
public class TestHarness {
    public long timeTask(int nThreads, final Runnable task) throws InterruptedException{
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(){
              public void run(){
                  try {
                      startGate.await();
                      try{
                          task.run();
                      }finally {
                          endGate.countDown();
                      }
                  }catch (InterruptedException ignored){}
              }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;

    }
}
  • 第一个闭锁:startGate,确保全部的线程都准备就绪后才开始执行任务。
  • 第二个闭锁:endGate,等到全部线程都执行完成后计算消耗的时间。

FutureTask

FutureTask也能够用作为闭锁。FutureTask实现了Future语义,表示一种抽象的可生成结果的计算。FutureTask表示的计算是经过Callable来实现的。
FutureTask能够处于如下三种状态:
  • 等待运行(Waiting to run)
  • 正在运行 (Running)
  • 运行完成(Completed)

当FutureTask进入完成状态后,它会永远停在这个状态上。

Future.get行为取决于任务的状态:

  • 任务完成:get当即返回结果
  • 任务还未完成:get将阻塞直到任务进入完成状态,而后返回结果或是抛出异常

FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而且FutureTask可以保证这种传递过程能实现结果的安全发布

FutureTask类实现了 RunnableFuture接口, RunnableFuture继承了 Runnable接口和Future接口,因此它既能够做为Runnable被线程执行,又能够做为Future获得Callable的返回值。 事实上,FutureTask是Future接口的一个惟一实现类。

Callable表示的任务能够抛出受检查的或未受检查的异常,而且任何代码均可能抛出一个Error。不管任务代码抛出什么异常,都会被封装到一个ExecutionException中,并在Future.get中从新抛出。因此当ExcutionException时,多是如下三种状况之一:

  • Callable抛出的受检查异常
  • RuntimeException
  • Error

信号量

计数信号量(Counting Semaphore):用来控制访问某个特定资源的操做数量,或者同时执行某个制定操做的数量。计数信号量还能够用来实现某些资源池,或者对容器加边界。

Semaphore中管理一组许可,许可的初始数量能够经过构造函数来指定。在执行操做的时候,首先得到许可,在使用之后释放许可。若是没有许可,acquire将阻塞直到有许可(或者被中断或超时)。

public class BoundedHashSet<T> {

    private final Set<T> set;

    private final Semaphore semaphore;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        semaphore = new Semaphore(bound);
    }

    public boolean add(T t) throws InterruptedException {
        semaphore.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(t);
            return wasAdded;
        } finally {
            if (!wasAdded) {
                semaphore.release();
            }
        }
    }

    public boolean remove(T o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) {
            semaphore.release();
        }
        return wasRemoved;
    }
}

这里实现了一个有界阻塞Set容器,信号量的计数值初始化为容器的最大值。每次添加元素以前,要得到一个许可,若是add失败,并无成功添加上元素,就释放许可。删除成功也释放许可,底层的set实现并不知道关于边界的信息,都是由BoundedHashSet来处理的。

栅栏(Barrier)

闭锁和栅栏的区别

  • 闭锁是一次性对象,一旦进入终止状态,就不能被重制
  • 栅栏能阻塞一组线程直到某个事件发生,全部线程必须同时到达栅栏位置,才能继续执行。

闭锁用于等待事件,而栅栏用来等待其余线程。

CyclicBarrier可使必定数量的参与方反复地在栅栏位置聚集,它在并行迭代算法中很是有用:

  • 一般能将一个问题拆分红若干个相互独立的子问题。
  • 当线程到达了栅栏时将调用await方法,这个方法将阻塞直到全部线程都到达栅栏位置。若是全部线程都到达了栅栏位置,那么将释放全部线程,重制栅栏以备下次使用。
  • 若是对await调用超时,或者await阻塞的线程被中断,那么认为栅栏被破坏,全部阻塞的await调用都将终止并抛出BrokenBarrierException.
  • 若是成功经过栅栏,那么await将为每一个线程都返回一个惟一的到达索引号,利用这些索引来选举产生一个新的领导线程,并在下一次迭代中由这个领导线程执行新的工做。
  • CyclicBarrier还能够将一个栅栏操做传递给构造函数,这是一个Runnable,当成功经过栅栏,会从其中一个子任务线程(领导线程)中执行它,可是在阻塞线程被释放以前是不会执行的。
相关文章
相关标签/搜索