JDK高性能编程之多线程

多线程

j360-jdk调试功能html

https://github.com/xuminwlt/j360-jdkjava


如下内容部分选摘自互联网及前人总结,若有问题请指正,我会及时更正,谢谢!git

实现

 - 继承Thread
程序员

 - 实现Runnable接口github

 - 使用ExecutorService、Callable、Future实现有返回结果的多线程数据库

        - ExecutorService、Callable、Future这个对象实际上都是属于Executor框架中的功能类
编程

concurrent类图

一、继承Thread

public class MyThread extends Thread{

    @Override
    public void run(){
        System.out.println("MyThread.run()");
    }

}

MyThread myThread = new MyThread();
    myThread.start();


二、实现Runnable接口:若是本身的类已经extends另外一个类,就没法直接extends Thread,此时,必须实现一个Runnable接口

public class MyThreadRun implements Runnable{

    @Override
    public void run() {
        System.out.println("MyThreadRun.run()");
    }
}

MyThreadRun myThreadRun = new MyThreadRun();
    Thread thread = new Thread(myThreadRun);
    thread.start();

Runnable target参数给Thread后,Thread的run()方法就会调用target.run(),参考JDK源代码:
public void run() {
  if (target != null) {
   target.run();
  }
}
小程序

三、使用ExecutorService、Callable、Future实现有返回结果的多线程

在JDK1.5以前,Java中要进行业务并发时,一般须要有程序员独立完成代码实现,固然也有一些开源的框架提供了这些功能,可是这些依然没有JDK自带的功能使用起来方便。而当针对高质量Java多线程并发程序设计时,为防止死蹦等现象的出现,好比使用java以前的wait()、notify()和synchronized等,往往须要考虑性能、死锁、公平性、资源管理以及如何避免线程安全性方面带来的危害等诸多因素,每每会采用一些较为复杂的安全策略,加剧了程序员的开发负担.万幸的是,在JDK1.5出现以后,Sun大神(Doug Lea)终于为咱们这些可怜的小程序员推出了java.util.concurrent工具包以简化并发完成。开发者们借助于此,将有效的减小竞争条件(race conditions)和死锁线程。concurrent包很好的解决了这些问题,为咱们提供了更实用的并发程序模型。数组

package me.j360.jdk.thread;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Created with j360-jdk -> me.j360.jdk.thread.
 * User: min_xu
 * Date: 2015/10/17
 * Time: 12:22
 * 说明:
 */
public class ExecutorTest {

    public static void main(String[] args){

        System.out.println("----程序开始运行----");
        Date date1 = new Date();

        int taskSize = 5;
        // 建立一个线程池
        ExecutorService pool = Executors.newFixedThreadPool(taskSize);
        // 建立多个有返回值的任务
        List<Future> list = new ArrayList<Future>();
        for (int i = 0; i < taskSize; i++) {
            Callable c = new MyCallable(i + " ");
            // 执行任务并获取Future对象
            Future f = pool.submit(c);
            //System.out.println(">>>" + f.get().toString());
            list.add(f);
        }
        // 关闭线程池
        pool.shutdown();

        for (Future f : list) {
            System.out.println(">>>" + f.isDone());
        }

        Date date2 = new Date();
        System.out.println("----程序结束运行----,程序运行时间【"
                + (date2.getTime() - date1.getTime()) + "毫秒】");
    }

}

class MyCallable implements Callable<Object> {
    private String taskNum;

    MyCallable(String taskNum) {
        this.taskNum = taskNum;
    }

    public Object call() throws Exception {
        System.out.println(">>>" + taskNum + "任务启动");
        Date dateTmp1 = new Date();
        Thread.sleep(1000);
        Date dateTmp2 = new Date();
        long time = dateTmp2.getTime() - dateTmp1.getTime();
        System.out.println(">>>" + taskNum + "任务终止");
        return taskNum + "任务返回运行结果,当前任务时间【" + time + "毫秒】";
    }
}

可返回值的任务必须实现Callable接口,相似的,无返回值的任务必须Runnable接口。执行Callable任务后,能够获取一个Future的对象,在该对象上调用get就能够获取到Callable任务返回的Object了,再结合线程池接口ExecutorService就能够实现传说中有返回结果的多线程了。缓存

代码说明:


上述代码中Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。
public static ExecutorService newFixedThreadPool(int nThreads) 
建立固定数目线程的线程池。
public static ExecutorService newCachedThreadPool() 
建立一个可缓存的线程池,调用execute 将重用之前构造的线程(若是线程可用)。若是现有线程没有可用的,则建立一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newSingleThreadExecutor() 
建立一个单线程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 
建立一个支持定时及周期性的任务执行的线程池,多数状况下可用来替代Timer类。

ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。若是Executor后台线程池尚未完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。


生命周期

ExecutorService扩展了Executor并添加了一些生命周期管理的方法。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止。Executor建立时处于运行状态。当调用ExecutorService.shutdown()后,处于关闭状态,isShutdown()方法返回true。这时,不该该再想Executor中添加任务,全部已添加的任务执行完毕后,Executor处于终止状态,isTerminated()返回true。

若是Executor处于关闭状态,往Executor提交任务会抛出unchecked exception RejectedExecutionException。

ExecutorService executorService = (ExecutorService) executor;
while (!executorService.isShutdown()) {
	try {
		executorService.execute(task);
	} catch (RejectedExecutionException ignored) {
		
	}
}
executorService.shutdown();


返回值

Future<V>表明一个异步执行的操做,经过get()方法能够得到操做的结果,若是异步操做尚未完成,则,get()会使当前线程阻塞。FutureTask<V>实现了Future<V>和Runable<V>。Callable表明一个有返回值得操做。

