Java中建立线程主要有三种方式:html
其他详略,自查网络java
synchronized关键字之内存中的一个对象做为锁(互斥锁),获取到这个对象的线程能够执行synchronized内部的代码,执行完毕才放弃锁web
public class T { private int count = 10; private Object o = new Object(); public void m() { synchronized(o) { //任何线程要执行下面的代码,必须先拿到o的锁 count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } } }
专门建立一个无用的对象做为锁显得浪费,能够直接以当前对象做为锁面试
public class T { private int count = 10; public void m() { synchronized(this) { //任何线程要执行下面的代码,必须先拿到this的锁 count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } } }
public class T { private int count = 10; public synchronized void m() { //等同于在方法的代码执行时要synchronized(this) count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } }
下面代码m方法与mm方法的锁定效果同样api
public class T { private static int count = 10; public synchronized static void m() { //这里等同于synchronized(yxxy.c_004.T.class) count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } public static void mm() { synchronized(T.class) { //考虑一下这里写synchronized(this)是否能够? // 不能够,静态方法不能访问非静态对象 count --; } } }
下面的代码能够演示到多线程运行时出现的重复问题,若取消synchronized关键字的注释,能够避免这个问题数组
public class T implements Runnable { private int count = 10; public /*synchronized*/ void run() { count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } public static void main(String[] args) { T t = new T(); for(int i=0; i<5; i++) { new Thread(t, "THREAD" + i).start(); } } }
一个synchronized代码块是做为原子性操做的,总体不可分缓存
缘由是只有synchronized方法执行才须要申请锁,其余方法不须要申请锁,二者互不干扰(比喻:A上厕所锁了门与来洗手盆洗手的B没有关系)安全
public class T { public synchronized void m1() { System.out.println(Thread.currentThread().getName() + " m1 start..."); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m1 end"); } public void m2() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m2 "); } public static void main(String[] args) { T t = new T(); /*new Thread(()->t.m1(), "t1").start(); new Thread(()->t.m2(), "t2").start();*/ new Thread(t::m1, "t1").start(); new Thread(t::m2, "t2").start(); /* new Thread(new Runnable() { @Override public void run() { t.m1(); } }); */ } }
面试有关小知识点:银行代码固然须要加锁,写操做须要加锁那么读操做须要加锁吗?网络
一个线程已经拥有某个对象的锁,再次申请相同的锁的时候仍然会获得该对象的锁session
即已取得锁的当前线程再申请获取同一个锁是可行的
也就是说synchronized得到的锁是可重入的
下面代码m1内调用m2,是能够执行的
import java.util.concurrent.TimeUnit; public class T { synchronized void m1() { System.out.println("m1 start"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } m2(); } synchronized void m2() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("m2"); } }
因此,在并发处理的过程当中,有异常要多加当心,可能会发生不一致的状况。
好比,在一个web app处理过程当中,多个servlet线程共同访问同一个资源,这时若是异常处理不合适,在第一个线程中抛出异常,其余线程就会进入同步代码区,有可能会访问到异常产生时第一个线程未修改完的数据
所以要很是当心的处理同步业务逻辑中的异常
当可能抛出异常能够catch来避免上述错误(如回滚数据操做)
import java.util.concurrent.TimeUnit; public class T { int count = 0; synchronized void m() { System.out.println(Thread.currentThread().getName() + " start"); while(true) { count ++; System.out.println(Thread.currentThread().getName() + " count = " + count); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if(count == 5) { int i = 1/0; //此处抛出异常,锁将被释放,要想不被释放,能够在这里进行catch,而后让循环继续 System.out.println(i); } } } public static void main(String[] args) { T t = new T(); Runnable r = new Runnable() { @Override public void run() { t.m(); } }; new Thread(r, "t1").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(r, "t2").start(); } }
最基本原理:
线程1须要按顺序获取锁A和锁B来执行,线程2须要按顺序获取锁B和锁A来执行,当线程1获取了锁A未获取锁B时线程2并发,线程2执行获取了锁B,此时就发生死锁,线程1没法获取锁B,线程2也没法获取锁A来继续执行,程序卡死
下面程序能够模拟死锁
public class T { Object a = new Object(); Object b = new Object(); public void m1() { synchronized (a) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (b) { System.out.println("success1"); } } } public void m2() { synchronized (b) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (a) { System.out.println("success2"); } } } public static void main(String[] args) { T t = new T(); new Thread(t::m1, "t1").start(); new Thread(t::m2, "t2").start(); } }
synchronized同步代码块中的代码越少越好
import java.util.concurrent.TimeUnit; public class T { int count = 0; synchronized void m1() { //do sth need not sync try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //业务逻辑中只有下面这句须要sync,这时不该该给整个方法上锁 count ++; //do sth need not sync try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } void m2() { //do sth need not sync try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //业务逻辑中只有下面这句须要sync,这时不该该给整个方法上锁 //采用细粒度的锁,可使线程争用时间变短,从而提升效率 synchronized(this) { count ++; } //do sth need not sync try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }
锁定某对象o,若是o的属性发生改变,不影响锁的使用,可是若是o变成另一个对象,则锁定的对象发生改变
应该避免将锁定对象的引用变成另外的对象
当锁定对象改变后,多个线程间获取的锁对象就有可能不同了,使得同步代码块失效
下面代码说明了这个问题,锁定对象改变了,线程t2进入同步代码块执行,但若锁定对象不变,线程t2将不能进入同步代码块执行
import java.util.concurrent.TimeUnit; public class T { Object o = new Object(); void m() { synchronized(o) { while(true) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } } } public static void main(String[] args) { T t = new T(); //启动第一个线程 new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //建立第二个线程 Thread t2 = new Thread(t::m, "t2"); t.o = new Object(); //锁对象发生改变,因此t2线程得以执行,若是注释掉这句话,线程2将永远得不到执行机会 t2.start(); } }
不要以字符串常量做为锁定对象,因为String类型的特殊性(常量池),表面上变量名不一样的两个String对象可能指向的是同一个地址
在下面的程序中,m1和m2其实锁定的是同一个对象
这种状况还会发生比较诡异的现象,好比你用到了一个类库,在该类库中代码锁定了字符串“Hello”,可是你读不到源码,因此你在本身的代码中也锁定了"Hello",这时候就有可能发生很是诡异的死锁阻塞,由于你的程序和你用到的类库不经意间使用了同一把锁
public class T { String s1 = "Hello"; String s2 = "Hello"; void m1() { synchronized(s1) { } } void m2() { synchronized(s2) { } } }
volatile关键字,能够使一个变量在多个线程间可见
如A、B线程都用到同一个变量,Java默认是A线程中保留一份copy,这样若是B线程修改了该变量,则A线程未必知道
在下面的代码中,running是存在于堆内存的t对象中,当线程t1开始运行的时候,会把running值从内存中读到(拷贝到)t1线程的工做区,在运行过程当中直接使用这个copy,并不会每次都去读取堆内存,这样,当主线程修改running的值以后,t1线程感知不到,因此不会中止运行
使用volatile关键字,将会强制全部线程都去堆内存中读取变量running的值
能够阅读这篇文章进行更深刻的理解
http://www.cnblogs.com/nexiyi/p/java_memory_model_and_thread.html
volatile并不能保证多个线程共同修改running变量时所带来的不一致问题,也就是说volatile不能替代synchronized
下面代码能够说明volatile关键字的效果,当running变量能够被其余线程改变时,while代码块才会结束并打印"m end"
import java.util.concurrent.TimeUnit; public class T { /*volatile*/ boolean running = true; //对比一下有无volatile的状况下,整个程序运行结果的区别 void m() { System.out.println("m start"); while(running) { /* try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }*/ // 若上面几行代码取消注释,有可能在cpu空闲时去读取堆内存中的running变量的值从而结束while代码块 } System.out.println("m end!"); } public static void main(String[] args) { T t = new T(); new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } t.running = false; } }
volatile并不能保证多个线程共同修改running变量时所带来的不一致问题,也就是说volatile不能替代synchronized
运行下面的程序输出count的值会出现不到100000的结果,缘由是由于虽然volatile能够保证每一个线程读取count的值是同步的,但不能保证/要求线程写入count的值时必定是根据此时的count值+1的操做
即读操做和写操做是复合操做,不构成原子性操做
解决这个问题可使用synchronized关键字修饰m方法便可以保证可见性和原子性同步
import java.util.ArrayList; import java.util.List; public class T { volatile int count = 0; void m() { for (int i = 0; i < 10000; i++) count++; } public static void main(String[] args) { T t = new T(); List<Thread> threads = new ArrayList<Thread>(); for (int i = 0; i < 10; i++) { threads.add(new Thread(t::m, "thread-" + i)); } threads.forEach((o) -> o.start()); threads.forEach((o) -> { try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(t.count); } }
AtomicXXX类自己方法都是原子性的,但不能保证多个方法连续调用是原子性的,即多个AtomicXXX类的方法连续被调用的时候不能保证原子性
import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class T { /*volatile*/ //int count = 0; AtomicInteger count = new AtomicInteger(0); /*synchronized*/ void m() { for (int i = 0; i < 10000; i++) //if count.get() < 1000 count.incrementAndGet(); //count++ } public static void main(String[] args) { T t = new T(); List<Thread> threads = new ArrayList<Thread>(); for (int i = 0; i < 10; i++) { threads.add(new Thread(t::m, "thread-" + i)); } threads.forEach((o) -> o.start()); threads.forEach((o) -> { try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(t.count); } }
线程安全有两个方面:执行控制和内存可见
synchronized关键字解决的是执行控制的问题,它会阻止其它线程获取当前对象的监控锁,这样就使得当前对象中被synchronized关键字保护的代码块没法被其它线程访问,也就没法并发执行。更重要的是,synchronized还会建立一个内存屏障,内存屏障指令保证了全部CPU操做结果都会直接刷到主存中,从而保证了操做的内存可见性,同时也使得先得到这个锁的线程的全部操做,都happens-before于随后得到这个锁的线程的操做。
volatile关键字解决的是内存可见性的问题,会使得全部对volatile变量的读写都会直接刷到主存,即保证了变量的可见性。这样就能知足一些对变量可见性有要求而对读取顺序没有要求的需求。
使用volatile关键字仅能实现对原始变量(如boolen、 short 、int 、long等)操做的原子性,但须要特别注意, volatile不能保证复合操做的原子性,即便只是i++,实际上也是由多个原子操做组成:read i; inc; write i,假如多个线程同时执行i++,volatile只能保证他们操做的i是同一块内存,但依然可能出现写入脏数据的状况。
volatile的性能比synchronized的性能要高
能够用volatile的时候尽可能避免使用synchronized
ReentrantLock能够用于替代synchronized,前者能够完成后者可完成的功能且更灵活(但性能没有明显区别)
但使用ReentrantLock必须手动释放锁,使用synchronized锁定的话若是遇到异常,JVM会自动释放锁,可是ReentrantLock必须手动释放锁,所以常常在finally块中进行锁的释放
ReentrantLock是Java并发包中互斥锁,它有公平锁和非公平锁两种实现方式
详细可见https://www.jianshu.com/p/155260c8af6c
下面的代码能够演示使用重入锁完成使m1方法与m2方法互斥的功能,
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLock2 { Lock lock = new ReentrantLock(); // 建立锁 void m1() { try { lock.lock(); //synchronized(this) 申请并锁定锁lock for (int i = 0; i < 10; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); // 解锁 } } void m2() { lock.lock(); // 申请并锁定锁lock System.out.println("m2 ..."); lock.unlock(); // 解锁 } public static void main(String[] args) { ReentrantLock2 rl = new ReentrantLock2(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }
使用ReentrantLock能够调用tryLock方法尝试进行锁定
线程能够根据tryLock方法的返回值判断是否锁定并以此决定是否继续等待或执行其余操做
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLock3 { Lock lock = new ReentrantLock(); void m1() { try { lock.lock(); for (int i = 0; i < 10; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * 使用tryLock进行尝试锁定,无论锁定与否,方法都将继续执行 * 能够根据tryLock的返回值来断定是否锁定 * 也能够指定tryLock的时间,因为tryLock(time)抛出异常,因此要注意unclock的处理,必须放到finally中 */ void m2() { /* boolean locked = lock.tryLock(); System.out.println("m2 ..." + locked); if(locked) lock.unlock(); */ boolean locked = false; try { locked = lock.tryLock(5, TimeUnit.SECONDS); System.out.println("m2 ..." + locked); } catch (InterruptedException e) { e.printStackTrace(); } finally { if(locked) lock.unlock(); } } public static void main(String[] args) { ReentrantLock3 rl = new ReentrantLock3(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }
调用lockInterruptibly方法申请锁的线程能够对线程interrupt方法作出响应
即若主线程想调用interrupt方法打断某一个线程的执行,一般来说经过lock方法申请锁却由于未申请到锁而阻塞的线程不能被打断,而经过lockInterruptibly方法申请锁的线程阻塞时能够对主线程调用interrupt方法做出响应而被打断
简言之:lockInterruptibly方法的做用是使在一个线程在等待锁的过程当中,能够被打断
下面的代码能够演示当线程t1霸占锁使得线程t2一直等待得到锁而阻塞,t2使用lockInterruptibly方法代替lock方法来声明申请锁,主线程能够经过调用线程t2对象的interrupt方法打断线程t2的执行——“别等了,哥们”
线程t1在获取/申请锁的过程中不响应中断(lock方法),而t2在获取/申请锁的过程响应中断(lockInterruptibly方法)
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; public class ReentrantLock4 { public static void main(String[] args) { Lock lock = new ReentrantLock(); Thread t1 = new Thread(()->{ try { lock.lock(); System.out.println("t1 start"); TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); System.out.println("t1 end"); } catch (InterruptedException e) { System.out.println("interrupted!"); } finally { lock.unlock(); } }); t1.start(); Thread t2 = new Thread(()->{ try { //lock.lock(); lock.lockInterruptibly(); //能够对interrupt()方法作出响应 System.out.println("t2 start"); TimeUnit.SECONDS.sleep(5); System.out.println("t2 end"); } catch (InterruptedException e) { System.out.println("interrupted!"); } finally { lock.unlock(); } }); t2.start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } t2.interrupt(); //打断线程2的等待 } }
ReentrantLock在建立时能够被指定为公平锁,而synchronized是非公平锁
公平锁:某个对象的锁对全部线程都是公平的,先到先得。每次加锁前都会检查队列里面有没有排队等待的线程,有则排队等待,没有才会尝试获取锁。
非公平锁:当一个线程采用非公平锁这种方式获取锁时,该线程会首先去尝试获取锁而不是等待。若是没有后去成功,那么它才会去队列里面等待。
下面代码使用公平锁,结果应该是两个线程交替打印,若建立重入锁时没有传入true,则打印结果没法预测
import java.util.concurrent.locks.ReentrantLock; public class ReentrantLock5 extends Thread { private static ReentrantLock lock=new ReentrantLock(true); //参数为true表示为公平锁,请对比输出结果 public void run() { for(int i=0; i<100; i++) { lock.lock(); try{ System.out.println(Thread.currentThread().getName()+"得到锁"); }finally{ lock.unlock(); } } } public static void main(String[] args) { ReentrantLock5 rl=new ReentrantLock5(); Thread th1=new Thread(rl); Thread th2=new Thread(rl); th1.start(); th2.start(); } }
ThreadLocal是使用空间换时间,synchronized是使用时间换空间
简言之:ThreadLocal中存放的数据每一个线程独立一份,各个线程之间的ThreadLocal互不影响
当单个线程能够独立维护一个变量,不须要或不该该被其余线程修改这个变量,则可使用TreadLocal,好比在Hibernate中session就存在与ThreadLocal中,避免synchronized的使用
注意:ThreadLocal可能会致使内存泄漏
下面代码结果应该是线程t1经过get方法不能获取Person对象(获取了null)
import java.util.concurrent.TimeUnit; public class ThreadLocal2 { //volatile static Person p = new Person(); static ThreadLocal<Person> tl = new ThreadLocal<>(); public static void main(String[] args) { new Thread(()->{ try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(tl.get()); }).start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } tl.set(new Person()); }).start(); } static class Person { String name = "zhangsan"; } }
详见https://blog.csdn.net/cselmu9/article/details/51366946
两个特色
在任何状况下,单例类永远只有一个实例存在
单例须要有能力为整个系统提供这一惟一实例
下面的程序使用了静态内置类的方式来实现单例模式
能够实现懒加载并且线程安全
import java.util.Arrays; public class Singleton { private Singleton() { System.out.println("single"); } private static class Inner { private static Singleton s = new Singleton(); } public static Singleton getSingle() { return Inner.s; } public static void main(String[] args) { Thread[] ths = new Thread[200]; for(int i=0; i<ths.length; i++) { ths[i] = new Thread(()->{ System.out.println(Singleton.getSingle()); }); } Arrays.asList(ths).forEach(o->o.start()); } }
使用早期的同步容器以及Collections.synchronized**方法的不足之处,本文省略不表,请阅读:
http://blog.csdn.net/itm_hadf/article/details/7506529*
使用新的并发容器
http://xuganggogo.iteye.com/blog/321630
有N张火车票,每张票都有一个编号,同时有10个窗口对外售票,写一个模拟程序
实现一
如下是最基本的实现,弊端为ArrayList的remove方法不是同步的,可能重复remove同一张票;判断剩余票数的代码也不是同步的,可能卖出超过10000张票
/** * 有N张火车票,每张票都有一个编号 * 同时有10个窗口对外售票 * 请写一个模拟程序 * * 分析下面的程序可能会产生哪些问题? * 重复销售?超量销售? * * @author 马士兵 */ import java.util.ArrayList; import java.util.List; public class TicketSeller1 { static List<String> tickets = new ArrayList<>(); static { for(int i=0; i<10000; i++) tickets.add("票编号:" + i); } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(tickets.size() > 0) { System.out.println("销售了--" + tickets.remove(0)); } }).start(); } } }
使用Vector或者Collections.synchronizedXXX这种同步容器来实现同步
仍然存在弊端,由于判断条件的代码和操做容器的代码分离了,虽然Vector的remove方法是同步的,但判断是否还有票的代码不是同步的,可能售出超过10000张票(若打开while块内线程sleep的语句,能够模拟这种出错,缘由是多个线程进入到了while块同时执行remove操做了)
即从判断到操做容器这两步中间可能会出现并发问题,由于两步操做不一样步
解决方法呼之欲出了,使判断(size>0)和操做容器(remove方法)的代码同步,即实现原子性
import java.util.Vector; import java.util.concurrent.TimeUnit; public class TicketSeller2 { static Vector<String> tickets = new Vector<>(); static { for(int i=0; i<1000; i++) { tickets.add("票 编号:" + i); } } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(tickets.size() > 0) { /*try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }*/ System.out.println("销售了--" + tickets.remove(0)); } }).start(); } } }
经过在判断和操做这两步外部加synchronized块实现原子性
但这种实现的效率并不特别高
import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; public class TicketSeller3 { static List<String> tickets = new LinkedList<>(); static { for(int i=0; i<1000; i++) { tickets.add("票 编号:" + i); } } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(true) { synchronized(tickets) { if(tickets.size() <= 0) { break; } try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("销售了--" + tickets.remove(0)); } } }).start(); } } }
使用并发容器ConcurrentLinkedQueue(并发链表队列)提升并发性
其中poll方法是同步的,操做代码并无加锁,但能够实现高效率线程安全操做
原理是队列的特性:队列内通常不容许有值为null的元素(即便容器容许null值也应该做判断避免传入null值),若没有元素则返回null,先取出后判断,并非先判断后取出,因此完美解决了判断和操做两部分的同步问题
import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; public class TicketSeller4 { static Queue<String> tickets = new ConcurrentLinkedQueue<>(); static { for(int i=0; i<1000; i++) { tickets.add("票 编号:" + i); } } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(true) { String s = tickets.poll(); if(s == null) { break; } else { System.out.println("销售了--" + s); } } }).start(); } } }
可见并发容器的优点所在,下文将介绍各个经常使用的并发容器类
如下程序能够演示高并发下不一样并发Map容器的添加效率
小关键:这里的CountDownLatch主要是为了限制主线程等待添加操做完成后再继续执行
import java.util.Arrays; import java.util.Hashtable; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; public class T01_ConcurrentMap { public static void main(String[] args) { //Map<String, String> map = new ConcurrentHashMap<>(); Map<String, String> map = new ConcurrentSkipListMap<>(); //高并发而且排序 //Map<String, String> map = new Hashtable<>(); //Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX //TreeMap Random r = new Random(); Thread[] ths = new Thread[100]; CountDownLatch latch = new CountDownLatch(ths.length); long start = System.currentTimeMillis(); for(int i=0; i<ths.length; i++) { ths[i] = new Thread(()->{ for(int j=0; j<10000; j++) { map.put("a" + r.nextInt(100000), "a" + r.nextInt(100000)); } latch.countDown(); }); } Arrays.asList(ths).forEach(t->t.start()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println(end - start); } }
上文代码执行结果能够发现执行结果中在并发的状况下使用ConcurrentHashMap的效率比HashTable高,缘由是HashTable在添加操做时会锁定整个容器,只响应一个线程的添加操做;而ConcurrentHashMap则是将容器分段(默认16段)(存疑,1.8以后底层改变,CAS替代分段锁,具体自查),并发操做时只锁定其中一段
在高并发且须要对元素排序的状况下,可使用ConcurrentSkipListMap提升效率
关于SkipList(跳跃列表)可参阅http://blog.csdn.net/sunxianghuang/article/details/52221913
跳表能够理解为“多链链表”,是一种用空间换时间的数据结构,经过在每一个节点中增长了向前的指针,从而提高查找的效率
写时复制容器(copy on write),当添加/删除等修改元素操做发生时,将逐一复制原列表值到新容器,修改操做(即写的操做)完成后再将原容器的引用调整至新容器,从而实现读取数据的线程安全
主要是读写分离的思想:在写的过程当中引用并未指向新容器,因此读操做仍然在旧容器中读取,待写操做完成后才更新新容器的引用
CopyOnWriteArrayList的实现原理是,在一个线程开始遍历(建立Iterator对象)时,内部会建立一个“快照”数组,遍历基于这个快照Iterator进行,在遍历过程当中这个快照数组不会改变,也就不会抛出
ConcurrentModificationException
。若是在遍历的过程当中有其余线程尝试改变数组的内容,就会拷贝一份新的数据进行变动,然后面再来访问这个数组的线程,看到的就是变动过的数组。
其实CopyOnWirteArrayList主要就是解决了并发环境下修改操做和对容器遍历操做的冲突(修改时另外一线程开始遍历容器会抛出ConcurrentModificationException)
能够查阅https://juejin.im/post/5aaa2ba8f265da239530b69e
有关ConcurrentModificationException的拓展资料查阅https://juejin.im/post/5a992a0d6fb9a028e46e17ef
多线程环境下,写时效率低,读时效率高,适合写少读多的环境
下面的程序能够演示CopyOnWirteArrayList的读/写的效率
import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.Vector; import java.util.concurrent.CopyOnWriteArrayList; public class T02_CopyOnWriteList { public static void main(String[] args) { List<String> lists = //new ArrayList<>(); //这个会出并发问题! //new Vector(); new CopyOnWriteArrayList<>(); Random r = new Random(); Thread[] ths = new Thread[100]; for(int i=0; i<ths.length; i++) { Runnable task = new Runnable() { @Override public void run() { for(int i=0; i<1000; i++) lists.add("a" + r.nextInt(10000)); } }; ths[i] = new Thread(task); } runAndComputeTime(ths); System.out.println(lists.size()); } static void runAndComputeTime(Thread[] ths) { long s1 = System.currentTimeMillis(); Arrays.asList(ths).forEach(t->t.start()); Arrays.asList(ths).forEach(t->{ try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long s2 = System.currentTimeMillis(); System.out.println(s2 - s1); } }
在并发环境下用得较多的容器
是无界队列,容量取决于内存
下面的代码演示基本使用ConcurrentLinkedQueue,poll和peek方法的区别是poll方法将返回并移除元素,peek方法是获取元素但不移除
另外有ConcurrentLinkedDeque并发双向链表队列
import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class T04_ConcurrentQueue { public static void main(String[] args) { Queue<String> strs = new ConcurrentLinkedQueue<>(); for(int i=0; i<10; i++) { strs.offer("a" + i); //add } System.out.println(strs); System.out.println(strs.size()); System.out.println(strs.poll()); System.out.println(strs.size()); System.out.println(strs.peek()); System.out.println(strs.size()); //双端队列Deque } }
BlockingQueue(阻塞式队列)实际上是Java对生产者/消费者模式的实现
其中LinkedBlockingQueue是使用链表实现的阻塞式无界队列,put方法在容器已满时将等待,而take方法在容器为空时将等待(下文例题中有实现这种生产者/消费者模式的容器的程序)
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class T05_LinkedBlockingQueue { static BlockingQueue<String> strs = new LinkedBlockingQueue<>(); static Random r = new Random(); public static void main(String[] args) { new Thread(() -> { for (int i = 0; i < 100; i++) { try { strs.put("a" + i); //若是满了,就会等待 TimeUnit.MILLISECONDS.sleep(r.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } }, "p1").start(); for (int i = 0; i < 5; i++) { new Thread(() -> { for (;;) { try { System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //若是空了,就会等待 } catch (InterruptedException e) { e.printStackTrace(); } } }, "c" + i).start(); } } }
底层使用数组实现,容量有限制,offer方法能够向容器添加元素,并返回是否添加成功的布尔值(若容器已满则不添加元素并返回false,而使用add方法则会抛出异常)
且offer方法能够传入参数设置时间间隔,在此段时间间隔内不断添加,超时则放弃添加操做并返回false
而put方法在容器已满时将阻塞
import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class T06_ArrayBlockingQueue { static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); static Random r = new Random(); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { strs.put("a" + i); } strs.put("aaa"); //满了就会等待,程序阻塞 //strs.add("aaa"); //strs.offer("aaa"); //strs.offer("aaa", 1, TimeUnit.SECONDS); System.out.println(strs); } }
TransferQueue有transfer方法(将元素放入容器),这个方法的做用是当多个消费者线程等待获取队列中的元素时,此时生产者再生产一个元素,不放入队列中,而是能够直接交给消费者线程,但使用了transfer方法,若没有消费者线程等待获取元素,使用transfer方法的线程将阻塞直至消费者线程出现
能够提升并发效率
import java.util.concurrent.LinkedTransferQueue; public class T08_TransferQueue { public static void main(String[] args) throws InterruptedException { LinkedTransferQueue<String> strs = new LinkedTransferQueue<>(); /*new Thread(() -> { try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start();*/ //strs.transfer("aaa"); strs.put("aaa"); new Thread(() -> { try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
是一种特殊的TransferQueue,特殊在容量为0
不能调用add方法,只能调用put方法(将阻塞等待消费者线程)
若没有消费者线程等待获取容器中的值,则会抛出异常IllegalStateException:Queue full
import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; public class T09_SynchronusQueue { //容量为0 public static void main(String[] args) throws InterruptedException { BlockingQueue<String> strs = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); strs.put("aaa"); //阻塞等待消费者消费 //strs.add("aaa"); System.out.println(strs.size()); } }
一样,DelayQueue也是一个线程安全的无界队列
特色是当队列中的元素到达延迟时间时才能被取出,队列元素会按照最终执行时间(阻塞结束后到被执行的时间)在队列中进行排序,头部为最终执行时间最长的元素
可使用延迟队列来执行定时任务
DelayQueue是一个无界阻塞队列,该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。若是延迟都尚未期满,则队列没有头部,而且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即便没法使用 take 或 poll 移除未到期的元素,也不会将这些元素做为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不容许使用 null 元素。
下面的代码演示了使用DelayQueue,其中DelayQueue存放的元素须要实现Delayed接口以实现元素延迟计时等功能(以下MyTask类实现了Delay接口)
import java.util.Calendar; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class T07_DelayQueue { static BlockingQueue<MyTask> tasks = new DelayQueue<>(); static Random r = new Random(); static class MyTask implements Delayed { long runningTime; MyTask(long rt) { this.runningTime = rt; } @Override public int compareTo(Delayed o) { if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) return -1; else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) return 1; else return 0; } @Override public long getDelay(TimeUnit unit) { return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public String toString() { return "" + runningTime; } } public static void main(String[] args) throws InterruptedException { long now = System.currentTimeMillis(); MyTask t1 = new MyTask(now + 1000); MyTask t2 = new MyTask(now + 2000); MyTask t3 = new MyTask(now + 1500); MyTask t4 = new MyTask(now + 2500); MyTask t5 = new MyTask(now + 500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for(int i=0; i<5; i++) { System.out.println(tasks.take()); } } }
最顶层定义
Executor是一个接口,内部定义了execute(Runnable command)方法,实现类须要实现这个方法编写须要实现的具体任务
简言之,Executor的实现类是用于执行某个任务的
import java.util.concurrent.Executor; public class T01_MyExecutor implements Executor{ public static void main(String[] args) { new T01_MyExecutor().execute(()->System.out.println("hello executor")); } @Override public void execute(Runnable command) { //new Thread(command).run(); command.run(); } }
ExecutorService是一个继承Executor的接口,除继承execute方法外,还定义了一系列其余关于执行任务的方法(如submit方法)
其中submit方法能够传入Callable
与Runnable接口很是类似,Runnable接口内定义了run方法,而Callable接口内部定义了call方法
与Runnable接口的区别在于,Runnable接口的run方法没有返回值且不能抛出异常,而Callable接口定义的call方法能够有返回值且能够抛出异常
Executors是简化使用Executor接口常见实现类的工具类
其中定义了一些使用的方法好比建立线程池等
具体API自查
若是并发的线程数量不少,而且每一个线程都是执行一个时间很短的任务就结束了,这样频繁建立线程就会大大下降系统的效率,由于频繁建立线程和销毁线程须要时间
使用线程池能够达到线程的重用,提升性能
下面的程序演示了建立一个固定线程数量的线程池并直接向线程池派发任务并执行(把任务扔进线程池中,线程池中的数个线程将抢着执行任务)
其中shutdown方法的做用是关闭线程池,若线程仍在执行任务则等待线程所有空闲再关闭,有shutdownNow方法能够强制关闭线程池
isTerminated方法做用是检测此时线程池内任务是否被执行完毕(所有线程空闲)
isShutdown方法注意是检测该线程池是否执行了shutdown方法
在Java中,若是须要设定代码执行的最长时间,即超时,能够用Java线程池ExecutorService类配合Future接口来实现。 Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口是Java线程Future模式的实 现,能够来进行异步计算。
Future模式能够这样来描述:我有一个任务,提交给了Future,Future替我完成这个任务。期间我本身能够去作任何想作的事情。一段时 间以后,我就即可以从Future那儿取出结果。就至关于下了一张定货单,一段时间后能够拿着提订单来提货,这期间能够干别的任何事情。其中Future 接口就是定货单,真正处理订单的是Executor类,它根据Future接口的要求来生产产品。
Future接口提供方法来检测任务是否被执行完,等待任务执行完得到结果,也能够设置任务执行的超时时间
这个设置超时的方法就是实现Java程序执行超时的关键
在Future接口中声明了5个方法
FutureTask实现了Future接口
不直接构造Future对象,也可使用ExecutorService.submit方法来得到Future对象,submit方法即支持以 Callable接口类型,也支持Runnable接口做为参数,具备很大的灵活性
下面的程序演示了两种获取Future对象的方式并经过讲task对象传入线程构造函数开启线程使用,其中FutureTask的泛型表示获取值的类型
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; public class T06_Future { public static void main(String[] args) throws InterruptedException, ExecutionException { FutureTask<Integer> task = new FutureTask<>(()->{ TimeUnit.MILLISECONDS.sleep(500); return 1000; }); //new Callable () { Integer call();} new Thread(task).start(); System.out.println(task.get()); //阻塞 //******************************* ExecutorService service = Executors.newFixedThreadPool(5); Future<Integer> f = service.submit(()->{ TimeUnit.MILLISECONDS.sleep(500); return 1; }); System.out.println(f.get()); System.out.println(f.isDone()); } }
下面程序使用FutureTask配合固定线程数的线程池实现了并行计算1-200000范围内得素数并对比了串行计算和并行计算的效率
其中不均分计算范围是由于数值越大计算量越大
import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class T07_ParallelComputing { public static void main(String[] args) throws InterruptedException, ExecutionException { long start = System.currentTimeMillis(); getPrime(1, 200000); long end = System.currentTimeMillis(); System.out.println(end - start); final int cpuCoreNum = 4; ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum); MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20 MyTask t2 = new MyTask(80001, 130000); MyTask t3 = new MyTask(130001, 170000); MyTask t4 = new MyTask(170001, 200000); Future<List<Integer>> f1 = service.submit(t1); Future<List<Integer>> f2 = service.submit(t2); Future<List<Integer>> f3 = service.submit(t3); Future<List<Integer>> f4 = service.submit(t4); start = System.currentTimeMillis(); f1.get(); f2.get(); f3.get(); f4.get(); end = System.currentTimeMillis(); System.out.println(end - start); } static class MyTask implements Callable<List<Integer>> { int startPos, endPos; MyTask(int s, int e) { this.startPos = s; this.endPos = e; } @Override public List<Integer> call() throws Exception { List<Integer> r = getPrime(startPos, endPos); return r; } } static boolean isPrime(int num) { for(int i=2; i<=num/2; i++) { if(num % i == 0) return false; } return true; } static List<Integer> getPrime(int start, int end) { List<Integer> results = new ArrayList<>(); for(int i=start; i<=end; i++) { if(isPrime(i)) results.add(i); } return results; } }
最基本的线程池,线程数固定
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T05_ThreadPool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(5); //execute submit for (int i = 0; i < 6; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); service.shutdown(); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); TimeUnit.SECONDS.sleep(5); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); } }
当任务须要时就自动建立新线程(不限制线程数量),线程默认超过60s空闲则销毁
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T08_CachedPool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); for (int i = 0; i < 2; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); TimeUnit.SECONDS.sleep(80); System.out.println(service); } }
这种线程池的做用是保证多个任务顺序执行
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class T09_SingleThreadPool { public static void main(String[] args) { ExecutorService service = Executors.newSingleThreadExecutor(); for(int i=0; i<5; i++) { final int j = i; service.execute(()->{ System.out.println(j + " " + Thread.currentThread().getName()); }); } } }
下面程序演示了使用固定频率执行任务
具体API自查
import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class T10_ScheduledPool { public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(4); service.scheduleAtFixedRate(()->{ try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }, 0, 500, TimeUnit.MILLISECONDS); } }
通常来说每一个线程维护一个任务队列,每一个线程只执行分配给自身的任务,而使用工做窃取线程池当有空闲线程时空闲线程将主动窃取另外线程的任务来执行
WorkStealingPool底层是由ForkJoinPool线程池实现的
注意产生的是守护线程
import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T11_WorkStealingPool { public static void main(String[] args) throws IOException { ExecutorService service = Executors.newWorkStealingPool(); System.out.println(Runtime.getRuntime().availableProcessors()); service.execute(new R(1000)); service.execute(new R(2000)); service.execute(new R(2000)); service.execute(new R(2000)); //daemon service.execute(new R(2000)); //因为产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出 System.in.read(); } static class R implements Runnable { int time; R(int t) { this.time = t; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(time + " " + Thread.currentThread().getName()); } } }
任务的切分(切分子任务到多小)和合并能够由开发者指定,而ForkJoinPool将根据切分和合并的规则来建立线程并由ForkJoinPool维护线程
能够参阅:https://www.jianshu.com/p/8d7e3cc892cf
下面程序演示了计算长度为1000000的,内部存放随机数值的数组的和
其中使用static inner class是为了防止包可见致使命名冲突
import java.io.IOException; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class T12_ForkJoinPool { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random r = new Random(); static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextInt(100); } System.out.println(Arrays.stream(nums).sum()); //stream api } /* static class AddTask extends RecursiveAction { int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected void compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; System.out.println("from:" + start + " to:" + end + " = " + sum); } else { int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); } } } */ static class AddTask extends RecursiveTask<Long> { private static final long serialVersionUID = 1L; int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected Long compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; return sum; } int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); return subTask1.join() + subTask2.join(); } } public static void main(String[] args) throws IOException { ForkJoinPool fjp = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); fjp.execute(task); long result = task.join(); System.out.println(result); //System.in.read(); } }
其实全部线程池的底层都是使用ThreadPoolExecutor做为支撑的,能够本身自定义线程池,指定不一样的特定策略(最小/最大线程数、使用什么任务队列和执行策略等)
默认使用多线程并行计算
具体自查
import java.util.ArrayList; import java.util.List; import java.util.Random; public class T14_ParallelStreamAPI { public static void main(String[] args) { List<Integer> nums = new ArrayList<>(); Random r = new Random(); for(int i=0; i<10000; i++) nums.add(1000000 + r.nextInt(1000000)); //System.out.println(nums); long start = System.currentTimeMillis(); nums.forEach(v->isPrime(v)); long end = System.currentTimeMillis(); System.out.println(end - start); //使用parallel stream api start = System.currentTimeMillis(); nums.parallelStream().forEach(T14_ParallelStreamAPI::isPrime); end = System.currentTimeMillis(); System.out.println(end - start); } static boolean isPrime(int num) { for(int i=2; i<=num/2; i++) { if(num % i == 0) return false; } return true; } }
/** * 曾经的面试题:(淘宝?) * 实现一个容器,提供两个方法,add,size * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束 * * 分析下面这个程序,能完成这个功能吗? * @author mashibing */ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class MyContainer1 { List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer1 c = new MyContainer1(); new Thread(() -> { for(int i=0; i<10; i++) { c.add(new Object()); System.out.println("add " + i); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1").start(); new Thread(() -> { while(true) { if(c.size() == 5) { break; } } System.out.println("t2 结束"); }, "t2").start(); } }
上述程序将不如咱们所愿当size=5时break,而是不断添加到10次仍不打印"t2 结束",且程序在打印10次"add"后仍然没法退出
缘由是size对线程t2不可见(一直为0),能够在lists变量声明volatile关键字解决
解法一
在lists变量声明volatile关键字
但这种解法有几个问题:
使用wait和notify方法解决
锁对象调用wait和notify方法的做用
- wait方法:会让当前线程进入等待,直到另外一个线程调用同一个对象的notify()或notifyAll()方法
- 调用wait方法时将放弃锁/控制权
- notify方法:唤醒因调用这个对象wait()方法而阻塞的线程
- 调用notify方法时将不会放弃锁/控制权
- 当执行notify方法时,会唤醒一个处于等待该对象锁的线程,而后继续往下执行,直到执行完退出对象锁锁住的区域(synchronized修饰的代码块)后再释放锁
- 故应该尽可能在线程调用notify/notifyAll()后,当即退出临界区,即notify方法后面避免出现更多耗时的代码
注意:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class MyContainer3 { //添加volatile,使t2可以获得通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer3 c = new MyContainer3(); final Object lock = new Object(); new Thread(() -> { synchronized(lock) { System.out.println("t2启动"); if(c.size() != 5) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2 结束"); } }, "t2").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1启动"); synchronized(lock) { for(int i=0; i<10; i++) { c.add(new Object()); System.out.println("add " + i); if(c.size() == 5) { lock.notify(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "t1").start(); } }
使用Latch(门闩)替代wait和notify方法来进行通知,好处是通讯方式简单,同时也能够指定等待时间
CountDownLatch类位于java.util.concurrent包下,利用它能够实现相似计数器的功能
好比有一个任务A,它要等待其余4个任务执行完毕以后才能执行,此时就能够利用CountDownLatch来实现这种功能了
其中使用await和countdown方法替代wait和notify方法
- CountDownLatch(int count) //构造一个用给定计数初始化的 CountDownLatch。
- void await() // 使当前线程在锁存器倒计数至零以前一直等待,除非线程被中断
- boolean await(long timeout, TimeUnit unit) // 使当前线程在锁存器倒计数至零以前一直等待,除非线程被中断或超出了指定的等待时间
- void countDown() // 递减锁存器的计数,若是计数到达零,则释放全部等待的线程
- long getCount() // 返回当前计数
- String toString() // 返回标识此锁存器及其状态的字符串
CountDownLatch不涉及锁定,当count的值为零时当前线程继续运行
简言之,CountDownLatch不是锁,而是一个对全部线程可见的、有令线程等待的功能的计数器(就像是一位严格的母亲,要求儿子必须等5位大人所有动筷才能动筷,其中每位大人第一次动筷母亲内心的计数器就减一)
这种方式不须要加锁,性能比上面的解法要好,但我的疑问在是否会出现解法一中的t2执行时机不许确的弊端(经验证,在t2结束代码前使t2等待5秒时将出现这种弊端)
当不涉及同步,只涉及线程通讯的时候,用synchronized + wait/notify(加锁)就显得过重了,应该考虑使用CountDownLatch/CyclicBarrier/Semaphore代替
能够查看https://www.cnblogs.com/dolphin0520/p/3920397.html
import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class MyContainer5 { // 添加volatile,使t2可以获得通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer5 c = new MyContainer5(); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { System.out.println("t2启动"); if (c.size() != 5) { try { latch.await(); //也能够指定等待时间 //latch.await(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2 结束"); }, "t2").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1启动"); for (int i = 0; i < 10; i++) { c.add(new Object()); System.out.println("add " + i); if (c.size() == 5) { // 打开门闩,让t2得以执行 latch.countDown(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1").start(); } }
其中使用while而不是if来执行wait方法的缘由是:当使用if判断时只会在阻塞前判断一次,阻塞结束直接放行不做二次判断,但此时若实际条件被其余线程改变成应该再次阻塞,则该线程放行执行会出现错误(如容器原本已满,if判断为容器已满,阻塞put方法,若两个生产者线程均进入到put方法阻塞,当容器变为未满状态时唤醒两个阻塞线程直接放行,某一个生产者线程操做使容器已满,则put方法实际应该被阻塞,但if语句再也不判断,后来执行的生产者线程继续生产,从而使容器溢出发生错误)
注意:同步代码块/synchronized块即便线程在代码块内被wait,唤醒后依然须要获取锁后才能继续执行,不然继续阻塞等待锁
而while循环判断这个条件,能够解决这个问题
Effective Java一书中说明了wait方法绝大部分都是配合while来使用的
另外,使用notifyAll方法而不是notify方法的缘由是notify方法只能唤醒一个,可能唤醒的是同类线程(生产者唤醒生产者使得while判断后两个生产者均wait())使整个程序出现假死
Effective Java一书中也说明了应该永远使用notifyAll方法,不使用notify方法
/** * 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法, * 可以支持2个生产者线程以及10个消费者线程的阻塞调用 * * 使用wait和notify/notifyAll来实现 * * @author mashibing */ import java.util.LinkedList; import java.util.concurrent.TimeUnit; public class MyContainer1<T> { final private LinkedList<T> lists = new LinkedList<>(); final private int MAX = 10; //最多10个元素 private int count = 0; public synchronized void put(T t) { while(lists.size() == MAX) { //想一想为何用while而不是用if? try { this.wait(); // effective java 放弃锁,使得两个生产者线程都可进入同步代码块执行到这一行,先获取锁的先往下执行,未获取锁的暂时等待 } catch (InterruptedException e) { e.printStackTrace(); } } lists.add(t); ++count; this.notifyAll(); //通知消费者线程进行消费 } public synchronized T get() { T t = null; while(lists.size() == 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } t = lists.removeFirst(); count --; this.notifyAll(); //通知生产者进行生产 return t; } public static void main(String[] args) { MyContainer1<String> c = new MyContainer1<>(); //启动消费者线程 for(int i=0; i<10; i++) { new Thread(()->{ for(int j=0; j<5; j++) System.out.println(c.get()); }, "c" + i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //启动生产者线程 for(int i=0; i<2; i++) { new Thread(()->{ for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j); }, "p" + i).start(); } } }
使用ReentrantLock做为锁且配合Condition对象使用能够精确唤醒/使等待具体的生产者/消费者线程
其中Condition是依靠“谁在这个方法执行到消费者.await方法来判断谁是消费者”的,并不是直接指定哪一个线程是生产者/消费者(如get方法内某线程执行到了consumer.await()则这个线程就被认为是消费者了)
线程的生产者/消费者之分是由线程内部执行什么方法来定义的,并不是线程之间有所不同,全部线程都是同样的,只是执行的方法不同而区分为生产者和消费者
/** * 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法, * 可以支持2个生产者线程以及10个消费者线程的阻塞调用 * * 使用wait和notify/notifyAll来实现 * * 使用Lock和Condition来实现 * 对比两种方式,Condition的方式能够更加精确的指定哪些线程被唤醒 * * @author mashibing */ import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class MyContainer2<T> { final private LinkedList<T> lists = new LinkedList<>(); final private int MAX = 10; //最多10个元素 private int count = 0; private Lock lock = new ReentrantLock(); private Condition producer = lock.newCondition(); private Condition consumer = lock.newCondition(); public void put(T t) { try { lock.lock(); while(lists.size() == MAX) { //想一想为何用while而不是用if? producer.await(); } lists.add(t); ++count; consumer.signalAll(); //通知消费者线程进行消费 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public T get() { T t = null; try { lock.lock(); while(lists.size() == 0) { consumer.await(); } t = lists.removeFirst(); count --; producer.signalAll(); //通知生产者进行生产 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return t; } public static void main(String[] args) { MyContainer2<String> c = new MyContainer2<>(); //启动消费者线程 for(int i=0; i<10; i++) { new Thread(()->{ for(int j=0; j<5; j++) System.out.println(c.get()); }, "c" + i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //启动生产者线程 for(int i=0; i<2; i++) { new Thread(()->{ for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j); }, "p" + i).start(); } } }