锁像synchronized同步块同样,是一种线程同步机制。让自Java 5开始,java.util.concurrent.locks包提供了另外一种方式实现线程同步机制——Lock。那么问题来了既然均可以经过synchronized来实现同步访问了,那么为何还须要提供Lock呢?这个问题咱们下面讨论java.util.concurrent.locks包中包含了一些锁的实现,因此咱们不须要重复造轮子了。可是咱们仍然须要去了解怎样使用这些锁,且了解这些实现背后的理论也是颇有用处的。html
本文将从下面几个方面介绍java
在学习或者使用Java的过程当中进程会遇到各类各样的锁的概念:公平锁、非公平锁、自旋锁、可重入锁、偏向锁、轻量级锁、重量级锁、读写锁、互斥锁等待。下边总结了对各类锁的解释算法
公平锁是指多个线程在等待同一个锁时按照申请锁的前后顺序来获取锁。相反的非公平锁是指多个线程获取锁的顺序并非按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。编程
公平锁的好处是等待锁的线程不会饿死,可是总体效率相对低一些;非公平锁的好处是总体效率相对高一些,可是有些线程可能会饿死或者说很早就在等待锁,但要等好久才会得到锁。其中的缘由是公平锁是严格按照请求所的顺序来排队得到锁的,而非公平锁时能够抢占的,即若是在某个时刻有线程须要获取锁,而这个时候恰好锁可用,那么这个线程会直接抢占,而这时阻塞在等待队列的线程则不会被唤醒。缓存
对于Java ReentrantLock
而言,经过构造函数指定该锁是不是公平锁,默认是非公平锁。例:new ReentrantLock(true)是公平锁
对于Synchronized
而言,也是一种非公平锁。因为其并不像ReentrantLock
是经过AQS的来实现线程调度,因此并无任何办法使其变成公平锁。多线程
也叫递归锁,是指在外层函数得到锁以后,内层递归函数仍然能够获取到该锁。即线程能够进入任何一个它已经拥有锁的代码块。在JAVA环境下 ReentrantLock 和synchronized 都是可重入锁。可重入锁最大的做用是避免死锁。并发
具体区别下文阐述。app
在Java中,自旋锁是指尝试获取锁的线程不会当即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减小线程上下文切换的消耗,缺点是循环会消耗CPU。框架
JDK6中已经变为默认开启自旋锁,而且引入了自适应的自旋锁。自适应意味着自旋的时间不在固定了,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。自旋是在轻量级锁中使用的,在重量级锁中,线程不使用自旋。dom
这三种锁是指锁的状态,而且是针对Synchronized
。在Java 5后经过引入锁升级的机制来实现高效Synchronized。这三种锁的状态是经过对象监视器在对象头中的字段来代表的。以下图
这里的无锁和偏向锁在对象头的倒数第三bit中分别采用0和1标记
乐观锁与悲观锁不是指具体的什么类型的锁,而是指看待并发同步的角度
独占锁/共享锁就是一种广义的说法,互斥锁/读写锁就是具体的实现。
ReentrantLock,可重入锁,是一种递归无阻塞的同步机制。它能够等同于synchronized的使用,可是ReentrantLock提供了比synchronized更强大、灵活的锁机制,能够减小死锁发生的几率。
ReentrantLock还提供了公平锁和非公平锁的选择,构造方法接受一个可选的公平参数(默认非公平锁),当设置为true时,表示公平锁,不然为非公平锁。
通常使用以下方式获取锁
ReentrantLock lock = new ReentrantLock(); lock.lock();
lock方法:
public void lock() { sync.lock(); }
Sync为Sync为ReentrantLock里面的一个内部类,它继承AQS。关于AQS的相关知识能够自行补充一下。Sync有两个子类分别是FairSync(公平锁)和 NofairSync(非公平锁)。默认使用NofairSync,下面是ReentrantLock的构造类
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
下边是一个简单的重入锁使用案例
1 public class ReentrantLockDemo implements Runnable { 2 public static final Lock lock = new ReentrantLock(); 3 public static int i = 0; 4 5 @Override 6 public void run() { 7 for (int j = 0; j < 1000000; j++) { 8 lock.lock(); 9 try { 10 i++; 11 } finally { 12 lock.unlock(); 13 } 14 } 15 } 16 17 public static void main(String[] args) throws InterruptedException { 18 ReentrantLockDemo demo = new ReentrantLockDemo(); 19 Thread t1 = new Thread(demo); 20 Thread t2 = new Thread(demo); 21 t1.start(); 22 t2.start(); 23 t1.join(); 24 t2.join(); 25 System.out.println(i); 26 } 27 }
上述代码的第8~12行,使用了重入锁保护了临界区资源i,确保了多线程对i的操做。输出结果为2000000。能够看到与synchronized相比,重入锁必选手动指定在什么地方加锁,什么地方释放锁,因此更加灵活。
要注意是,再退出临界区的时候,须要释放锁,不然其余线程就没法访问临界区了。这里为啥叫可重入锁是由于这种锁是能够被同一个线程反复进入的。好比上述代码的使用锁部分能够写成这样
lock.lock(); lock.lock(); try { i++; } finally { lock.unlock(); lock.unlock(); }
在这种状况下,一个线程联连续两次获取同一把锁,这是容许的。可是须要注意的是,若是同一个线程屡次获的锁,那么在释放是也要释放相同次数的锁。若是释放的锁少了,至关于该线程依然持有这个锁,那么其余线程就没法访问临界区了。释放的次数多了也会抛出java.lang.IllegalMonitorStateException异常。
除了使用上的灵活,ReentrantLock还提供了一些高级功能如中断。限时等待等。
对用synchrozide来讲,若是一个线程在等待,那么结果只有两种状况,要么得到这把锁继续执行下去要么一直等待下去。而使用重入锁,提供了另一种可能,那就是线程能够被中断。也就是说在这里能够取消对锁的请求。这种状况对解决死锁是有必定帮组的。
下面代码产生了一个死锁,可是咱们能够经过锁的中断,解决这个死锁。
public class ReentrantLockDemo implements Runnable { //重入锁ReentrantLock public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public ReentrantLockDemo(int lock) { this.lock = lock; } @Override public void run() { try { if (lock == 1) { lock1.lockInterruptibly(); Thread.sleep(500); lock2.lockInterruptibly(); System.out.println("this is thread 1"); } else { lock2.lockInterruptibly(); Thread.sleep(500); lock1.lockInterruptibly(); System.out.println("this is thread 2"); } } catch (Exception e) { //e.printStackTrace(); } finally { if (lock1.isHeldByCurrentThread()) { lock1.unlock();//释放锁 } if (lock2.isHeldByCurrentThread()) { lock2.unlock(); } System.out.println(Thread.currentThread().getId() + ":线程退出"); } } public static void main(String[] args) throws InterruptedException { ReentrantLockDemo r1 = new ReentrantLockDemo(1); ReentrantLockDemo r2 = new ReentrantLockDemo(2); Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); Thread.sleep(1000); //t2线程被中断,放弃锁申请,释放已得到的lock2,这个操做使得t1线程顺利得到lock2继续执行下去; //若没有此段代码,t2线程没有中断,那么会出现t1获取lock1,请求lock2,而t2获取lock2,请求lock1的相互等待死锁状况 t2.interrupt(); } }
线程t1和t2启动后,t1先占用lock1而后在请求lock2;t2先占用lock2,而后请求lock1,所以很容易造成线程之间的相互等待。着这里使用的是ReenTrantLock提供了一种可以中断等待锁的线程的机制,经过lock.lockInterruptibly()来实现这个机制。
最后因为t2线程被中断,t2会放弃对lock1的1请求,同时释放lock2。这样可使t1继续执行下去,结果以下图
除了等待通知之外,避免死锁还有另一种方式,那就是限时等待。经过给定一个等待时间,让线程自动放弃。
public class TimeLockDemo implements Runnable { private static ReentrantLock reentrantLock = new ReentrantLock(); @Override public void run() { try { if (reentrantLock.tryLock(5, TimeUnit.SECONDS)) { Thread.sleep(6000); } else { System.out.println("Gets lock failed"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if (reentrantLock.isHeldByCurrentThread()){ reentrantLock.unlock(); } } } public static void main(String[] args) { TimeLockDemo demo1 = new TimeLockDemo(); TimeLockDemo demo2 = new TimeLockDemo(); Thread t1 = new Thread(demo1); Thread t2 = new Thread(demo2); t1.start(); t2.start(); } }
tryLock有两个参数,一个表示等待时长,另外一个表示计时单位。在这里就是经过lock.tryLock(5,TimeUnit.SECONDS)来设置锁申请等待限时,此例就是限时等待5秒获取锁。在这里的锁请求最多为5秒,若是超过5秒未得到锁请求,则会返回fasle,若是成功得到锁就会返回true。此案例中第一个线程会持有锁长达6秒,因此另一个线程没法在5秒内得到锁 故案例输出结果为Gets lock failed
另外tryLock方法也能够不带参数之直接运行,在这种状况下,当前线程会尝试得到锁,若是锁并未被其余线程占用,则申请锁直接成功,当即返回true,不然当前线程不会进行等待,而是当即返回false。这种模式不会引发线程等待,所以也不会产生死锁。
下边展现了这种使用方式
public class ReentrantLockDemo implements Runnable { //重入锁ReentrantLock public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public ReentrantLockDemo(int lock) { this.lock = lock; } @Override public void run() { try { if (lock == 1) { while (true) { if (lock1.tryLock()) { try { Thread.sleep(1000); } finally { lock1.unlock(); } } if (lock2.tryLock()) { try { System.out.println("thread " + Thread.currentThread().getId() + " 执行完毕"); return; } finally { lock2.unlock(); } } } } else { while (true) { if (lock2.tryLock()) { try { Thread.sleep(1000); } finally { lock2.unlock(); } } if (lock1.tryLock()) { try { System.out.println("thread " + Thread.currentThread().getId() + " 执行完毕"); return; } finally { lock1.unlock(); } } } } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ReentrantLockDemo r1 = new ReentrantLockDemo(1); ReentrantLockDemo r2 = new ReentrantLockDemo(2); Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); } }
使用了tryLock后,线程不会傻傻的等待,而是不一样的尝试获取锁,所以,只要执行足够长的时间,线程老是会得到全部须要的资源。从而正常执行。下边展现了运行结果。表示两个线程运行都正常。
在大多数状况下。锁的申请都是非公平的。也就是说系统只是会从等待锁的队列里随机挑选一个,因此不能保证其公平性。可是公平锁的实现成本很高,性能也相对低下。所以若是没有特别要求,也不须要使用公平锁。
对上边ReentrantLock几个重要的方法整理以下。
Conditon和ReentrantLock的组合可让线程在合适的时间等待,或者在某一个特定的时间获得通知,继续执行。在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通讯方式,Condition均可以实现,这里注意,Condition是被绑定到Lock上的,要建立一个Lock的Condition必须用newCondition()方法。
await()
的返回条件基础上增长了超时响应,返回值表示当前剩余的时间,若是在nanosTimeout以前被唤醒,返回值 = nanosTimeout - 实际消耗的时间,返回值 <= 0表示超时;await()
的返回条件基础上增长了超时响应,与上一接口不一样的是能够自定义超时时间单位; 返回值返回true/false,在time以前被唤醒,返回true,超时返回false。使用案例以下
public class ConditionDemo { static class NumberWrapper { public int value = 1; } public static void main(String[] args) { //初始化可重入锁 final Lock lock = new ReentrantLock(); //第一个条件当屏幕上输出到3 final Condition reachThreeCondition = lock.newCondition(); //第二个条件当屏幕上输出到6 final Condition reachSixCondition = lock.newCondition(); //NumberWrapper只是为了封装一个数字,一边能够将数字对象共享,并能够设置为final //注意这里不要用Integer, Integer 是不可变对象 final NumberWrapper num = new NumberWrapper(); //初始化A线程 Thread threadA = new Thread(new Runnable() { @Override public void run() { //须要先得到锁 lock.lock(); try { System.out.println("threadA start write"); //A线程先输出前3个数 while (num.value <= 3) { System.out.println(num.value); num.value++; } //输出到3时要signal,告诉B线程能够开始了 reachThreeCondition.signal(); } finally { lock.unlock(); } lock.lock(); try { //等待输出6的条件 while(num.value <= 6) { reachSixCondition.await(); } System.out.println("threadA start write"); //输出剩余数字 while (num.value <= 9) { System.out.println(num.value); num.value++; } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }); Thread threadB = new Thread(new Runnable() { @Override public void run() { try { lock.lock(); while (num.value <= 3) { //等待3输出完毕的信号 reachThreeCondition.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } try { lock.lock(); //已经收到信号,开始输出4,5,6 System.out.println("threadB start write"); while (num.value <= 6) { System.out.println(num.value); num.value++; } //4,5,6输出完毕,告诉A线程6输出完了 reachSixCondition.signal(); } finally { lock.unlock(); } } }); //启动两个线程 threadB.start(); threadA.start(); } }
结果以下
这样看来,Condition和传统的线程通讯没什么区别,Condition的强大之处在于它能够为多个线程间创建不一样的Condition,下面引入API中的一段代码,加以说明。
class BoundedBuffer { final Lock lock = new ReentrantLock();//锁对象 final Condition notFull = lock.newCondition();//写线程条件 final Condition notEmpty = lock.newCondition();//读线程条件 final Object[] items = new Object[100];//缓存队列 int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length)//若是队列满了 notFull.await();//阻塞写线程 items[putptr] = x;//赋值 if (++putptr == items.length) putptr = 0;//若是写索引写到队列的最后一个位置了,那么置为0 ++count;//个数++ notEmpty.signal();//唤醒读线程 } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0)//若是队列为空 notEmpty.await();//阻塞读线程 Object x = items[takeptr];//取值 if (++takeptr == items.length) takeptr = 0;//若是读索引读到队列的最后一个位置了,那么置为0 --count;//个数-- notFull.signal();//唤醒写线程 return x; } finally { lock.unlock(); } } }
这个示例中BoundedBuffer是一个固定长度的集合,这个在其put操做时,若是发现长度已经达到最大长度,那么要等待notFull信号才能继续put,若是获得notFull信号会像集合中添加元素,而且put操做会发出notEmpty的信号,而在其take方法中若是发现集合长度为空,那么会等待notEmpty的信号,接受到notEmpty信号才能继续take,同时若是拿到一个元素,那么会发出notFull的信号。
信号量(Semaphore)为多线程协做提供了更为强大的控制用法。不管是内部锁Synchronized仍是ReentrantLock,一次都只容许一个线程访问资源,而信号量能够多个线程访问同一资源。Semaphore是用来保护一个或者多个共享资源的访问,Semaphore内部维护了一个计数器,其值为能够访问的共享资源的个数。一个线程要访问共享资源,先得到信号量,若是信号量的计数器值大于1,意味着有共享资源能够访问,则使其计数器值减去1,再访问共享资源。若是计数器值为0,线程进入休眠。当某个线程使用完共享资源后,释放信号量,并将信号量内部的计数器加1,以前进入休眠的线程将被唤醒并再次试图得到信号量。
信号量的UML的类图以下,能够看出和ReentrantLock同样,Semaphore也包含了sync对象,sync是Sync类型;并且,Sync是一个继承于AQS的抽象类。Sync包括两个子类:"公平信号量"FairSync 和 "非公平信号量"NonfairSync。sync是"FairSync的实例",或者"NonfairSync的实例";默认状况下,sync是NonfairSync(即,默认是非公平信号量)
信号量主要提供了如下构造函数
Semaphore(int num) Semaphore(int num,boolean how)
这里,num指定初始许可计数。所以,它指定了一次能够访问共享资源的线程数。若是是1,则任什么时候候只有一个线程能够访问该资源。默认状况下,全部等待的线程都以未定义的顺序被授予许可。经过设置how为true,能够确保等待线程按其请求访问的顺序被授予许可。信号量的主要逻辑方法以下
// 今后信号量获取一个许可,在提供一个许可前一直将线程阻塞,不然线程被中断。 void acquire() // 今后信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。 void acquire(int permits) // 今后信号量中获取许可,在有可用的许可前将其阻塞。 void acquireUninterruptibly() // 今后信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。 void acquireUninterruptibly(int permits) // 返回此信号量中当前可用的许可数。 // 释放一个许可,将其返回给信号量。 void release() // 释放给定数目的许可,将其返回到信号量。 // 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。 boolean tryAcquire() // 仅在调用时此信号量中有给定数目的许可时,才今后信号量中获取这些许可。 boolean tryAcquire(int permits) // 若是在给定的等待时间内此信号量有可用的全部许可,而且当前线程未被中断,则今后信号量获取给定数目的许可。 boolean tryAcquire(int permits, long timeout, TimeUnit unit) // 若是在给定的等待时间内,此信号量有可用的许可而且当前线程未被中断,则今后信号量获取一个许可。
实例以下:这里咱们模拟10我的去银行存款,可是该银行只有两个办公柜台,有空位则上去存钱,没有空位则只能去排队等待。最后输出银行总额
public class SemaphoreThread { private int customer; public SemaphoreThread() { customer = 0; } /** * 银行存钱类 */ class Bank { private int account = 100; public int getAccount() { return account; } public void save(int money) { account += money; } } /** * 线程执行类,每次存10块钱 */ class NewThread implements Runnable { private Bank bank; private Semaphore semaphore; public NewThread(Bank bank, Semaphore semaphore) { this.bank = bank; this.semaphore = semaphore; } @Override public void run() { int tempCustomer = customer++; if (semaphore.availablePermits() > 0) { System.out.println("客户" + tempCustomer + "启动,进入银行,有位置当即去存钱"); } else { System.out.println("客户" + tempCustomer + "启动,进入银行,无位置,去排队等待等待"); } try { semaphore.acquire(); bank.save(10); System.out.println(tempCustomer + "银行余额为:" + bank.getAccount()); Thread.sleep(1000); System.out.println("客户" + tempCustomer + "存钱完毕,离开银行"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 创建线程,调用内部类,开始存钱 */ public void useThread() { Bank bank = new Bank(); // 定义2个新号量 Semaphore semaphore = new Semaphore(2); // 创建一个缓存线程池 ExecutorService es = Executors.newCachedThreadPool(); // 创建10个线程 for (int i = 0; i < 10; i++) { // 执行一个线程 es.submit(new Thread(new NewThread(bank, semaphore))); } // 关闭线程池 es.shutdown(); // 从信号量中获取两个许可,而且在得到许可以前,一直将线程阻塞 semaphore.acquireUninterruptibly(2); System.out.println("到点了,工做人员要吃饭了"); // 释放两个许可,并将其返回给信号量 semaphore.release(2); } public static void main(String[] args) { SemaphoreThread test = new SemaphoreThread(); test.useThread(); } }
ReentrantReadWriteLock是Lock的另外一种实现方式,咱们已经知道了ReentrantLock是一个排他锁,同一时间只容许一个线程访问,而ReentrantReadWriteLock容许多个读线程同时访问(也就是读操做),但不容许写线程和读线程、写线程和写线程同时访问。约束以下
相对于排他锁,提升了并发性。在实际应用中,大部分状况下对共享数据(如缓存)的访问都是读操做远多于写操做,这时ReentrantReadWriteLock可以提供比排他锁更好的并发性和吞吐量。
看一下官方案例
lass CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public void processCachedData() { rwl.readLock().lock();//1 if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock();//2 rwl.writeLock().lock();//3 try { // Recheck state because another thread might have,acquired write lock and changed state before we did. if (!cacheValid) { data = ... cacheValid = true; } // 在释放写锁以前经过获取读锁降级写锁(注意此时尚未释放写锁) rwl.readLock().lock();//4 } finally { // 释放写锁而此时已经持有读锁 rwl.writeLock().unlock();//5 } } try { use(data); } finally { rwl.readLock().unlock();//6 } } }
若是不使用锁降级功能,如先释放写锁,而后得到读锁,在这个get过程当中,可能会有其余线程竞争到写锁 或者是更新数据 则得到的数据是其余线程更新的数据,可能会形成数据的污染,即产生脏读的问题
1 public class ReadAndWriteLock { 2 private static ReentrantLock lock = new ReentrantLock(); 3 private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); 4 private static Lock readLock = readWriteLock.readLock(); 5 private static Lock writeLock = readWriteLock.writeLock(); 6 7 public ReadAndWriteLock setValue(int value) { 8 this.value = value; 9 return this; 10 } 11 12 private int value; 13 14 public Object handleRead(Lock lock) throws InterruptedException { 15 try { 16 //模拟读操做 17 lock.lock(); 18 System.out.println("thread:" + Thread.currentThread().getId() + " value:" + value); 19 Thread.sleep(1000); 20 return value; 21 } finally { 22 lock.unlock(); 23 } 24 } 25 26 public Object handleWrite(Lock lock, int index) throws InterruptedException { 27 try { 28 //模拟写操做 29 lock.lock(); 30 value = index; 31 Thread.sleep(1000); 32 System.out.println("thread:" + Thread.currentThread().getId() + " value:" + value); 33 return value; 34 35 } finally { 36 lock.unlock(); 37 } 38 } 39 40 public static void main(String[] args) throws InterruptedException { 41 final ReadAndWriteLock demo = new ReadAndWriteLock(); 42 demo.setValue(0); 43 Runnable readRunnable = new Runnable() { 44 @Override 45 public void run() { 46 try { 47 //读锁 48 demo.handleRead(readLock); 49 //可重入锁 50 //demo.handleRead(lock); 51 52 } catch (InterruptedException e) { 53 e.printStackTrace(); 54 } 55 56 } 57 }; 58 59 Runnable writeRunnable = new Runnable() { 60 @Override 61 public void run() { 62 try { 63 //写锁 64 demo.handleWrite(readLock, (int) (Math.random() * 1000)); 65 //可重入锁 66 //demo.handleWrite(lock, (int) (Math.random() * 1000)); 67 } catch (InterruptedException e) { 68 e.printStackTrace(); 69 } 70 71 } 72 }; 73 ExecutorService exec = new ThreadPoolExecutor(0, 200, 74 0, TimeUnit.SECONDS, 75 new SynchronousQueue<Runnable>()); 76 ; 77 long startTime = System.currentTimeMillis(); 78 79 for (int i = 0; i < 18; i++) { 80 exec.execute(readRunnable); 81 } 82 83 for (int i = 0; i < 18; i++) { 84 exec.execute(writeRunnable); 85 } 86 exec.shutdown(); 87 exec.awaitTermination(60, TimeUnit.MINUTES); 88 long endTime = System.currentTimeMillis(); //获取结束时间 89 System.out.println("程序运行时间: " + (endTime - startTime) + "ms"); 90 91 } 92 }
在这里读线程彻底并行,而写会阻塞读。程序执行时间以下
将上述案例中的读写锁改为可重入锁,即将第行代码注释掉那么全部的读和写线程都必须相互等待,程序执行时间以下所示
CountDownLatch是java1.5版本以后util.concurrent提供的工具类。这里简单介绍一下CountDownLatch,能够将其当作是一个计数器,await()方法能够阻塞至超时或者计数器减至0,其余线程当完成本身目标的时候能够减小1,利用这个机制咱们能够将其用来作并发。 好比有一个任务A,它要等待其余4个任务执行完毕以后才能执行,此时就能够利用CountDownLatch来实现这种功能了。
CountDownLatch类只提供了一个构造器,该构造器接受一个整数做为参数,即当前这个计数器的计数个数 。
public CountDownLatch(int count) { }; //参数count为计数值
使用场景:好比对于马拉松比赛,进行排名计算,参赛者的排名,确定是跑完比赛以后,进行计算得出的,翻译成Java识别的预发,就是N个线程执行操做,主线程等到N个子线程执行完毕以后,在继续往下执行。
public class CountDownLatchTest { public static void main(String[] args){ int threadCount = 10; final CountDownLatch latch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { new Thread(new Runnable() { @Override public void run() { System.out.println("线程" + Thread.currentThread().getId() + "开始出发"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程" + Thread.currentThread().getId() + "已到达终点"); latch.countDown(); } }).start(); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("10个线程已经执行完毕!开始计算排名"); } }
结果以下
线程12开始出发
线程14开始出发
线程15开始出发
线程17开始出发
线程13开始出发
线程16开始出发
线程18开始出发
线程19开始出发
线程20开始出发
线程21开始出发
线程16已到达终点
线程13已到达终点
线程19已到达终点
线程18已到达终点
线程17已到达终点
线程14已到达终点
线程15已到达终点
线程12已到达终点
线程21已到达终点
线程20已到达终点
10个线程已经执行完毕!开始计算排名
CountDownLatch在并行化应用中也是比较经常使用。经常使用的并行化框架OpenMP中也是借鉴了这种思想。好比有这样的一个需求,在你淘宝订单的时候,这笔订单可能还须要查,用户信息,折扣信息,商家信息,商品信息等,用同步的方式(也就是串行的方式)流程以下。
设想一下这5个查询服务,平均每次消耗100ms,那么本次调用至少是500ms,咱们这里假设,在这个这五个服务其实并无任何数据依赖,谁先获取谁后获取均可以,那么咱们能够想办法并行化这五个服务。
这里可使用CountDownLatch来实现这个效果。
public class CountDownDemo { private static final int CORE_POOL_SIZE = 4; private static final int MAX_POOL_SIZE = 8; private static final long KEEP_ALIVE_TIME = 5L; private final static int QUEUE_SIZE = 1600; protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE)); public static void main(String[] args) throws InterruptedException { // 新建一个为5的计数器 CountDownLatch countDownLatch = new CountDownLatch(5); OrderInfo orderInfo = new OrderInfo(); THREAD_POOL.execute(() -> { System.out.println("当前任务Customer,线程名字为:" + Thread.currentThread().getName()); orderInfo.setCustomerInfo(new CustomerInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("当前任务Discount,线程名字为:" + Thread.currentThread().getName()); orderInfo.setDiscountInfo(new DiscountInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("当前任务Food,线程名字为:" + Thread.currentThread().getName()); orderInfo.setFoodListInfo(new FoodListInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("当前任务Tenant,线程名字为:" + Thread.currentThread().getName()); orderInfo.setTenantInfo(new TenantInfo()); countDownLatch.countDown(); }); THREAD_POOL.execute(() -> { System.out.println("当前任务OtherInfo,线程名字为:" + Thread.currentThread().getName()); orderInfo.setOtherInfo(new OtherInfo()); countDownLatch.countDown(); }); countDownLatch.await(1, TimeUnit.SECONDS); System.out.println("主线程:" + Thread.currentThread().getName()); } }
创建一个线程池(具体配置根据具体业务,具体机器配置),进行并发的执行咱们的任务(生成用户信息,菜品信息等),最后利用await方法阻塞等待结果成功返回。
字面意思循环栅栏,栅栏就是一种障碍物。这里就是内存屏障。经过它能够实现让一组线程等待至某个状态以后再所有同时执行。叫作回环是由于当全部等待线程都被释放之后,CyclicBarrier能够被重用。CyclicBarrier比CountDownLatch 功能更强大一些,CyclicBarrier能够接受一个参数做为barrierAction。所谓barrierAction就是当计算器一次计数完成后,系统会执行的动做。CyclicBarrier强调的是n个线程,你们相互等待,只要有一个没完成,全部人都得等着。(这种思想在高性能计算最为常见,GPU计算中关于也有相似内存屏障的用法)。构造函数以下,其中parties表示计数总数,也就是参与的线程总数。
public CyclicBarrier(int parties, Runnable barrierAction) { } public CyclicBarrier(int parties) { }
案例10我的去旅行,规定达到一个地点后才能继续前行.代码以下
class CyclicBarrierWorker implements Runnable { private int id; private CyclicBarrier barrier; public CyclicBarrierWorker(int id, final CyclicBarrier barrier) { this.id = id; this.barrier = barrier; } @Override public void run() { try { Thread.sleep(Math.abs(new Random().nextInt()%10000)); System.out.println(id + " th people wait"); barrier.await(); // 你们等待最后一个线程到达 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } public class TestCyclicBarrier { public static void main(String[] args) { int num = 10; CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() { @Override public void run() { System.out.println("go on together!"); } }); for (int i = 1; i <= num; i++) { new Thread(new CyclicBarrierWorker(i, barrier)).start(); } } }
从上面输出结果能够看出,每一个线程执行本身的操做以后,就在等待其余线程执行操做完毕。当全部线程线程执行操做完毕以后,全部线程就继续进行后续的操做了。
参考资料
《Java高并发编程设计》
https://www.cnblogs.com/-new/p/7256297.html
https://www.cnblogs.com/dolphin0520/p/3923167.html