Callable<Integer> func = new Callable<Integer>(){
			public Integer call() throws Exception {
				System.out.println("inside callable");
				Thread.sleep(1000);
				return new Integer(8);
			}		
		};		
		FutureTask<Integer> futureTask  = new FutureTask<Integer>(func);
		Thread newThread = new Thread(futureTask);
		newThread.start();
		
		try {
			System.out.println("blocking here");
			Integer result = futureTask.get();
			System.out.println(result);
		} catch (InterruptedException ignored) {
		} catch (ExecutionException ignored) {
		}


java concurrent 探秘[转]

咱们都知道,在JDK1.5以前,Java中要进行业务并发时,一般须要有程序员独立完成代码实现,固然也有一些开源的框架提供了这些功能,可是这些依然没有JDK自带的功能使用起来方便。而当针对高质量Java多线程并发程序设计时,为防止死蹦等现象的出现,好比使用java以前的wait()、notify()和synchronized等,往往须要考虑性能、死锁、公平性、资源管理以及如何避免线程安全性方面带来的危害等诸多因素,每每会采用一些较为复杂的安全策略,加剧了程序员的开发负担.万幸的是,在JDK1.5出现以后,Sun大神(Doug Lea)终于为咱们这些可怜的小程序员推出了java.util.concurrent工具包以简化并发完成。开发者们借助于此,将有效的减小竞争条件(race conditions)和死锁线程。concurrent包很好的解决了这些问题,为咱们提供了更实用的并发程序模型。


Executor                  :具体Runnable任务的执行者。
ExecutorService           :一个线程池管理者,其实现类有多种,我会介绍一部分。咱们能把Runnable,Callable提交到池中让其调度。
Semaphore                 :一个计数信号量
ReentrantLock             :一个可重入的互斥锁定 Lock,功能相似synchronized,但要强大的多。
Future                    :是与Runnable,Callable进行交互的接口,好比一个线程执行结束后取返回的结果等等,还提供了cancel终止线程。
BlockingQueue             :阻塞队列。
CompletionService         : ExecutorService的扩展,能够得到线程执行结果的
CountDownLatch            :一个同步辅助类,在完成一组正在其余线程中执行的操做以前,它容许一个或多个线程一直等待。 
CyclicBarrier             :一个同步辅助类,它容许一组线程互相等待,直到到达某个公共屏障点 
Future                    :Future 表示异步计算的结果。
ScheduledExecutorService :一个 ExecutorService,可安排在给定的延迟后运行或按期执行的命令。

接下来逐一介绍

Executors主要方法说明
newFixedThreadPool固定大小线程池)
建立一个可重用固定线程集合的线程池,以共享的无界队列方式来运行这些线程(只有要请求的过来,就会在一个队列里等待执行)。若是在关闭前的执行期间因为失败而致使任何线程终止,那么一个新线程将代替它执行后续的任务(若是须要)。

newCachedThreadPool(无界线程池,能够进行自动线程回收)
建立一个可根据须要建立新线程的线程池,可是在之前构造的线程可用时将重用它们。对于执行不少短时间异步任务的程序而言,这些线程池一般可提升程序性能。调用 execute 将重用之前构造的线程(若是线程可用)。若是现有线程没有可用的,则建立一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。所以,长时间保持空闲的线程池不会使用任何资源。注意,可使用 ThreadPoolExecutor 构造方法建立具备相似属性但细节不一样(例如超时参数)的线程池。

newSingleThreadExecutor(单个后台线程)
建立一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,若是由于在关闭前的执行期间出现失败而终止了此单个线程,那么若是须要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,而且在任意给定的时间不会有多个线程是活动的。与其余等效的 newFixedThreadPool(1) 不一样,可保证无需从新配置此方法所返回的执行程序便可使用其余的线程。

这些方法返回的都是ExecutorService对象,这个对象能够理解为就是一个线程池。
这个线程池的功能仍是比较完善的。能够提交任务submit()能够结束线程池shutdown()。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MyExecutor extends Thread {
private int index;
public MyExecutor(int i){
    this.index=i;
}
public void run(){
    try{
     System.out.println("["+this.index+"] start....");
     Thread.sleep((int)(Math.random()*1000));
     System.out.println("["+this.index+"] end.");
    }
    catch(Exception e){
     e.printStackTrace();
    }
}
public static void main(String args[]){
    ExecutorService service=Executors.newFixedThreadPool(4);
    for(int i=0;i<10;i++){
     service.execute(new MyExecutor(i));
     //service.submit(new MyExecutor(i));
    }
    System.out.println("submit finish");
    service.shutdown();
}
}

虽然打印了一些信息,可是看的不是很是清晰,这个线程池是如何工做的,咱们来将休眠的时间调长10倍。
Thread.sleep((int)(Math.random()*10000));

再来看,会清楚看到只能执行4个线程。当执行完一个线程后,才会又执行一个新的线程,也就是说,咱们将全部的线程提交后,线程池会等待执行完最后shutdown。咱们也会发现,提交的线程被放到一个“无界队列里”。这是一个有序队列(BlockingQueue,这个下面会说到)。

另外它使用了Executors的静态函数生成一个固定的线程池,顾名思义,线程池的线程是不会释放的,即便它是Idle。
这就会产生性能问题,好比若是线程池的大小为200,当所有使用完毕后,全部的线程会继续留在池中,相应的内存和线程切换(while(true)+sleep循环)都会增长。
若是要避免这个问题,就必须直接使用ThreadPoolExecutor()来构造。能够像通用的线程池同样设置“最大线程数”、“最小线程数”和“空闲线程keepAlive的时间”。


这个就是线程池基本用法。

