Java中多线程并发体系知识点汇总

1、多线程html

一、操做系统有两个容易混淆的概念,进程和线程。java

进程:一个计算机程序的运行实例,包含了须要执行的指令;有本身的独立地址空间,包含程序内容和数据;不一样进程的地址空间是互相隔离的;进程拥有各类资源和状态信息,包括打开的文件、子进程和信号处理。算法

线程:表示程序的执行流程,是CPU调度执行的基本单位;线程有本身的程序计数器、寄存器、堆栈和帧。同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其余资源。编程

二、Java标准库提供了进程和线程相关的API,进程主要包括表示进程的java.lang.Process类和建立进程的java.lang.ProcessBuilder类;数组

表示线程的是java.lang.Thread类,在虚拟机启动以后,一般只有Java类的main方法这个普通线程运行,运行时能够建立和启动新的线程;还有一类守护线程(damon thread),守护线程在后台运行,提供程序运行时所需的服务。当虚拟机中运行的全部线程都是守护线程时,虚拟机终止运行。缓存

三、线程间的可见性:一个线程对进程中共享的数据的修改,是否对另外一个线程可见安全

可见性问题:数据结构

a、CPU采用时间片轮转等不一样算法来对线程进行调度多线程

[java] view plain copy 架构

  1. public class IdGenerator{

  2. private int value = 0;

  3. public int getNext(){

  4. return value++;

  5. }

  6. }

public class IdGenerator{



   private int value = 0;



   public int getNext(){



     return value++;



   }



}

对于IdGenerator的getNext()方法,在多线程下不能保证返回值是不重复的:各个线程之间相互竞争CPU时间来获取运行机会,CPU切换可能发生在执行间隙。

以上代码getNext()的指令序列:CPU切换可能发生在7条指令之间,多个getNext的指令交织在一块儿。

[java] view plain copy

  1. aload_0

  2. dup

  3. getfield #12

  4. dup_x1

  5. iconst_1

  6. iadd

  7. putfield #12


aload_0



dup



getfield #12



dup_x1



iconst_1



iadd



putfield #12

b、CPU缓存:

目前CPU通常采用层次结构的多级缓存的架构,有的CPU提供了L一、L2和L3三级缓存。当CPU须要读取主存中某个位置的数据时,会一次检查各级缓存中是否存在对应的数据。若是有,直接从缓存中读取,这比从主存中读取速度快不少。当CPU须要写入时,数据先被写入缓存中,以后再某个时间点写回主存。因此某些时间点上,缓存中的数据与主存中的数据多是不一致。

c、指令顺序重排

出行性能考虑,编译器在编译时可能会对字节代码的指令顺序进行从新排列,以优化指令的执行顺序,在单线程中不会有问题,但在多线程可能产生与可见性相关的问题。

2、Java内存模型(Java Memory Model)

屏蔽了CPU缓存等细节,只关注主存中的共享变量;关注对象的实例域、静态域和数组元素;关注线程间的动做。

一、volatile关键词:用来对共享变量的访问进行同步,上一次写入操做的结果对下一次读取操做是确定可见的。(在写入volatile变量值以后,CPU缓存中的内容会被写回内存;在读取volatile变量时,CPU缓存中的对应内容会被置为失效,从新从主存中进行读取),volatile不使用锁,性能优于synchronized关键词。

用来确保对一个变量的修改被正确地传播到其余线程中。

例子:A线程是Worker,一直跑循环,B线程调用setDone(true),A线程即中止任务

[java] view plain copy

  1. public class Worker{

  2. private volatile boolean done;

  3. public void setDone(boolean done){

  4. this.done = done;

  5. }

  6. public void work(){

  7. while(!done){

  8. //执行任务;

  9. }

  10. }

  11. }


public class Worker{



   private volatile boolean done;



   public void setDone(boolean done){



     this.done = done;



   }



   public void work(){



     while(!done){



         //执行任务;



     }



   }



}

例子:错误使用。由于没有锁的支持,volatile的修改不能依赖于当前值,当前值可能在其余线程中被修改。(Worker是直接赋新值与当前值无关)

[java] view plain copy

  1. public class Counter {

  2. public volatile static int count = 0;

  3. public static void inc() {

  4. //这里延迟1毫秒,使得结果明显

  5. try {

  6. Thread.sleep(1);

  7. } catch (InterruptedException e) {

  8. }

  9. count++;

  10. }

  11. public static void main(String[] args) {

  12. //同时启动1000个线程,去进行i++计算,看看实际结果

  13. for (int i = 0; i < 1000; i++) {

  14. new Thread(new Runnable() {

  15. @Override

  16. public void run() {

  17. Counter.inc();

  18. }

  19. }).start();

  20. }

  21. //这里每次运行的值都有可能不一样,可能不为1000

  22. System.out.println("运行结果:Counter.count=" + Counter.count);

  23. }

  24. }


public class Counter {



   public volatile static int count = 0;



   public static void inc() {



       //这里延迟1毫秒,使得结果明显



       try {



           Thread.sleep(1);



       } catch (InterruptedException e) {



       }



       count++;



   }



   public static void main(String[] args) {



       //同时启动1000个线程,去进行i++计算,看看实际结果



       for (int i = 0; i < 1000; i++) {



           new Thread(new Runnable() {



               @Override



               public void run() {



                   Counter.inc();



               }



           }).start();



       }



       //这里每次运行的值都有可能不一样,可能不为1000



       System.out.println("运行结果:Counter.count=" + Counter.count);



   }



}

二、final关键词

final关键词声明的域的值只能被初始化一次,通常在构造方法中初始化。。(在多线程开发中,final域一般用来实现不可变对象)

当对象中的共享变量的值不可能发生变化时,在多线程中也就不须要同步机制来进行处理,故在多线程开发中应尽量使用不可变对象。

另外,在代码执行时,final域的值能够被保存在寄存器中,而不用从主存中频繁从新读取。

三、java基本类型的原子操做

1)基本类型,引用类型的复制引用是原子操做;(即一条指令完成)

2)long与double的赋值,引用是能够分割的,非原子操做;

3)要在线程间共享long或double的字段时,必须在synchronized中操做,或是声明成volatile

3、Java提供的线程同步方式

一、synchronized关键字

方法或代码块的互斥性来完成实际上的一个原子操做。(方法或代码块在被一个线程调用时,其余线程处于等待状态)

全部的Java对象都有一个与synchronzied关联的监视器对象(monitor),容许线程在该监视器对象上进行加锁和解锁操做。

a、静态方法:Java类对应的Class类的对象所关联的监视器对象。

b、实例方法:当前对象实例所关联的监视器对象。

c、代码块:代码块声明中的对象所关联的监视器对象。

注:当锁被释放,对共享变量的修改会写入主存;当活得锁,CPU缓存中的内容被置为无效。编译器在处理synchronized方法或代码块,不会把其中包含的代码移动到synchronized方法或代码块以外,从而避免了因为代码重排而形成的问题。

例:如下方法getNext()和getNextV2() 都得到了当前实例所关联的监视器对象

[java] view plain copy

  1. public class SynchronizedIdGenerator{

  2. private int value = 0;

  3. public synchronized int getNext(){

  4. return value++;

  5. }

  6. public int getNextV2(){

  7. synchronized(this){

  8. return value++;

  9. }

  10. }

  11. }


public class SynchronizedIdGenerator{



   private int value = 0;



   public synchronized int getNext(){



     return value++;



   }



   public int getNextV2(){



     synchronized(this){



         return value++;



     }



   }



}