Semaphore
一个计数信号量。从概念上讲,信号量维护了一个许可集合。若有必要,在许可可用前会阻塞每个 acquire(),而后再获取该许可。每一个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。可是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采起相应的行动。

Semaphore 一般用于限制能够访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:

这里是一个实际的状况,你们排队上厕所,厕所只有两个位置,来了10我的须要排队。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class MySemaphore extends Thread {
Semaphore position;
private int id;
public MySemaphore(int i,Semaphore s){
    this.id=i;
    this.position=s;
}
public void run(){
    try{
     if(position.availablePermits()>0){
      System.out.println("顾客["+this.id+"]进入厕所,有空位");
     }
     else{
      System.out.println("顾客["+this.id+"]进入厕所,没空位,排队");
     }
     position.acquire();
     System.out.println("顾客["+this.id+"]得到坑位");
     Thread.sleep((int)(Math.random()*1000));
     System.out.println("顾客["+this.id+"]使用完毕");
     position.release();
    }
    catch(Exception e){
     e.printStackTrace();
    }
}
public static void main(String args[]){
    ExecutorService list=Executors.newCachedThreadPool();
    Semaphore position=new Semaphore(2);
    for(int i=0;i<10;i++){
     list.submit(new MySemaphore(i+1,position));
    }
    list.shutdown();
    position.acquireUninterruptibly(2);
    System.out.println("使用完毕,须要清扫了");
    position.release(2);
}
}

ReentrantLock
一个可重入的互斥锁定 Lock,它具备与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。

ReentrantLock 将由最近成功得到锁定,而且尚未释放该锁定的线程所拥有。当锁定没有被另外一个线程所拥有时,调用 lock 的线程将成功获取该锁定并返回。若是当前线程已经拥有该锁定,此方法将当即返回。可使用 isHeldByCurrentThread() 和 getHoldCount() 方法来检查此状况是否发生。

此类的构造方法接受一个可选的公平参数。
当设置为 true时,在多个线程的争用下,这些锁定倾向于将访问权授予等待时间最长的线程。不然此锁定将没法保证任何特定访问顺序。
与采用默认设置(使用不公平锁定)相比,使用公平锁定的程序在许多线程访问时表现为很低的整体吞吐量(即速度很慢,经常极其慢),可是在得到锁定和保证锁定分配的均衡性时差别较小。不过要注意的是,公平锁定不能保证线程调度的公平性。所以,使用公平锁定的众多线程中的一员可能得到多倍的成功机会,这种状况发生在其余活动线程没有被处理而且目前并未持有锁定时。还要注意的是,未定时的 tryLock 方法并无使用公平设置。由于即便其余线程正在等待,只要该锁定是可用的,此方法就能够得到成功。

建议老是 当即实践,使用 try 块来调用 lock,在以前/以后的构造中,最典型的代码以下: 
class X {
    private final ReentrantLock lock = new ReentrantLock();
    // ...

    public void m() { 
      lock.lock(); // block until condition holds
      try {
        // ... method body
      } finally {
        lock.unlock()
      }
    }
}

个人例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
public class MyReentrantLock extends Thread{
TestReentrantLock lock;
private int id;
public MyReentrantLock(int i,TestReentrantLock test){
    this.id=i;
    this.lock=test;
}
public void run(){
    lock.print(id);
}
public static void main(String args[]){
    ExecutorService service=Executors.newCachedThreadPool();
    TestReentrantLock lock=new TestReentrantLock();
    for(int i=0;i<10;i++){
     service.submit(new MyReentrantLock(i,lock));
    }
    service.shutdown();
}
}
class TestReentrantLock{
private ReentrantLock lock=new ReentrantLock();
public void print(int str){
    try{
     lock.lock();
     System.out.println(str+"得到");
     Thread.sleep((int)(Math.random()*1000));
    }
    catch(Exception e){
     e.printStackTrace();
    }
    finally{
     System.out.println(str+"释放");
     lock.unlock();
    }
}
}

BlockingQueue
支持两个附加操做的 Queue,这两个操做是:检索元素时等待队列变为非空,以及存储元素时等待空间变得可用。

BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用做指示 poll 操做失败的警惕值。

BlockingQueue 能够是限定容量的。它在任意给定时间均可以有一个 remainingCapacity,超出此容量,便没法无阻塞地 put 额外的元素。
没有任何内部容量约束的 BlockingQueue 老是报告 Integer.MAX_VALUE 的剩余容量。

BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。所以,举例来讲,使用 remove(x) 从队列中移除任意一个元素是有可能的。
然而,这种操做一般不 会有效执行,只能有计划地偶尔使用,好比在取消排队信息时。

BlockingQueue 实现是线程安全的。全部排队方法均可以使用内部锁定或其余形式的并发控制来自动达到它们的目的。
然而,大量的 Collection 操做(addAll、containsAll、retainAll 和 removeAll)没有 必要自动执行,除非在实现中特别说明。
所以,举例来讲,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

BlockingQueue 实质上不 支持使用任何一种“close”或“shutdown”操做来指示再也不添加任何项。
这种功能的需求和使用有依赖于实现的倾向。例如,一种经常使用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。

下面的例子演示了这个阻塞队列的基本功能。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class MyBlockingQueue extends Thread {
public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
private int index;
public MyBlockingQueue(int i) {
   this.index = i;
}
public void run() {
   try {
    queue.put(String.valueOf(this.index));
    System.out.println("{" + this.index + "} in queue!");
   } catch (Exception e) {
    e.printStackTrace();
   }
}
public static void main(String args[]) {
   ExecutorService service = Executors.newCachedThreadPool();
   for (int i = 0; i < 10; i++) {
    service.submit(new MyBlockingQueue(i));
   }
   Thread thread = new Thread() {
    public void run() {
     try {
      while (true) {
       Thread.sleep((int) (Math.random() * 1000));
       if(MyBlockingQueue.queue.isEmpty())
        break;
       String str = MyBlockingQueue.queue.take();
       System.out.println(str + " has take!");
      }
     } catch (Exception e) {
      e.printStackTrace();
     }
    }
   };
   service.submit(thread);
   service.shutdown();
}
}

---------------------执行结果-----------------
{0} in queue!
{1} in queue!
{2} in queue!
{3} in queue!
0 has take!
{4} in queue!
1 has take!
{6} in queue!
2 has take!
{7} in queue!
3 has take!
{8} in queue!
4 has take!
{5} in queue!
6 has take!
{9} in queue!
7 has take!
8 has take!
5 has take!
9 has take!

-----------------------------------------


CompletionService

将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,
并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 能够用来管理异步 IO ,执行读操做的任务做为程序或系统的一部分提交,
而后,当完成读操做时,会在程序的不一样部分执行其余操做,执行操做的顺序可能与所请求的顺序不一样。

一般,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种状况下,
CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现。

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MyCompletionService implements Callable<String> {
private int id;

public MyCompletionService(int i){
   this.id=i;
}
public static void main(String[] args) throws Exception{
   ExecutorService service=Executors.newCachedThreadPool();
   CompletionService<String> completion=new ExecutorCompletionService<String>(service);
   for(int i=0;i<10;i++){
    completion.submit(new MyCompletionService(i));
   }
   for(int i=0;i<10;i++){
    System.out.println(completion.take().get());
   }
   service.shutdown();
}
public String call() throws Exception {
   Integer time=(int)(Math.random()*1000);
   try{
    System.out.println(this.id+" start");
    Thread.sleep(time);
    System.out.println(this.id+" end");
   }
   catch(Exception e){
    e.printStackTrace();
   }
   return this.id+":"+time;
}
}


CountDownLatch


一个同步辅助类,在完成一组正在其余线程中执行的操做以前,它容许一个或多个线程一直等待。

用给定的计数 初始化 CountDownLatch。因为调用了 countDown() 方法,因此在当前计数到达零以前,await 方法会一直受阻塞。
以后,会释放全部等待的线程,await 的全部后续调用都将当即返回。这种现象只出现一次——计数没法被重置。若是须要重置计数,请考虑使用 CyclicBarrier。

CountDownLatch 是一个通用同步工具,它有不少用途。将计数 1 初始化的 CountDownLatch 用做一个简单的开/关锁存器,
或入口:在经过调用 countDown() 的线程打开入口前,全部调用 await 的线程都一直在入口处等待。
用 N 初始化的 CountDownLatch 可使一个线程在 N 个线程完成某项操做以前一直等待,或者使其在某项操做完成 N 次以前一直等待。

CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,
而在全部线程都能经过以前,它只是阻止任何线程继续经过一个 await。 
一下的例子是别人写的,很是形象。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCountDownLatch {
public static void main(String[] args) throws InterruptedException {
   // 开始的倒数锁
   final CountDownLatch begin = new CountDownLatch(1);
   // 结束的倒数锁
   final CountDownLatch end = new CountDownLatch(10);
   // 十名选手
   final ExecutorService exec = Executors.newFixedThreadPool(10);
  
   for (int index = 0; index < 10; index++) {
    final int NO = index + 1;
    Runnable run = new Runnable() {
     public void run() {
      try {
       begin.await();//一直阻塞
       Thread.sleep((long) (Math.random() * 10000));
       System.out.println("No." + NO + " arrived");
      } catch (InterruptedException e) {
      } finally {
       end.countDown();
      }
     }
    };
    exec.submit(run);
   }
   System.out.println("Game Start");
   begin.countDown();
   end.await();
   System.out.println("Game Over");
   exec.shutdown();
}
}

CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,若是没有到达0,就只有阻塞等待了。


CyclicBarrier

一个同步辅助类,它容许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。
在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 颇有用。由于该 barrier 在释放等待线程后能够重用,因此称它为循环 的 barrier。

CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达以后(但在释放全部线程以前),
该命令只在每一个屏障点运行一次。若在继续全部参与线程以前更新共享状态,此屏障操做 颇有用。

示例用法:下面是一个在并行分解设计中使用 barrier 的例子,很经典的旅行团例子:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCyclicBarrier {
  // 徒步须要的时间: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan
  private static int[] timeWalk = { 5, 8, 15, 15, 10 };
  // 自驾游
  private static int[] timeSelf = { 1, 3, 4, 4, 5 };
  // 旅游大巴
  private static int[] timeBus = { 2, 4, 6, 6, 7 };
  
  static String now() {
     SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
     return sdf.format(new Date()) + ": ";
  }
  static class Tour implements Runnable {
     private int[] times;
     private CyclicBarrier barrier;
     private String tourName;
     public Tour(CyclicBarrier barrier, String tourName, int[] times) {
       this.times = times;
       this.tourName = tourName;
       this.barrier = barrier;
     }
     public void run() {
       try {
         Thread.sleep(times[0] * 1000);
         System.out.println(now() + tourName + " Reached Shenzhen");
         barrier.await();
         Thread.sleep(times[1] * 1000);
         System.out.println(now() + tourName + " Reached Guangzhou");
         barrier.await();
         Thread.sleep(times[2] * 1000);
         System.out.println(now() + tourName + " Reached Shaoguan");
         barrier.await();
         Thread.sleep(times[3] * 1000);
         System.out.println(now() + tourName + " Reached Changsha");
         barrier.await();
         Thread.sleep(times[4] * 1000);
         System.out.println(now() + tourName + " Reached Wuhan");
         barrier.await();
       } catch (InterruptedException e) {
       } catch (BrokenBarrierException e) {
       }
     }
  }
  public static void main(String[] args) {
     // 三个旅行团
     CyclicBarrier barrier = new CyclicBarrier(3);
     ExecutorService exec = Executors.newFixedThreadPool(3);
     exec.submit(new Tour(barrier, "WalkTour", timeWalk));
     exec.submit(new Tour(barrier, "SelfTour", timeSelf));
//当咱们把下面的这段代码注释后,会发现,程序阻塞了,没法继续运行下去。
     exec.submit(new Tour(barrier, "BusTour", timeBus));
     exec.shutdown();
  }
}