二、Object类的wait、notify和notifyAll方法

生产者和消费者模式,判断缓冲区是否满来消费,缓冲区是否空来生产的逻辑。若是用while 和 volatile也能够作,不过本质上会让线程处于忙等待,占用CPU时间,对性能形成影响。

wait: 将当前线程放入,该对象的等待池中,线程A调用了B对象的wait()方法,线程A进入B对象的等待池,而且释放B的锁。(这里,线程A必须持有B的锁,因此调用的代码必须在synchronized修饰下,不然直接抛出java.lang.IllegalMonitorStateException异常)。

notify:将该对象中等待池中的线程,随机选取一个放入对象的锁池,当当前线程结束后释放掉锁, 锁池中的线程便可竞争对象的锁来得到执行机会。

notifyAll:将对象中等待池中的线程,所有放入锁池。

(notify锁唤醒的线程选择由虚拟机实现来决定,不能保证一个对象锁关联的等待集合中的线程按照所指望的顺序被唤醒,极可能一个线程被唤醒以后,发现他所要求的条件并无知足,而从新进入等待池。由于当等待池中包含多个线程时,通常使用notifyAll方法,不过该方法会致使线程在没有必要的状况下被唤醒,以后又立刻进入等待池,对性能有影响,不过能保证程序的正确性)

工做流程:

a、Consumer线程A 来 看产品,发现产品为空,调用产品对象的wait(),线程A进入产品对象的等待池并释放产品的锁。

b、Producer线程B得到产品的锁,执行产品的notifyAll(),Consumer线程A从产品的等待池进入锁池,Producer线程B生产产品,而后退出释放锁。

c、Consumer线程A得到产品锁,进入执行,发现有产品,消费产品,而后退出。

例子:

[java] view plain copy

  1. public synchronized String pop(){

  2. this.notifyAll();// 唤醒对象等待池中的全部线程,可能唤醒的就是 生产者(当生产者发现产品满,就会进入对象的等待池,这里代码省略,基本略同)

  3. while(index == -1){//若是发现没产品,就释放锁,进入对象等待池

  4. this.wait();

  5. }//当生产者生产完后,消费者从this.wait()方法再开始执行,第一次还会执行循环,万一产品仍是为空,则再等待,因此这里必须用while循环,不能用if

  6. String good = buffer[index];

  7. buffer[index] = null;

  8. index--;

  9. return good;// 消费完产品,退出。

  10. }


public synchronized String pop(){



this.notifyAll();// 唤醒对象等待池中的全部线程,可能唤醒的就是 生产者(当生产者发现产品满,就会进入对象的等待池,这里代码省略,基本略同)



   while(index == -1){//若是发现没产品,就释放锁,进入对象等待池



     this.wait();



   }//当生产者生产完后,消费者从this.wait()方法再开始执行,第一次还会执行循环,万一产品仍是为空,则再等待,因此这里必须用while循环,不能用if



   String good = buffer[index];



   buffer[index] = null;



   index--;



   return good;// 消费完产品,退出。



}

注:wait()方法有超时和不超时之分,超时的在通过一段时间,线程还在对象的等待池中,那么线程也会推出等待状态。

三、线程状态转换:

已经废弃的方法:stop、suspend、resume、destroy,这些方法在实现上时不安全的。

线程的状态:NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING(有超时的等待)、TERMINATED。

a、方法sleep()进入的阻塞状态,不会释放对象的锁(即你们一块儿睡,谁也别想执行代码),因此不要让sleep方法处在synchronized方法或代码块中,不然形成其余等待获取锁的线程长时间处于等待。

b、方法join()则是主线程等待子线程完成,再往下执行。例如main方法新建两个线程A和B

[java] view plain copy

  1. public static void main(String[] args) throws InterruptedException {

  2. Thread t1 = new Thread(new ThreadTesterA());

  3. Thread t2 = new Thread(new ThreadTesterB());

  4. t1.start();

  5. t1.join(); // 等t1执行完再往下执行

  6. t2.start();

  7. t2.join(); // 在虚拟机执行中,这句可能被忽略

  8. }


public static void main(String[] args) throws InterruptedException { 



Thread t1 = new Thread(new ThreadTesterA()); 



Thread t2 = new Thread(new ThreadTesterB()); 



t1.start(); 



t1.join(); // 等t1执行完再往下执行



t2.start(); 



t2.join(); // 在虚拟机执行中,这句可能被忽略



}

c、方法interrupt(),向被调用的对象线程发起中断请求。如线程A经过调用线程B的d的interrupt方法来发出中断请求,线程B来处理这个请求,固然也能够忽略,这不是必须的。Object类的wait()、Thread类的join()和sleep方法都会抛出受检异常java.lang.InterruptedException,经过interrupt方法中断该线程会致使线程离开等待状态。对于wait()调用来讲,线程须要从新获取监视器对象上的锁以后才能抛出InterruptedException异常,并致以异常的处理逻辑。

能够经过Thread类的isInterrupted方法来判断是否有中断请求发生,一般能够利用这个方法来判断是否退出线程(相似上面的volatitle修饰符的例子);

Thread类还有个方法Interrupted(),该方法不但能够判断当前线程是否被中断,还会清楚线程内部的中断标记,若是返回true,即曾被请求中断,同时调用完后,清除中断标记。

若是一个线程在某个对象的等待池,那么notify和interrupt 均可以使该线程从等待池中被移除。若是同时发生,那么看实际发生顺序。若是是notify先,那照常唤醒,没影响。若是是interrupt先,而且虚拟机选择让该线程中断,那么即便nofity,也会忽略该线程,而唤醒等待池中的另外一个线程。

e、yield(),尝试让出所占有的CPU资源,让其余线程获取运行机会,对操做系统上的调度器来讲是一个信号,不必定当即切换线程。(在实际开发中,测试阶段频繁调用yeid方法使线程切换更频繁,从而让一些多线程相关的错误更容易暴露出来)。

4、非阻塞方式

线程之间同步机制的核心是监视对象上的锁,竞争锁来得到执行代码的机会。当一个对象获取对象的锁,而后其余尝试获取锁的对象会处于等待状态,这种锁机制的实现方式很大程度限制了多线程程序的吞吐量和性能(线程阻塞),且会带来死锁(线程A有a对象锁,等着获取b对象锁,线程B有b对象锁,等待获取a对象锁)和优先级倒置(优先级低的线程得到锁,优先级高的只能等待对方释放锁)等问题。

若是能不阻塞线程,又能保证多线程程序的正确性,就能有更好的性能。

在程序中,对共享变量的使用通常遵循必定的模式,即读取、修改和写入三步组成。以前碰到的问题是,这三步执行中可能线程执行切换,形成非原子操做。锁机制是把这三步变成一个原子操做。

目前CPU自己实现 将这三步 合起来 造成一个原子操做,无需线程锁机制干预,常见的指令是“比较和替换”(compare and swap,CAS),这个指令会先比较某个内存地址的当前值是否是指定的旧指,若是是,就用新值替换,不然什么也不作,指令返回的结果是内存地址的当前值。经过CAS指令能够实现不依赖锁机制的非阻塞算法。通常作法是把CAS指令的调用放在一个无限循环中,不断尝试,知道CAS指令成功完成修改。

java.util.concurrent.atomic包中提供了CAS指令。(不是全部CPU都支持CAS,在某些平台,java.util.concurrent.atomic的实现仍然是锁机制)

atomic包中提供的Java类分红三类:

一、支持以原子操做来进行更新的数据类型的Java类(AtomicBoolean、AtomicInteger、AtomicReference),在内存模型相关的语义上,这四个类的对象相似于volatile变量。

类中的经常使用方法:

a、compareAndSet:接受两个参数,一个是指望的旧值,一个是替换的新值。

b、weakCompareAndSet:效果同compareAndSet(JSR中表示weak原子方式读取和有条件地写入变量但不建立任何 happen-before 排序,但在源代码中和compareAndSet彻底同样,因此并无按JSR实现)

c、get和set:分别用来直接获取和设置变量的值。

d、lazySet:与set相似,但容许编译器把lazySet方法的调用与后面的指令进行重排,所以对值得设置操做有可能被推迟。

例:

[java] view plain copy

  1. public class AtomicIdGenerator{

  2. private final AtomicInter counter = new AtomicInteger(0);

  3. public int getNext(){

  4. return counter.getAndIncrement();

  5. }

  6. }

  7. // getAndIncrement方法的内部实现方式,这也是CAS方法的通常模式,CAS方法不必定成功,因此包装在一个无限循环中,直到成功

  8. public final int getAndIncrement(){

  9. for(;;){

  10. int current = get();

  11. int next = current +1;

  12. if(compareAndSet(current,next))

  13. return current;

  14. }

  15. }


public class AtomicIdGenerator{



   private final AtomicInter counter = new AtomicInteger(0);



   public int getNext(){



     return counter.getAndIncrement();



   }



}



// getAndIncrement方法的内部实现方式,这也是CAS方法的通常模式,CAS方法不必定成功,因此包装在一个无限循环中,直到成功



public final int getAndIncrement(){



   for(;;){



     int current = get();



     int next = current +1;



     if(compareAndSet(current,next))



         return current;



   }



}

二、提供对数组类型的变量进行处理的Java类,AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray类。(同上,只是放在类数组里,调用时也只是多了一个操做元素索引的参数)

三、经过反射的方式对任何对象中包含的volatitle变量使用CAS方法,AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。他们提供了一种方式把CAS的功能扩展到了任何Java类中声明为volatitle的域上。(灵活,但语义较弱,由于对象的volatitle可能被非atomic的其余方式被修改)

[java] view plain copy

  1. public class TreeNode{

  2. private volatile TreeNode parent;

  3. // 静态工厂方法

  4. private static final AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent");

  5. public boolean compareAndSetParent(TreeNode expect, TreeNode update){

  6. return parentUpdater.compareAndSet(this, expect, update);

  7. }

  8. }


public class TreeNode{



   private volatile TreeNode parent;



// 静态工厂方法



   private static final AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent");



public boolean compareAndSetParent(TreeNode expect, TreeNode update){



     return parentUpdater.compareAndSet(this, expect, update);



}



}

注:java.util.concurrent.atomic包中的Java类属于比较底层的实现,通常做为java.util.concurrent包中不少非阻塞的数据结构的实现基础。

比较多的用AtomicBoolean、AtomicInteger、AtomicLong和AtomicReference。在实现线程安全的计数器时,AtomicInteger和AtomicLong类时最佳的选择。

5、高级同步机制(比synchronized更灵活的加锁机制)

synchronized和volatile,以及wait、notify等方法抽象层次低,在程序开发中使用比较繁琐,易出错。

而多线程之间的交互来讲,存在某些固定的模式,如生产者-消费者和读者-写者模式,把这些模式抽象成高层API,使用起来会很是方便。

java.util.concurrent包为多线程提供了高层的API,知足平常开发中的常见需求。

经常使用接口

一、Lock接口,表示一个锁方法:

a、lock(),获取所,若是没法获取所锁,会处于等待状态

b、unlock(),释放锁。(通常放在finally代码块中)

c、lockInterruptibly(),与lock()相似,但容许当前线程在等待获取锁的过程当中被中断。(因此要处理InterruptedException)

d、tryLock(),以非阻塞方式获取锁,若是没法获取锁,则返回false。(tryLock()的另外一个重载能够指定超时,若是指定超时,当没法获取锁,会等待而阻塞,同时线程能够被中断)

二、ReadWriteLock接口,表示两个锁,读取的共享锁和写入的排他锁。(适合常见的读者--写者场景)

ReadWriteLock接口的readLock和writeLock方法来获取对应的锁的Lock接口的实现。

在多数线程读取,少数线程写入的状况下,能够提升多线程的性能,提升使用该数据结构的吞吐量。

若是是相反的状况,较多的线程写入,则接口会下降性能。

三、ReentrantLock类和ReentrantReadWriteLock,分别为上面两个接口的实现类。

他们具备重入性:即容许一个线程屡次获取同一个锁(他们会记住上次获取锁而且未释放的线程对象,和加锁的次数,getHoldCount())

同一个线程每次获取锁,加锁数+1,每次释放锁,加锁数-1,到0,则该锁被释放,能够被其余线程获取。

[java] view plain copy

  1. public class LockIdGenrator{

  2. //new ReentrantLock(true)是重载,使用更加公平的加锁机制,在锁被释放后,会优先给等待时间最长的线程,避免一些线程长期没法得到锁

  3. private int ReentrantLock lock = ReentrantLock();

  4. privafte int value = 0;

  5. public int getNext(){

  6. lock.lock(); //进来就加锁,没有锁会等待

  7. try{

  8. return value++;//实际操做

  9. }finally{

  10. lock.unlock();//释放锁

  11. }

  12. }

  13. }


public class LockIdGenrator{



//new ReentrantLock(true)是重载,使用更加公平的加锁机制,在锁被释放后,会优先给等待时间最长的线程,避免一些线程长期没法得到锁



   private int ReentrantLock lock = ReentrantLock();



   privafte int value = 0;



   public int getNext(){



     lock.lock();     //进来就加锁,没有锁会等待



     try{



         return value++;//实际操做



     }finally{



         lock.unlock();//释放锁



     }



   }



}

注:重入性减小了锁在各个线程之间的等待,例如便利一个HashMap,每次next()以前加锁,以后释放,能够保证一个线程一口气完成便利,而不会每次next()以后释放锁,而后和其余线程竞争,下降了加锁的代价, 提供了程序总体的吞吐量。(即,让一个线程一口气完成任务,再把锁传递给其余线程)。

四、Condition接口,Lock接口代替了synchronized,Condition接口替代了object的wait、nofity。

a、await(),使当前线程进入等待状态,知道被唤醒或中断。重载形式能够指定超时时间。

b、awaitNanos(),以纳秒为单位等待。

c、awaitUntil(),指定超时发生的时间点,而不是通过的时间,参数为java.util.Date。

d、awaitUninterruptibly(),前面几种会响应其余线程发出的中断请求,他会无视,直到被唤醒。

注:与Object类的wait()相同,await()会释放其所持有的锁。

e、signal()和signalAll, 至关于 notify和notifyAll

[java] view plain copy

  1. Lock lock = new ReentrantLock();

  2. Condition condition = lock.newCondition();

  3. lock.lock();

  4. try{

  5. while(/逻辑条件不知足/){

  6. condition.await();

  7. }

  8. }finally{

  9. lock.unlock();

  10. }


Lock lock = new ReentrantLock();



Condition condition = lock.newCondition();



lock.lock();



try{



   while(/*逻辑条件不知足*/){



     condition.await();  



   }



}finally{



   lock.unlock();



}

6、底层同步器

多线程程序中,线程之间存在多种不一样的同步方式。除了Java标准库提供的同步方式以外,程序中特有的同步方式须要由开发人员本身来实现。