CyclicBarrier最重要的属性就是参与者个数,另外最要方法是await()。当全部线程都调用了await()后,就表示这些线程均可以继续执行,不然就会等待。

Future

Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。
计算完成后只能使用 get 方法来检索结果,若有必要,计算完成前能够阻塞此方法。取消则由 cancel 方法来执行。
还提供了其余方法,以肯定任务是正常完成仍是被取消了。一旦计算完成,就不能再取消计算。
若是为了可取消性而使用 Future但又不提供可用的结果,则能够声明 Future<?> 形式类型、并返回 null 做为基础任务的结果。

这个咱们在前面CompletionService已经看到了,这个Future的功能,并且这个能够在提交线程的时候被指定为一个返回对象的。


ScheduledExecutorService

一个 ExecutorService,可安排在给定的延迟后运行或按期执行的命令。

schedule 方法使用各类延迟建立任务,并返回一个可用于取消或检查执行的任务对象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法建立并执行某些在取消前一直按期运行的任务。

用 Executor.execute(java.lang.Runnable) 和 ExecutorService 的 submit 方法所提交的命令,经过所请求的 0 延迟进行安排。
schedule 方法中容许出现 0 和负数延迟(但不是周期),并将这些视为一种当即执行的请求。

全部的 schedule 方法都接受相对 延迟和周期做为参数,而不是绝对的时间或日期。将以 Date 所表示的绝对时间转换成要求的形式很容易。
例如,要安排在某个之后的日期运行,可使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。
可是要注意,因为网络时间同步协议、时钟漂移或其余因素的存在,所以相对延迟的期满日期没必要与启用任务的当前 Date 相符。
Executors 类为此包中所提供的 ScheduledExecutorService 实现提供了便捷的工厂方法。

一下的例子也是网上比较流行的。

import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
public class TestScheduledThread {
public static void main(String[] args) {
   final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
   final Runnable beeper = new Runnable() {
    int count = 0;
    public void run() {
     System.out.println(new Date() + " beep " + (++count));
    }
   };
   // 1秒钟后运行,并每隔2秒运行一次
   final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS);
   // 2秒钟后运行,并每次在上次任务运行完后等待5秒后从新运行
   final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS);
   // 30秒后结束关闭任务,而且关闭Scheduler
   scheduler.schedule(new Runnable() {
    public void run() {
     beeperHandle.cancel(true);
     beeperHandle2.cancel(true);
     scheduler.shutdown();
    }
   }, 30, SECONDS);
}
}

这样咱们就把concurrent包下比较重要的功能都已经总结完了,但愿对咱们理解能有帮助。


关于 java.util.concurrent 您不知道的 5 件事,第 1 部分

http://www.ibm.com/developerworks/cn/java/j-5things4.html 

关于 java.util.concurrent 您不知道的 5 件事,第 2 部分

http://www.ibm.com/developerworks/cn/java/j-5things5.html

Concurrent Collections 是 Java™ 5 的巨大附加产品,可是在关于注释和泛型的争执中不少 Java 开发人员忽视了它们。此外(或者更老实地说),许多开发人员避免使用这个数据包,由于他们认为它必定很复杂,就像它所要解决的问题同样。

事实上,java.util.concurrent 包含许多类,可以有效解决普通的并发问题,无需复杂工序。阅读本文,了解 java.util.concurrent 类,好比 CopyOnWriteArrayList 和BlockingQueue 如何帮助您解决多线程编程的棘手问题。

1. TimeUnit

尽管本质上 不是 Collections 类,但 java.util.concurrent.TimeUnit 枚举让代码更易读懂。使用 TimeUnit 将使用您的方法或 API 的开发人员从毫秒的 “暴政” 中解放出来。

TimeUnit 包括全部时间单位,从 MILLISECONDS 和 MICROSECONDS 到 DAYS 和 HOURS,这就意味着它可以处理一个开发人员所需的几乎全部的时间范围类型。同时,由于在列举上声明了转换方法,在时间加快时,将 HOURS 转换回 MILLISECONDS 甚至变得更容易。

回页首

2. CopyOnWriteArrayList

建立数组的全新副本是过于昂贵的操做,不管是从时间上,仍是从内存开销上,所以在一般使用中不多考虑;开发人员每每求助于使用同步的ArrayList。然而,这也是一个成本较高的选择,由于每当您跨集合内容进行迭代时,您就不得不一样步全部操做,包括读和写,以此保证一致性。

这又让成本结构回到这样一个场景:需多读者都在读取 ArrayList,可是几乎没人会去修改它。

CopyOnWriteArrayList 是个巧妙的小宝贝,能解决这一问题。它的 Javadoc 将 CopyOnWriteArrayList 定义为一个 “ArrayList 的线程安全变体,在这个变体中全部易变操做(添加,设置等)能够经过复制全新的数组来实现”。

集合从内部将它的内容复制到一个没有修改的新数组,这样读者访问数组内容时就不会产生同步成本(由于他们历来不是在易变数据上操做)。

本质上讲,CopyOnWriteArrayList 很适合处理 ArrayList 常常让咱们失败的这种场景:读取频繁,但不多有写操做的集合,例如 JavaBean 事件的 Listeners。

回页首

3. BlockingQueue

BlockingQueue 接口表示它是一个 Queue,意思是它的项以先入先出(FIFO)顺序存储。在特定顺序插入的项以相同的顺序检索 — 可是须要附加保证,从空队列检索一个项的任未尝试都会阻塞调用线程,直到这个项准备好被检索。同理,想要将一个项插入到满队列的尝试也会致使阻塞调用线程,直到队列的存储空间可用。

BlockingQueue 干净利落地解决了如何将一个线程收集的项“传递”给另外一线程用于处理的问题,无需考虑同步问题。Java Tutorial 的 Guarded Blocks 试用版就是一个很好的例子。它构建一个单插槽绑定的缓存,当新的项可用,并且插槽也准备好接受新的项时,使用手动同步和wait()/notifyAll() 在线程之间发信。(详见 Guarded Blocks 实现。)

尽管 Guarded Blocks 教程中的代码有效,可是它耗时久,混乱,并且也并不是彻底直观。退回到 Java 平台较早的时候,没错,Java 开发人员不得不纠缠于这种代码;但如今是 2010 年 — 状况难道没有改善?

清单 1 显示了 Guarded Blocks 代码的重写版,其中我使用了一个 ArrayBlockingQueue,而不是手写的 Drop

清单 1. BlockingQueue
import java.util.*;
import java.util.concurrent.*;

class Producer
    implements Runnable
{
    private BlockingQueue<String> drop;
    List<String> messages = Arrays.asList(
        "Mares eat oats",
        "Does eat oats",
        "Little lambs eat ivy",
        "Wouldn't you eat ivy too?");
        
    public Producer(BlockingQueue<String> d) { this.drop = d; }
    
    public void run()
    {
        try
        {
            for (String s : messages)
                drop.put(s);
            drop.put("DONE");
        }
        catch (InterruptedException intEx)
        {
            System.out.println("Interrupted! " + 
                "Last one out, turn out the lights!");
        }
    }    
}

class Consumer
    implements Runnable
{
    private BlockingQueue<String> drop;
    public Consumer(BlockingQueue<String> d) { this.drop = d; }
    
    public void run()
    {
        try
        {
            String msg = null;
            while (!((msg = drop.take()).equals("DONE")))
                System.out.println(msg);
        }
        catch (InterruptedException intEx)
        {
            System.out.println("Interrupted! " + 
                "Last one out, turn out the lights!");
        }
    }
}

public class ABQApp
{
    public static void main(String[] args)
    {
        BlockingQueue<String> drop = new ArrayBlockingQueue(1, true);
        (new Thread(new Producer(drop))).start();
        (new Thread(new Consumer(drop))).start();
    }
}

ArrayBlockingQueue 还体现了“公平” — 意思是它为读取器和编写器提供线程先入先出访问。这种替代方法是一个更有效,但又冒穷尽部分线程风险的政策。(即,容许一些读取器在其余读取器锁定时运行效率更高,可是您可能会有读取器线程的流持续不断的风险,致使编写器没法进行工做。)

注意 Bug!

顺便说一句,若是您注意到 Guarded Blocks 包含一个重大 bug,那么您是对的 — 若是开发人员在 main() 中的Drop 实例上同步,会出现什么状况呢?

BlockingQueue 还支持接收时间参数的方法,时间参数代表线程在返回信号故障以插入或者检索有关项以前须要阻塞的时间。这么作会避免非绑定的等待,这对一个生产系统是致命的,由于一个非绑定的等待会很容易致使须要重启的系统挂起。

回页首

4. ConcurrentMap

Map 有一个微妙的并发 bug,这个 bug 将许多不知情的 Java 开发人员引入歧途。ConcurrentMap 是最容易的解决方案。

当一个 Map 被从多个线程访问时,一般使用 containsKey() 或者 get() 来查看给定键是否在存储键/值对以前出现。可是即便有一个同步的Map,线程仍是能够在这个过程当中潜入,而后夺取对 Map 的控制权。问题是,在对 put() 的调用中,锁在 get() 开始时获取,而后在能够再次获取锁以前释放。它的结果是个竞争条件:这是两个线程之间的竞争,结果也会因谁先运行而不一样。

若是两个线程几乎同时调用一个方法,二者都会进行测试,调用 put,在处理中丢失第一线程的值。幸运的是,ConcurrentMap 接口支持许多附加方法,它们设计用于在一个锁下进行两个任务:putIfAbsent(),例如,首先进行测试,而后仅当键没有存储在 Map 中时进行 put。

回页首

5. SynchronousQueues

根据 Javadoc,SynchronousQueue 是个有趣的东西:

这是一个阻塞队列,其中,每一个插入操做必须等待另外一个线程的对应移除操做,反之亦然。一个同步队列不具备任何内部容量,甚至不具备 1 的容量。

本质上讲,SynchronousQueue 是以前提过的 BlockingQueue 的又一实现。它给咱们提供了在线程之间交换单一元素的极轻量级方法,使用 ArrayBlockingQueue 使用的阻塞语义。在清单 2 中,我重写了 清单 1 的代码,使用 SynchronousQueue 替代ArrayBlockingQueue

清单 2. SynchronousQueue
import java.util.*;
import java.util.concurrent.*;