常见的一种需求是 对有限个共享资源的访问,好比多台我的电脑,2台打印机,当多个线程在等待同一个资源时,从公平角度出发,会用FIFO队列。

若是程序中的同步方式能够抽象成对有限个资源的访问,那么可使用java.util.concurrent.locks包中的AbstractQueuedSynchronizer类和AbstractQueuedLongSynchronizer类做为实现的基础,前者用int类型的变量来维护内部状态,然后者用long类型。(能够将这个变量理解为共享资源个数)

经过getState、setState、和compareAndSetState3个方法更新内部变量的值。

AbstractQueuedSynchronizer类是abstract的,须要覆盖其中包含的部分方法,一般作法是把其做为一个Java类的内部类,外部类提供具体的同步方式,内部类则做为实现的基础。有两种模式,排他模式和共享模式,分别对应方法 tryAcquire()、tryRelease 和 tryAcquireShared、tryReleaseShared,在这些方法中,使用getState、setState、compareAndSetState3个方法来修改内部变量的值,以此来反应资源的状态。

[java] view plain copy

  1. public class SimpleResourceManager{

  2. private final InnerSynchronizer synchronizer;

  3. private static class InnerSynchronizer extends AbstractQueuedSynchronizer{

  4. InnerSynchronizer(int numOfResources){

  5. setState(numOfResources);

  6. }

  7. protected int tryAcquireShared(int acquires){

  8. for(;;){

  9. int available = getState();

  10. int remain = available - acquires;

  11. if(remain <0 || comapreAndSetState(available, remain){

  12. return remain;

  13. }

  14. }

  15. }

  16. protected boolean try ReleaseShared(int releases){

  17. for(;;){

  18. int available = getState();

  19. int next = available + releases;

  20. if(compareAndSetState(available,next){

  21. return true;

  22. }

  23. }

  24. }

  25. }

  26. public SimpleResourceManager(int numOfResources){

  27. synchronizer = new InnerSynchronizer(numOfResources);

  28. }

  29. public void acquire() throws InterruptedException{

  30. synchronizer.acquireSharedInterruptibly(1);

  31. }

  32. pubic void release(){

  33. synchronizer.releaseShared(1);

  34. }

  35. }


public class SimpleResourceManager{



   private final InnerSynchronizer synchronizer;



   private static class InnerSynchronizer extends AbstractQueuedSynchronizer{



     InnerSynchronizer(int numOfResources){



         setState(numOfResources);



     }



     protected int tryAcquireShared(int acquires){



         for(;;){



           int available = getState();



           int remain = available - acquires;



           if(remain <0 || comapreAndSetState(available, remain){



               return remain;



           }



         }



     }



     protected boolean try ReleaseShared(int releases){



         for(;;){



           int available = getState();



           int next = available + releases;



           if(compareAndSetState(available,next){



               return true;



           }



         }



     }



   }



   public SimpleResourceManager(int numOfResources){



     synchronizer = new InnerSynchronizer(numOfResources);



   }



   public void acquire() throws InterruptedException{



     synchronizer.acquireSharedInterruptibly(1);



   }     



   pubic void release(){   



     synchronizer.releaseShared(1);



   }



}

7、高级同步对象(提升开发效率)

atomic和locks包提供的Java类能够知足基本的互斥和同步访问的需求,但这些Java类的抽象层次较低,使用比较复杂。

更简单的作法是使用java.util.concurrent包中的高级同步对象。

一、信号量。

信号量通常用来数量有限的资源,每类资源有一个对象的信号量,信号量的值表示资源的可用数量。

在使用资源时,须要从该信号量上获取许可,成功获取许可,资源的可用数-1;完成对资源的使用,释放许可,资源可用数+1; 当资源数为0时,须要获取资源的线程以阻塞的方式来等待资源,或过段时间以后再来检查资源是否可用。(上面的SimpleResourceManager类实际上时信号量的一个简单实现)

java.util.concurrent.Semaphore类,在建立Semaphore类的对象时指定资源的可用数

a、acquire(),以阻塞方式获取许可

b、tryAcquire(),以非阻塞方式获取许可

c、release(),释放许可。

d、accquireUninterruptibly(),accquire()方法获取许能够的过程能够被中断,若是不但愿被中断,使用此方法。

[java] view plain copy

  1. public class PrinterManager{

  2. private final Semphore semaphore;

  3. private final List<Printer> printers = new ArrayList<>():

  4. public PrinterManager(Collection<? extends Printer> printers){

  5. this.printers.addAll(printers);

  6. //这里重载方法,第二个参数为true,以公平竞争模式,防止线程饥饿

  7. this.semaphore = new Semaphore(this.printers.size(),true);

  8. }

  9. public Printer acquirePrinter() throws InterruptedException{

  10. semaphore.acquire();

  11. return getAvailablePrinter();

  12. }

  13. public void releasePrinter(Printer printer){

  14. putBackPrinter(pinter);

  15. semaphore.release();

  16. }

  17. private synchronized Printer getAvailablePrinter(){

  18. printer result = printers.get(0);

  19. printers.remove(0);

  20. return result;

  21. }

  22. private synchronized void putBackPrinter(Printer printer){

  23. printers.add(printer);

  24. }

  25. }


public class PrinterManager{



   private final Semphore semaphore;



   private final List<Printer> printers = new ArrayList<>():



   public PrinterManager(Collection<? extends Printer> printers){



     this.printers.addAll(printers);



     //这里重载方法,第二个参数为true,以公平竞争模式,防止线程饥饿



     this.semaphore = new Semaphore(this.printers.size(),true);



   }



   public Printer acquirePrinter() throws InterruptedException{



     semaphore.acquire();



     return getAvailablePrinter();



   }



   public void releasePrinter(Printer printer){



     putBackPrinter(pinter);



     semaphore.release();



   }



   private synchronized Printer getAvailablePrinter(){



     printer result = printers.get(0);



     printers.remove(0);



     return result;



   }



   private synchronized void putBackPrinter(Printer printer){



     printers.add(printer);



   }



}

二、倒数闸门

多线程协做时,一个线程等待另外的线程完成任务才能继续进行。

java.util.concurrent.CountDownLatch类,建立该类时,指定等待完成的任务数;当一个任务完成,调用countDonw(),任务数-1。等待任务完成的线程经过await(),进入阻塞状态,直到任务数量为0。CountDownLatch类为一次性,一旦任务数为0,再调用await()再也不阻塞当前线程,直接返回。

例:

[java] view plain copy

  1. public class PageSizeSorter{

  2. // 并发性能远远优于HashTable的 Map实现,hashTable作任何操做都须要得到锁,同一时间只有有个线程能使用,而ConcurrentHashMap是分段加锁,不一样线程访问不一样的数据段,彻底不受影响,忘记HashTable吧。

  3. private static final ConcurrentHashMap<String , Interger> sizeMap = new ConcurrentHashMap<>();

  4. private static class GetSizeWorker implements Runnable{

  5. private final String urlString;

  6. public GetSizeWorker(String urlString , CountDownLatch signal){

  7. this.urlString = urlStirng;

  8. this.signal = signal;

  9. }

  10. public void run(){

  11. try{

  12. InputStream is = new URL(urlString).openStream();

  13. int size = IOUtils.toByteArray(is).length;

  14. sizeMap.put(urlString, size);

  15. }catch(IOException e){

  16. sizeMap.put(urlString, -1);

  17. }finally{

  18. signal.countDown()://完成一个任务 , 任务数-1

  19. }

  20. }

  21. }

  22. private void sort(){

  23. List<Entry<String, Integer> list = new ArrayList<sizeMap.entrySet());

  24. Collections.slort(list, new Comparator<Entry<String,Integer>>(){

  25. public int compare (Entry<String, Integer> o1, Entry<Sting , Integer> o2){

  26. return Integer.compare(o2.getValue(),o1.getValue());

  27. };

  28. System.out.println(Arrays.deepToString(list.toArray()));

  29. }

  30. public void sortPageSize(Collection<String> urls) throws InterruptedException{

  31. CountDownLatch sortSignal = new CountDownLatch(urls.size());

  32. for(String url: urls){

  33. new Thread(new GetSizeWorker(url, sortSignal)).start();

  34. }

  35. sortSignal.await()://主线程在这里等待,任务数归0,则继续执行

  36. sort();

  37. }

  38. }


public class PageSizeSorter{



   // 并发性能远远优于HashTable的 Map实现,hashTable作任何操做都须要得到锁,同一时间只有有个线程能使用,而ConcurrentHashMap是分段加锁,不一样线程访问不一样的数据段,彻底不受影响,忘记HashTable吧。



   private static final ConcurrentHashMap<String , Interger> sizeMap = new ConcurrentHashMap<>();



   private static class GetSizeWorker implements Runnable{



     private final String urlString;



     public GetSizeWorker(String urlString , CountDownLatch signal){



         this.urlString = urlStirng;



         this.signal = signal;



     }



     public void run(){



         try{



           InputStream is = new URL(urlString).openStream();



           int size = IOUtils.toByteArray(is).length;



           sizeMap.put(urlString, size);



         }catch(IOException e){



           sizeMap.put(urlString, -1);



         }finally{



           signal.countDown()://完成一个任务 , 任务数-1



         }



     }



   }



   private void sort(){



     List<Entry<String, Integer> list = new ArrayList<sizeMap.entrySet());



     Collections.slort(list, new Comparator<Entry<String,Integer>>(){



         public int compare (Entry<String, Integer> o1, Entry<Sting , Integer> o2){



           return Integer.compare(o2.getValue(),o1.getValue());



     };



     System.out.println(Arrays.deepToString(list.toArray()));



   }



   public void sortPageSize(Collection<String> urls) throws InterruptedException{



     CountDownLatch sortSignal = new CountDownLatch(urls.size());



     for(String url: urls){



         new Thread(new GetSizeWorker(url, sortSignal)).start();



     }



     sortSignal.await()://主线程在这里等待,任务数归0,则继续执行



     sort();



   }



}

三、循环屏障

循环屏障在做用上相似倒数闸门,不过他不像倒数闸门是一次性的,能够循环使用。另外,线程之间是互相平等的,彼此都须要等待对方完成,当一个线程完成本身的任务以后,等待其余线程完成。当全部线程都完成任务以后,全部线程才能够继续运行。

当线程之间须要再次进行互相等待时,能够复用同一个循环屏障。

类java.uti.concurrent.CyclicBarrier用来表示循环屏障,建立时指定使用该对象的线程数目,还能够指定一个Runnable接口的对象做为每次循环后执行的动做。(当最后一个线程完成任务以后,全部线程继续执行以前,被执行。若是线程之间须要更新一些共享的内部状态,能够利用这个Runnalbe接口的对象来处理)。

每一个线程任务完成以后,经过调用await方法进行等待,当全部线程都调用await方法以后,处于等待状态的线程均可以继续执行。在全部线程中,只要有一个在等待中被中断,超时或是其余错误,整个循环屏障会失败,全部等待中的其余线程抛出java.uti.concurrent.BrokenBarrierException。

例:每一个线程负责找一个数字区间的质数,当全部线程完成后,若是质数数目不够,继续扩大范围查找

[java] view plain copy

  1. public class PrimeNumber{

  2. private static final int TOTAL_COUTN = 5000;

  3. private static final int RANGE_LENGTH= 200;

  4. private static final int WORKER_NUMBER = 5;

  5. private static volatitle boolean done = false;

  6. private static int rangeCount = 0;

  7. private static final List<Long> results = new ArrayList<Long>():

  8. private static final CyclicBarrier barrier = new CyclicBarrier(WORKER_NUMBER, new Runnable(){

  9. public void run(){

  10. if(results.size() >= TOTAL_COUNT){

  11. done = true;

  12. }

  13. }

  14. });

  15. private static class PrimeFinder implements Runnable{

  16. public void run(){

  17. while(!done){// 整个过程在一个 while循环下,await()等待,下次循环开始,会再次判断 执行条件

  18. int range = getNextRange();

  19. long start = rang * RANGE_LENGTH;

  20. long end = (range + 1) * RANGE_LENGTH;

  21. for(long i = start; i<end;i++){

  22. if(isPrime(i)){

  23. updateResult(i);

  24. }

  25. }

  26. try{

  27. barrier.await();

  28. }catch (InterruptedException | BokenBarrierException e){

  29. done = true;

  30. }

  31. }

  32. }

  33. }

  34. private synchronized static void updateResult(long value){

  35. results.add(value);

  36. }

  37. private synchronized static int getNextRange(){

  38. return rangeCount++;

  39. }

  40. private static boolean isPrime(long number){

  41. //找质数的代码

  42. }

  43. public void calculate(){

  44. for(int i=0;i<WORKER_NUMBER;i++){

  45. new Thread(new PrimeFinder()).start();

  46. }

  47. while(!done){

  48. }

  49. //计算完成

  50. }

  51. }


public class PrimeNumber{



   private static final int TOTAL_COUTN = 5000;



   private static final int RANGE_LENGTH= 200;



   private static final int WORKER_NUMBER = 5;



   private static volatitle boolean done = false;



   private static int rangeCount = 0;



   private static final List<Long> results = new ArrayList<Long>():



   private static final CyclicBarrier barrier = new CyclicBarrier(WORKER_NUMBER, new Runnable(){



     public void run(){



         if(results.size() >= TOTAL_COUNT){



           done = true;



         }



     }



   });



   private static class PrimeFinder implements Runnable{



     public void run(){



         while(!done){// 整个过程在一个 while循环下,await()等待,下次循环开始,会再次判断 执行条件



           int range = getNextRange();



           long start = rang * RANGE_LENGTH;



           long end = (range + 1) * RANGE_LENGTH;



           for(long i = start; i<end;i++){



               if(isPrime(i)){



                 updateResult(i);



               }



           }



           try{



               barrier.await();



           }catch (InterruptedException | BokenBarrierException e){



               done = true;



           }



         }



     }



   }



   private synchronized static void updateResult(long value){



     results.add(value);



   }



   private synchronized static int getNextRange(){



     return rangeCount++;



   }



   private static boolean isPrime(long number){



     //找质数的代码



   }



   public void calculate(){



     for(int i=0;i<WORKER_NUMBER;i++){



         new Thread(new PrimeFinder()).start();



     }



     while(!done){







     }



     //计算完成



   }



}

四、对象交换器

适合于两个线程须要进行数据交换的场景。(一个线程完成后,把结果交给另外一个线程继续处理)

java.util.concurrent.Exchanger类,提供了这种对象交换能力,两个线程共享一个Exchanger类的对象,一个线程完成对数据的处理以后,调用Exchanger类的exchange()方法把处理以后的数据做为参数发送给另一个线程。而exchange方法的返回结果是另一个线程锁提供的相同类型的对象。若是另一个线程未完成对数据的处理,那么exchange()会使当前线程进入等待状态,直到另一个线程也调用了exchange方法来进行数据交换。

例:

[java] view plain copy

  1. public class SendAndReceiver{

  2. private final Exchanger<StringBuilder> exchanger = new Exchanger<StringBuilder>();

  3. private class Sender implements Runnable{

  4. public void run(){

  5. try{

  6. StringBuilder content = new StringBuilder("Hello");

  7. content = exchanger.exchange(content);

  8. }catch(InterruptedException e){

  9. Thread.currentThread().interrupt();

  10. }

  11. }

  12. }

  13. private class Receiver implements Runnable{

  14. public void run(){

  15. try{

  16. StringBuilder content = new StringBuilder("World");

  17. content = exchanger.exchange(content);

  18. }catch(InterruptedException e){

  19. Thread.currentThread().interrupt();

  20. }

  21. }

  22. }

  23. public void exchange(){

  24. new Thread(new Sender()).start();

  25. new Thread(new Receiver()).start();

  26. }

  27. }


public class SendAndReceiver{



   private final Exchanger<StringBuilder> exchanger = new Exchanger<StringBuilder>();



   private class Sender implements Runnable{



     public void run(){



         try{



           StringBuilder content = new StringBuilder("Hello");



           content = exchanger.exchange(content);



         }catch(InterruptedException e){



           Thread.currentThread().interrupt();



         }



     }



   }



   private class Receiver implements Runnable{



     public void run(){



         try{



           StringBuilder content = new StringBuilder("World");



           content = exchanger.exchange(content);



         }catch(InterruptedException e){



           Thread.currentThread().interrupt();



         }



     }



   }



   public void exchange(){



     new Thread(new Sender()).start();



     new Thread(new Receiver()).start();



   }



}

8、数据结构(多线程程序使用的高性能数据结构)

java.util.concurrent包中提供了一些适合多线程程序使用的高性能数据结构,包括队列和集合类对象等。

一、队列

a、BlockingQueue接口:线程安全的阻塞式队列;当队列已满时,想队列添加会阻塞;当队列空时,取数据会阻塞。(很是适合消费者-生产者模式)

阻塞方式:put()、take()。

非阻塞方式:offer()、poll()。

实现类:基于数组的固定元素个数的ArrayBolockingQueue和基于链表结构的不固定元素个数的LinkedBlockQueue类。

b、BlockingDeque接口: 与BlockingQueue类似,但能够对头尾进行添加和删除操做的双向队列;方法分为两类,分别在队首和对尾进行操做。

实现类:标准库值提供了一个基于链表的实现,LinkedBlockgingDeque

二、集合类

在多线程程序中,若是共享变量时集合类的对象,则不适合直接使用java.util包中的集合类。这些类要么不是线程安全,要么在多线程下性能比较差。

应该使用java.util.concurrent包中的集合类。

a、ConcurrentMap接口: 继承自java.util.Map接口

putIfAbsent():只有在散列表不包含给定键时,才会把给定的值放入。

remove():删除条目。

replace(key,value):把value 替换到给定的key上。

replace(key, oldvalue, newvalue):CAS的实现。

实现类:ConcurrentHashMap

建立时,若是能够预估可能包含的条目个数,能够优化性能。(由于动态调整所能包含的数目操做比较耗时,这个HashMap也同样,只是多线程下更耗时)。

建立时,预估进行更新操做的线程数,这样实现中会根据这个数把内部空间划分为对应数量的部分。(默认是16,若是只有一个线程进行写操做,其余都是读取,那么把值设为1 能够提升性能)。

注:当从集合中建立出迭代器遍历Map元素时,不必定能看到正在添加的数据,只能和集合保证弱一致性。(固然使用迭代器不会由于查看正在改变的Map,而抛出java.util.ConcurrentModifycationException)

b、CopyOnWriteArrayList接口:继承自java.util.List接口。

顾名思义,在CopyOnWriteArrayList的实现类,全部对列表的更新操做都会新建立一个底层数组的副本,并使用副原本存储数据;对列表更新操做加锁,读取操做不加锁。

适合多读取少修改的场景,若是更新操做多,那么不适合用,一样迭代器只能表示建立时列表的状态,更新后使用了新的底层数组,迭代器仍是引用旧的底层数组。

9、多线程任务的执行

过去线程的执行,是先建立Thread类的想,再调用start方法启动,这种作法要求开发人员对线程进行维护,在线程较多时,通常建立一个线程池同一管理,同时下降重复建立线程的开销

在J2SE5.0中,java.util.concurrent包提供了丰富的用来管理线程和执行任务的实现。

一、基本接口(描述任务)

a、Callable接口:

Runnable接口受限于run方法的类型签名,而Callable只有一个方法call(),能够有返回值,能够抛出受检异常。

b、Future接口:

过去,须要异步线程的任务执行结果,要求主线程和任务执行线程之间进行同步和数据传递。

Future简化了任务的异步执行,做为异步操做的一个抽象。调用get()方法能够获取异步的执行结果,若是任务没有执行完,会等待,直到任务完成或被取消,cancel()能够取消。

c、Delayed接口:

延迟执行任务,getDelay()返回当前剩余的延迟时间,若是不大于0,说明延迟时间已通过去,应该调度并执行该任务。

二、组合接口(描述任务)

a、RunnableFuture接口:继承自Runnable接口和Future接口。

当来自Runnalbe接口中的run方法成功执行以后,至关于Future接口表示的异步任务已经完成,能够经过get()获取运行结果。

b、ScheduledFuture接口:继承Future接口和Delayed接口,表示一个能够调用的异步操做。

c、RunnableScheduledFuture接口:继承自Runnable、Delayed和Future,接口中包含isPeriodic,代表该异步操做是否能够被重复执行。

三、Executor接口、ExcutorServer接口、ScheduleExecutorService接口和CompletionService接口(描述任务执行)

a、executor接口,execute()用来执行一个Runnable接口的实现对象,不一样的Executor实现采起不一样执行策略,但提供的任务执行功能比较弱。

b、excutorServer接口,继承自executor;

提供了对任务的管理:submit(),能够吧Callable和Runnable做为任务提交,获得一个Future做为返回,能够获取任务结果或取消任务。

提供批量执行:invokeAll()和invokeAny(),同时提交多个Callable;invokeAll(),会等待全部任务都执行完成,返回一个包含每一个任务对应Future的列表;invokeAny(),任何一个任务成功完成,即返回该任务结果。

提供任务关闭:shutdown()、shutdownNow()来关闭服务,前者不容许新的任务提交,后者试图终止正在运行和等待的任务,并返回已经提交单没有被运行的任务列表。(两个方法都不会等待服务真正关闭,只是发出关闭请求。)。shutdownDow,一般作法是向线程发出中断请求,因此确保提交的任务实现了正确的中断处理逻辑。

c、ScheduleExecutorService接口,继承自excutorServer接口:支持任务的延迟执行和按期执行,能够执行Callable或Runnable。

schedule(),调度一个任务在延迟若干时间以后执行;

scheduleAtFixedRate():在初始延迟后,每隔一段时间循环执行;在下一次执行开始时,上一次执行可能还未结束。(同一时间,可能有多个)

scheduleWithFixedDelay:同上,只是在上一次任务执行完后,通过给定的间隔时间再开始下一次执行。(同一时间,只有一个)

以上三个方法都返回ScheduledFuture接口的实现对象。

d、CompletionService接口,共享任务执行结果。

一般在使用ExecutorService接口,经过submit提交任务,并获得一个Future接口来获取任务结果,若是任务提交者和执行结果的使用者是程序的不一样部分,那就要把Future在不一样部分进行传递;而CompletionService就是解决这个问题,程序不一样部分能够共享CompletionService,任务提交后,执行结果能够经过take(阻塞),poll(非阻塞)来获取。

标准库提供的实现是 ExecutorCompletionService,在建立时,须要提供一个Executor接口的实现做为参数,用来实际执行任务。

例:多线程方式下载文件

[java] view plain copy

  1. public class FileDownloader{

  2. // 线程池

  3. private final ExecutorService executor = Executors.newFixedThreadPool(10);

  4. public boolean download(final URL url, final Path path){

  5. Future<Path> future = executor.submit(new Callable<Path>(){ //submit提交任务

  6. public Path call(){

  7. //这里就省略IOException的处理了

  8. InputStream is = url.openStream();

  9. Files.copy(is, path, StandardCopyOption.REPLACE_EXISTING);

  10. return path;

  11. });

  12. try{

  13. return future.get() !=null ? true : false;

  14. }<span style="font-family: Arial, Helvetica, sans-serif;">catch(InterruptedException | ExecutionException e){</span>

  15. return false;

  16. }

  17. }

  18. public void close(){//当再也不使用FileDownloader类的对象时,应该使用close方法关闭其中包含的ExecutorService接口的实现对象,不然虚拟机不会退出,占用内存不释放

  19. executor.shutdown();// 发出关闭请求,此时不会再接受新任务

  20. try{

  21. if(!executor.awaitTermination(3, TimeUnit.MINUTES)){// awaitTermination 来等待一段时间,使正在执行的任务或等待的任务有机会完成

  22. executor.shutdownNow();// 若是等待时间事后还有任务没完成,则强制结束

  23. executor.awaitTermination(1, TimeUnit.MINUTES);// 再等待一段时间,使被强制结束的任务完成必要的清理工做

  24. }

  25. }catch(InterruptedException e){

  26. executor.shutdownNow();

  27. Thread.currentThread().interrupt();

  28. }

  29. }

  30. }


public class FileDownloader{



   // 线程池



   private final ExecutorService executor = Executors.newFixedThreadPool(10);



   public boolean download(final URL url, final Path path){



   Future<Path> future = executor.submit(new Callable<Path>(){ //submit提交任务



     public Path call(){



         //这里就省略IOException的处理了



         InputStream is = url.openStream();



         Files.copy(is, path, StandardCopyOption.REPLACE_EXISTING);



         return path;



     });



     try{



         return future.get() !=null ? true : false;



     }<span style="font-family: Arial, Helvetica, sans-serif;">catch(InterruptedException | ExecutionException e){</span>



         return false;



     }



   }



   public void close(){//当再也不使用FileDownloader类的对象时,应该使用close方法关闭其中包含的ExecutorService接口的实现对象,不然虚拟机不会退出,占用内存不释放



     executor.shutdown();// 发出关闭请求,此时不会再接受新任务



     try{



         if(!executor.awaitTermination(3, TimeUnit.MINUTES)){// awaitTermination 来等待一段时间,使正在执行的任务或等待的任务有机会完成



           executor.shutdownNow();// 若是等待时间事后还有任务没完成,则强制结束



           executor.awaitTermination(1, TimeUnit.MINUTES);// 再等待一段时间,使被强制结束的任务完成必要的清理工做



         }



     }catch(InterruptedException e){



         executor.shutdownNow();



         Thread.currentThread().interrupt();



     }



   }



}

10、Java SE 7 新特性

对java.util.concurrent包进行更新,增长了新的轻量级任务执行框架fork/join和多阶段线程同步工具。

一、轻量级任务执行框架fork/join

这个框架的目的主要是更好地利用底层平台上的多核和多处理器来进行并行处理。

经过分治算法或map/reduce算法来解决问题。

fork/join 类比于 map/reduce。

fork操做是把一个大的问题划分为若干个较小的问题,划分过程通常为递归,直到能够直接进行计算的粒度适合的子问题;子问题在结算后,能够获得整个问题的部分解

join操做收集子结果,合并,获得完整解,也多是 递归进行的。

相对通常的线程池实现,F/J框架的优点在任务的处理方式上。在通常线程池中,一个线程因为某些缘由没法运行,会等待;而在F/J,某个子问题因为等待另一个子问题的完成而没法继续运行,那么处理该子问题的线程会主动寻找其余还没有运行的子问题来执行。这种方式减小了等待时间,提升了性能。

为了F/J能高效,在每一个子问题视线中应避免使用synchronized或其余方式进行同步,也不该使用阻塞式IO或过多访问共享变量。在理想状况下,每一个子问题都应值进行CPU计算,只使用每一个问题的内部对象,惟一的同步应只发生在子问题和建立它的父问题之间。(这彻底就是Hadoop的MapReduce嘛)

a、ForkJoinTask类:表示一个由F/J框架执行的任务,该类实现了Future接口,能够按照Future接口的方式来使用。(表示任务)

fork(),异步方式启动任务的执行。

join(),等待任务完成并返回执行结果。

在建立本身的任务时,最好不要直接继承自ForkJoinTask,而是继承其子类,RecuriveTask或RecursiveAction,前者能够返回结果,后者不行。

b、ForkJoinPool类:表示任务执行,实现了ExecutorService接口,除了能够执行ForkJoinTask,也能够执行Callable和Runnable。(任务执行)

执行任务的两大类:

第一类:execute、invoke或submit方法:直接提交任务。

第二类:fork():运行ForkJoinTask在执行过程当中的子任务。

通常做法是表示整个问题的ForkJoinTask用第一类提交,执行过程当中产生的子任务不须要处理,ForkJoinPool会负责子任务执行。

例:查找数组中的最大值

[java] view plain copy

  1. private static class MaxValueTask extends RecursiveTask<Long>{

  2. private final long[] array;

  3. private final int start;

  4. private final int end;

  5. MaxValueTask(long[] array, int start, int end){

  6. this.array = array;

  7. this.start = start;

  8. this.end = end;

  9. }

  10. //compute是RecursiveTask的主方法

  11. protected long compute(){

  12. long max = Long.MIN_VALUE;

  13. if(end - start < RANG_LENGTH){//寻找最大值

  14. for(int i = start; i<end;i++{

  15. if(array[i] > max){

  16. max = array[i];

  17. }

  18. }

  19. }else{// 二分任务

  20. int mid = (start + end) /2;

  21. MaxValueTask lowTask = new MaxValueTask(array, start , mid);

  22. MaxValueTask highTask = new MaxValueTask(array, mid, end);

  23. lowTask.fork();// 异步启动任务

  24. highTask.fork();

  25. max = Math.max(max, lowTask.join());//等待执行结果

  26. max = Math.max(max, highTask.join();

  27. }

  28. return max;

  29. }

  30. public Long calculate(long[] array){

  31. MaxValueTask task = new MaxValueTask(array, 0 , array.length);

  32. Long result = forkJoinPool.invoke(task);

  33. return result;

  34. }

  35. }


private static class MaxValueTask extends RecursiveTask<Long>{



   private final long[] array;



   private final int start;



   private final int end;



   MaxValueTask(long[] array, int start, int end){



     this.array = array;



     this.start = start;



     this.end = end;



   }



   //compute是RecursiveTask的主方法



   protected long compute(){



     long max = Long.MIN_VALUE;



     if(end - start < RANG_LENGTH){//寻找最大值



         for(int i = start; i<end;i++{



           if(array[i] > max){



               max = array[i];



           }



         }



     }else{// 二分任务



         int mid = (start + end) /2;



         MaxValueTask lowTask = new MaxValueTask(array, start , mid);



         MaxValueTask highTask = new MaxValueTask(array, mid, end);



         lowTask.fork();// 异步启动任务



         highTask.fork();



         max = Math.max(max, lowTask.join());//等待执行结果



         max = Math.max(max, highTask.join();



     }



     return max;



   }



   public Long calculate(long[] array){



     MaxValueTask task = new MaxValueTask(array, 0 , array.length);



     Long result = forkJoinPool.invoke(task);



     return result;



   }



}

注:这个例子是示例,但从性能上说直接对整个数组顺序比较效率高,毕竟多线程所带来的额外开销过大。

在实际中,F/J框架发挥做用的场合不少,好比在一个目录包含的全部文本中搜索某个关键字,能够每一个文件建立一个子任务。

若是相关的功能能够用递归和分治来解决,就适合F/J。

二、多阶段线程同步工具

Phaser类是Java SE 7中新增的一个使用同步工具,功能和灵活性比倒数闸门和循环屏障要强不少。

在F/J框架中的子任务之间要进行同步时,应优先考虑Phaser。

Phaser把多个线程写做执行的任务划分红多个阶段(phase),编程时要明确各个阶段的任务,每一个阶段均可以有任意个参与者,线程能够随时注册并参与到某个阶段,当一个阶段中全部线程都成功完成以后,Phaser的onAdvance()被调用,能够经过覆盖添加自定义处理逻辑(相似循环屏障的使用的Runnable接口),而后Phaser类会自动进入下个阶段。如此循环,知道Phaser再也不包含任何参与者。

Phaser建立后,初始阶段编号为0,构造函数中指定初始参与个数。

register(),bulkRegister(),动态添加一个或多个参与者。

arrive(),某个参与者完成任务后调用

arriveAndDeregister(),任务完成,取消本身的注册。

arriveAndAwaitAdvance(),本身完成等待其余参与者完成。,进入阻塞,直到Phaser成功进入下个阶段。

awaitAdvance()、awaitAdvanceInterruptibly(),等待phaser进入下个阶段,参数为当前阶段的编号,后者能够设置超时和处理中断请求。

另外,Phaser的一个重要特征是多个Phaser能够组成树形结构,Phaser提供了构造方法来指定当前对象的父对象;当一个子对象参与者>0,会自动注册到父对象中;当=0,自动解除注册。

例:从指定网址,下载img标签的照片

阶段一、处理网址对应的html文本,和抽取img的连接;二、建立图片下载子线程,主线程等待;三、子线程下载图片,主线程等待;四、任务完成退出

[java] view plain copy

  1. public class WebPageImageDownloader{

  2. private final Phaser phaser = new Phaser(1);//初始参与数1,表明主线程。

  3. public void download(URL url, final Path path) throws IOException{

  4. String content = getContent(url);//得到HTML文本,省略。

  5. List<URL> imageUrls = extractImageUrls(content);//得到图片连接,省略。

  6. for(final URL imageUrl : imageUrls){

  7. phaser.register();//子线程注册

  8. new Thread(){

  9. public void run(){

  10. phaser.arriveAndAwaitAdvance();//第二阶段的等待,等待进入第三阶段

  11. try{

  12. InputStream is = imageUrl.openStream();

  13. File.copy(is, getSavePath(path, imageUrl), StandardCopyOption.REPLACE_EXISTING);

  14. }catch(IOException e){

  15. e.printStackTrace():

  16. }finally{

  17. phaser.arriveAndDeregister();//子线程完成任务,退出。

  18. }

  19. }

  20. }.start();

  21. }

  22. phaser.arriveAndAwaitAdvance();//第二阶段等待,子线程在注册

  23. phaser.arriveAndAwaitAdvance();//第三阶段等待,子线程在下载

  24. phaser.arriveAndDeregister();//全部线程退出。

  25. }

  26. }


public class WebPageImageDownloader{



   private final Phaser phaser = new Phaser(1);//初始参与数1,表明主线程。



   public void download(URL url, final Path path) throws IOException{



     String content = getContent(url);//得到HTML文本,省略。



     List<URL> imageUrls = extractImageUrls(content);//得到图片连接,省略。



     for(final URL imageUrl : imageUrls){



         phaser.register();//子线程注册



         new Thread(){



           public void run(){



               phaser.arriveAndAwaitAdvance();//第二阶段的等待,等待进入第三阶段



               try{



                 InputStream is = imageUrl.openStream();



                 File.copy(is, getSavePath(path, imageUrl), StandardCopyOption.REPLACE_EXISTING);



               }catch(IOException e){



                 e.printStackTrace():



               }finally{



                 phaser.arriveAndDeregister();//子线程完成任务,退出。



               }



           }



       }.start();



     }



     phaser.arriveAndAwaitAdvance();//第二阶段等待,子线程在注册



     phaser.arriveAndAwaitAdvance();//第三阶段等待,子线程在下载



     phaser.arriveAndDeregister();//全部线程退出。



   }



}

11、ThreadLocal类

java.lang.ThreadLocal,线程局部变量,把一个共享变量变为一个线程的私有对象。不一样线程访问一个ThreadLocal类的对象时,锁访问和修改的事每一个线程变量各自独立的对象。经过ThreadLocal能够快速把一个非线程安全的对象转换成线程安全的对象。(同时也就不能达到数据传递的做用了)。

a、get()和set()分别用来获取和设置当前线程中包含的对象的值。

b、remove(),删除。

c、initialValue(),初始化值。若是没有经过set方法设置值,第一个调用get,会经过initValue来获取对象的初始值。

ThreadLoacl的通常用法,建立一个ThreadLocal的匿名子类并覆盖initalValue(),把ThreadLoacl的使用封装在另外一个类中

[java] view plain copy

  1. public class ThreadLocalIdGenerator{

  2. private static final ThreadLocal<IdGenerator> idGenerator = new ThreadLocal<IdGenerator>(){

  3. protected IdGenerator initalValue(){

  4. return new IdGenerator();//IdGenerator 是个初始int value =0,而后getNext(){ return value++}

  5. }

  6. };

  7. public static int getNext(){

  8. return idGenerator.get().getNext();

  9. }

  10. }


public class ThreadLocalIdGenerator{



   private static final ThreadLocal<IdGenerator> idGenerator = new ThreadLocal<IdGenerator>(){



         protected IdGenerator initalValue(){



           return new IdGenerator();//IdGenerator 是个初始int value =0,而后getNext(){ return value++}



         }



     };



   public static int getNext(){



     return idGenerator.get().getNext();



   }



}

ThreadLoal的另一个做用是建立线程惟一的对象,在有些状况,一个对象在代码中各个部分都须要用到,传统作法是把这个对象做为参数在代码间传递,若是使用这个对I昂的代码都在同一个线程,能够封装在ThreadLocal中。

如:在多线程中,生成随机数

java.util.Random会带来竞争问题,java.util.concurrent.ThreadLocalRandom类提供多线程下的随机数声场,底层是ThreadLoacl。

总结:多线程开发中应该优先使用高层API,若是没法知足,使用java.util.concurrent.atomic和java.util.concurrent.locks包提供的中层API,而synchronized和volatile,以及wait,notify和notifyAll等低层API 应该最后考虑。

相关文章
相关标签/搜索