class Producer
    implements Runnable
{
    private BlockingQueue<String> drop;
    List<String> messages = Arrays.asList(
        "Mares eat oats",
        "Does eat oats",
        "Little lambs eat ivy",
        "Wouldn't you eat ivy too?");
        
    public Producer(BlockingQueue<String> d) { this.drop = d; }
    
    public void run()
    {
        try
        {
            for (String s : messages)
                drop.put(s);
            drop.put("DONE");
        }
        catch (InterruptedException intEx)
        {
            System.out.println("Interrupted! " + 
                "Last one out, turn out the lights!");
        }
    }    
}

class Consumer
    implements Runnable
{
    private BlockingQueue<String> drop;
    public Consumer(BlockingQueue<String> d) { this.drop = d; }
    
    public void run()
    {
        try
        {
            String msg = null;
            while (!((msg = drop.take()).equals("DONE")))
                System.out.println(msg);
        }
        catch (InterruptedException intEx)
        {
            System.out.println("Interrupted! " + 
                "Last one out, turn out the lights!");
        }
    }
}

public class SynQApp
{
    public static void main(String[] args)
    {
        BlockingQueue<String> drop = new SynchronousQueue<String>();
        (new Thread(new Producer(drop))).start();
        (new Thread(new Consumer(drop))).start();
    }
}

实现代码看起来几乎相同,可是应用程序有额外获益:SynchronousQueue 容许在队列进行一个插入,只要有一个线程等着使用它。

在实践中,SynchronousQueue 相似于 Ada 和 CSP 等语言中可用的 “会合通道”。

并发 Collections 提供了线程安全、通过良好调优的数据结构,简化了并发编程。然而,在一些情形下,开发人员须要更进一步,思考如何调节和/或限制线程执行。因为 java.util.concurrent 的整体目标是简化多线程编程,您可能但愿该包包含同步实用程序,而它确实包含。

本文是 第 1 部分 的延续,将介绍几个比核心语言原语(监视器)更高级的同步结构,但它们还未包含在 Collection 类中。一旦您了解了这些锁和门的用途,使用它们将很是直观。

关于本系列

您以为本身懂 Java 编程?事实是,大多数开发人员都只领会到了 Java 平台的皮毛,所学也只够应付工做。在本系列 中,Ted Neward 深度挖掘 Java 平台的核心功能,揭示一些不为人知的事实,帮助您解决最棘手的编程困难。

1. Semaphore

在一些企业系统中,开发人员常常须要限制未处理的特定资源请求(线程/操做)数量,事实上,限制有时候可以提升系统的吞吐量,由于它们减小了对特定资源的争用。尽管彻底能够手动编写限制代码,但使用 Semaphore 类能够更轻松地完成此任务,它将帮您执行限制,如清单 1 所示:

清单 1. 使用 Semaphore 执行限制
import java.util.*;import java.util.concurrent.*;

public class SemApp
{
    public static void main(String[] args)
    {
        Runnable limitedCall = new Runnable() {
            final Random rand = new Random();
            final Semaphore available = new Semaphore(3);
            int count = 0;
            public void run()
            {
                int time = rand.nextInt(15);
                int num = count++;
                
                try
                {
                    available.acquire();
                    
                    System.out.println("Executing " + 
                        "long-running action for " + 
                        time + " seconds... #" + num);
                
                    Thread.sleep(time * 1000);

                    System.out.println("Done with #" + 
                        num + "!");

                    available.release();
                }
                catch (InterruptedException intEx)
                {
                    intEx.printStackTrace();
                }
            }
        };
        
        for (int i=0; i<10; i++)
            new Thread(limitedCall).start();
    }
}

即便本例中的 10 个线程都在运行(您能够对运行 SemApp 的 Java 进程执行 jstack 来验证),但只有 3 个线程是活跃的。在一个信号计数器释放以前,其余 7 个线程都处于空闲状态。(实际上,Semaphore 类支持一次获取和释放多个 permit,但这不适用于本场景。)

回页首

2. CountDownLatch

若是 Semaphore 是容许一次进入一个(这可能会勾起一些流行夜总会的保安的记忆)线程的并发性类,那么 CountDownLatch 就像是赛马场的起跑门栅。此类持有全部空闲线程,直到知足特定条件,这时它将会一次释放全部这些线程。

清单 2. CountDownLatch:让咱们去赛马吧!
import java.util.*;
import java.util.concurrent.*;

class Race
{
    private Random rand = new Random();
    
    private int distance = rand.nextInt(250);
    private CountDownLatch start;
    private CountDownLatch finish;
    
    private List<String> horses = new ArrayList<String>();
    
    public Race(String... names)
    {
        this.horses.addAll(Arrays.asList(names));
    }
    
    public void run()
        throws InterruptedException
    {
        System.out.println("And the horses are stepping up to the gate...");
        final CountDownLatch start = new CountDownLatch(1);
        final CountDownLatch finish = new CountDownLatch(horses.size());
        final List<String> places = 
            Collections.synchronizedList(new ArrayList<String>());
        
        for (final String h : horses)
        {
            new Thread(new Runnable() {
                public void run() {
                    try
                    {
                        System.out.println(h + 
                            " stepping up to the gate...");
                        start.await();
                        
                        int traveled = 0;
                        while (traveled < distance)
                        {
                            // In a 0-2 second period of time....
                            Thread.sleep(rand.nextInt(3) * 1000);
                            
                            // ... a horse travels 0-14 lengths
                            traveled += rand.nextInt(15);
                            System.out.println(h + 
                                " advanced to " + traveled + "!");
                        }
                        finish.countDown();
                        System.out.println(h + 
                            " crossed the finish!");
                        places.add(h);
                    }
                    catch (InterruptedException intEx)
                    {
                        System.out.println("ABORTING RACE!!!");
                        intEx.printStackTrace();
                    }
                }
            }).start();
        }

        System.out.println("And... they're off!");
        start.countDown();        

        finish.await();
        System.out.println("And we have our winners!");
        System.out.println(places.get(0) + " took the gold...");
        System.out.println(places.get(1) + " got the silver...");
        System.out.println("and " + places.get(2) + " took home the bronze.");
    }
}

public class CDLApp
{
    public static void main(String[] args)
        throws InterruptedException, java.io.IOException
    {
        System.out.println("Prepping...");
        
        Race r = new Race(
            "Beverly Takes a Bath",
            "RockerHorse",
            "Phineas",
            "Ferb",
            "Tin Cup",
            "I'm Faster Than a Monkey",
            "Glue Factory Reject"
            );
        
        System.out.println("It's a race of " + r.getDistance() + " lengths");
        
        System.out.println("Press Enter to run the race....");
        System.in.read();
        
        r.run();
    }
}

注意,在 清单 2 中,CountDownLatch 有两个用途:首先,它同时释放全部线程,模拟马赛的起点,但随后会设置一个门闩模拟马赛的终点。这样,“主” 线程就能够输出结果。 为了让马赛有更多的输出注释,能够在赛场的 “转弯处” 和 “半程” 点,好比赛马跨过跑道的四分之1、二分之一和四分之三线时,添加 CountDownLatch

回页首

3. Executor

清单 1 和 清单 2 中的示例都存在一个重要的缺陷,它们要求您直接建立 Thread 对象。这能够解决一些问题,由于在一些 JVM 中,建立Thread 是一项重量型的操做,重用现有 Thread 比建立新线程要容易得多。而在另外一些 JVM 中,状况正好相反:Thread 是轻量型的,能够在须要时很容易地新建一个线程。固然,若是 Murphy 拥有本身的解决办法(他一般都会拥有),那么您不管使用哪一种方法对于您最终将部署的平台都是不对的。

JSR-166 专家组(参见 参考资料)在必定程度上预测到了这一情形。Java 开发人员无需直接建立 Thread,他们引入了 Executor 接口,这是对建立新线程的一种抽象。如清单 3 所示,Executor 使您没必要亲自对 Thread 对象执行 new 就可以建立新线程:

清单 3. Executor
Executor exec = getAnExecutorFromSomeplace();
exec.execute(new Runnable() { ... });

使用 Executor 的主要缺陷与咱们在全部工厂中遇到的同样:工厂必须来自某个位置。不幸的是,与 CLR 不一样,JVM 没有附带一个标准的 VM 级线程池。

Executor 类实际上 充当着一个提供 Executor 实现实例的共同位置,但它只有 new 方法(例如用于建立新线程池);它没有预先建立实例。因此您能够自行决定是否但愿在代码中建立和使用 Executor 实例。(或者在某些状况下,您将可以使用所选的容器/平台提供的实例。)

ExecutorService 随时可使用

尽管没必要担忧 Thread 来自何处,但 Executor 接口缺少 Java 开发人员可能指望的某种功能,好比结束一个用于生成结果的线程并以非阻塞方式等待结果可用。(这是桌面应用程序的一个常见需求,用户将执行须要访问数据库的 UI 操做,而后若是该操做花费了很长时间,可能但愿在它完成以前取消它。)

对于此问题,JSR-166 专家建立了一个更加有用的抽象(ExecutorService 接口),它将线程启动工厂建模为一个可集中控制的服务。例如,无需每执行一项任务就调用一次 execute()ExecutorService 能够接受一组任务并返回一个表示每项任务的将来结果的将来列表

回页首

4. ScheduledExecutorServices

尽管 ExecutorService 接口很是有用,但某些任务仍须要以计划方式执行,好比以肯定的时间间隔或在特定时间执行给定的任务。这就是ScheduledExecutorService 的应用范围,它扩展了 ExecutorService

若是您的目标是建立一个每隔 5 秒跳一次的 “心跳” 命令,使用 ScheduledExecutorService 能够轻松实现,如清单 4 所示:

清单 4. ScheduledExecutorService 模拟心跳
import java.util.concurrent.*;

public class Ping
{
    public static void main(String[] args)
    {
        ScheduledExecutorService ses =
            Executors.newScheduledThreadPool(1);
        Runnable pinger = new Runnable() {
            public void run() {
                System.out.println("PING!");
            }
        };
        ses.scheduleAtFixedRate(pinger, 5, 5, TimeUnit.SECONDS);
    }
}

这项功能怎么样?不用过于担忧线程,不用过于担忧用户但愿取消心跳时会发生什么,也不用明确地将线程标记为前台或后台;只需将全部的计划细节留给 ScheduledExecutorService

顺便说一下,若是用户但愿取消心跳,scheduleAtFixedRate 调用将返回一个 ScheduledFuture 实例,它不只封装告终果(若是有),还拥有一个 cancel 方法来关闭计划的操做。

回页首

5. Timeout 方法

为阻塞操做设置一个具体的超时值(以免死锁)的能力是 java.util.concurrent 库相比起早期并发特性的一大进步,好比监控锁定。

这些方法几乎老是包含一个 int/TimeUnit 对,指示这些方法应该等待多长时间才释放控制权并将其返回给程序。它须要开发人员执行更多工做 — 若是没有获取锁,您将如何从新获取? — 但结果几乎老是正确的:更少的死锁和更加适合生产的代码。(关于编写生产就绪代码的更多信息,请参见 参考资料 中 Michael Nygard 编写的 Release It!。)

回页首

结束语

java.util.concurrent 包还包含了其余许多好用的实用程序,它们很好地扩展到了 Collections 以外,尤为是在 .locks 和 .atomic 包中。深刻研究,您还将发现一些有用的控制结构,好比 CyclicBarrier 等。

相关文章
相关标签/搜索