5.JDK并发包1 ......................................................................................................................1html
1.java
各类同步控制工具的使用 ..........................................................................................3编程
1.1. ReentrantLock .......................................................................................................3api
1.1.1. 可重入 ...........................................................................................................3数组
1.1.2. 可中断 ...........................................................................................................3缓存
1.1.3. 可限时 ...........................................................................................................3安全
1.1.4. 公平锁 ...........................................................................................................3多线程
1.2. Condition ...............................................................................................................3并发
1.2.1. 概述 ...............................................................................................................3dom
1.2.2. 主要接口 .......................................................................................................3
1.2.3. API详解 ..........................................................................................................3
1.3. Semaphore ............................................................................................................4
1.3.1. 概述 ...............................................................................................................4
1.3.2. 主要接口 .......................................................................................................4
1.4. ReadWriteLock ......................................................................................................4
1.4.1. 概述 ...............................................................................................................4
1.4.2. 访问状况 .......................................................................................................4
1.4.3. 主要接口 .......................................................................................................4
1.5. CountDownLatch...................................................................................................4
1.5.1. 概述 ...............................................................................................................5
1.5.2. 主要接口 .......................................................................................................5
1.5.3. 示意图 ...........................................................................................................5
1.6. CyclicBarrier ..........................................................................................................5
1.6.1. 概述 ...............................................................................................................5
1.6.2. 主要接口 .......................................................................................................5
1.6.3. 示意图 ...........................................................................................................6
1.7. LockSupport ..........................................................................................................6
1.7.1. 概述 ...............................................................................................................6
1.7.2. 主要接口 .......................................................................................................6
1.7.3. 与suspend()比较 ...........................................................................................6
1.7.4. 中断响应 .......................................................................................................6
1.8. ReentrantLock 的实现 ..........................................................................................6
1.8.1. CAS状态 .........................................................................................................6
1.8.2. 等待队列 .......................................................................................................6
1.8.3. park()..............................................................................................................7
并发容器及典型源码分析 .........................................................................................7
2.
2.1. 集合包装...............................................................................................................7
2.1.1. HashMap ........................................................................................................7
2.1.2. List ..................................................................................................................7
2.1.3. Set ..................................................................................................................7
2.2. ConcurrentHashMap .............................................................................................7
2.3. BlockingQueue ......................................................................................................7
2.4. ConcurrentLinkedQueue .......................................................................................8
1. 各类同步控制工具的使用
1.1. ReentrantLock
1.1.1. 可重入 单线程能够重复进入,但要重复退出
1.1.2. 可中断 lockInterruptibly()
1.1.3. 可限时 超时不能得到锁,就返回false,不会永久等待构成死锁
1.1.4. 公平锁
先来先得
public ReentrantLock(boolean fair)
public static ReentrantLock fairLock = new ReentrantLock(true);
1.2. Condition
1.2.1. 概述
相似于 Object.wait()和Object.notify() 与ReentrantLock结合使用
1.2.2. 主要接口
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
1.2.3. API详解 await()方法会使当前线程等待,同时释放当前锁,当其余线程中使用signal()时或者signalAll()方法时,线
程会从新得到锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很类似。
awaitUninterruptibly()方法与await()方法基本相同,可是它并不会再等待过程当中响应中断。
singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒全部在等待中的线程。这和Obej ct.notify()方法很相似。
目录
jdk中独占锁的实现除了使用关键字synchronized外,还可使用ReentrantLock。虽然在性能上ReentrantLock和synchronized没有什么区别,但ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。
public class ReentrantLockTest { public static void main(String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock(); for (int i = 1; i <= 3; i++) { lock.lock(); } for(int i=1;i<=3;i++){ try { } finally { lock.unlock(); } } } }
上面的代码经过lock()
方法先获取锁三次,而后经过unlock()
方法释放锁3次,程序能够正常退出。从上面的例子能够看出,ReentrantLock是能够重入的锁,当一个线程获取锁时,还能够接着重复获取屡次。在加上ReentrantLock的的独占性,咱们能够得出如下ReentrantLock和synchronized的相同点。
1.ReentrantLock和synchronized都是独占锁,只容许线程互斥的访问临界区。可是实现上二者不一样:synchronized加锁解锁的过程是隐式的,用户不用手动操做,优势是操做简单,但显得不够灵活。通常并发场景使用synchronized的就够了;ReentrantLock须要手动加锁和解锁,且解锁的操做尽可能要放在finally代码块中,保证线程正确释放锁。ReentrantLock操做较为复杂,可是由于能够手动控制加锁和解锁过程,在复杂的并发场景中能派上用场。
2.ReentrantLock和synchronized都是可重入的。synchronized由于可重入所以能够放在被递归执行的方法上,且不用担忧线程最后可否正确释放锁;而ReentrantLock在重入时要却确保重复获取锁的次数必须和重复释放锁的次数同样,不然可能致使其余线程没法得到该锁。
公平锁是指当锁可用时,在锁上等待时间最长的线程将得到锁的使用权。而非公平锁则随机分配这种使用权。和synchronized同样,默认的ReentrantLock实现是非公平锁,由于相比公平锁,非公平锁性能更好。固然公平锁能防止饥饿,某些状况下也颇有用。在建立ReentrantLock的时候经过传进参数true
建立公平锁,若是传入的是false
或没传参数则建立的是非公平锁
ReentrantLock lock = new ReentrantLock(true);
继续跟进看下源码
/** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
能够看到公平锁和非公平锁的实现关键在于成员变量sync
的实现不一样,这是锁实现互斥同步的核心。之后有机会咱们再细讲。
public class ReentrantLockTest { static Lock lock = new ReentrantLock(true); public static void main(String[] args) throws InterruptedException { for(int i=0;i<5;i++){ new Thread(new ThreadDemo(i)).start(); } } static class ThreadDemo implements Runnable { Integer id; public ThreadDemo(Integer id) { this.id = id; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } for(int i=0;i<2;i++){ lock.lock(); System.out.println("得到锁的线程:"+id); lock.unlock(); } } } }
咱们开启5个线程,让每一个线程都获取释放锁两次。为了能更好的观察到结果,在每次获取锁前让线程休眠10毫秒。能够看到线程几乎是轮流的获取到了锁。若是咱们改为非公平锁,再看下结果
线程会重复获取锁。若是申请获取锁的线程足够多,那么可能会形成某些线程长时间得不到锁。这就是非公平锁的“饥饿”问题。
当使用synchronized实现锁时,阻塞在锁上的线程除非得到锁不然将一直等待下去,也就是说这种无限等待获取锁的行为没法被中断。而ReentrantLock给咱们提供了一个能够响应中断的获取锁的方法lockInterruptibly()
。该方法能够用来解决死锁问题。
public class ReentrantLockTest { static Lock lock1 = new ReentrantLock(); static Lock lock2 = new ReentrantLock(); public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(new ThreadDemo(lock1, lock2));//该线程先获取锁1,再获取锁2 Thread thread1 = new Thread(new ThreadDemo(lock2, lock1));//该线程先获取锁2,再获取锁1 thread.start(); thread1.start(); thread.interrupt();//是第一个线程中断 } static class ThreadDemo implements Runnable { Lock firstLock; Lock secondLock; public ThreadDemo(Lock firstLock, Lock secondLock) { this.firstLock = firstLock; this.secondLock = secondLock; } @Override public void run() { try { firstLock.lockInterruptibly(); TimeUnit.MILLISECONDS.sleep(10);//更好的触发死锁 secondLock.lockInterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); } finally { firstLock.unlock(); secondLock.unlock(); System.out.println(Thread.currentThread().getName()+"正常结束!"); } } } }
构造死锁场景:建立两个子线程,子线程在运行时会分别尝试获取两把锁。其中一个线程先获取锁1在获取锁2,另外一个线程正好相反。若是没有外界中断,该程序将处于死锁状态永远没法中止。咱们经过使其中一个线程中断,来结束线程间毫无心义的等待。被中断的线程将抛出异常,而另外一个线程将能获取锁后正常结束。
ReentrantLock还给咱们提供了获取锁限时等待的方法tryLock()
,能够选择传入时间参数,表示等待指定的时间,无参则表示当即返回锁申请的结果:true表示获取锁成功,false表示获取锁失败。咱们可使用该方法配合失败重试机制来更好的解决死锁问题。
public class ReentrantLockTest { static Lock lock1 = new ReentrantLock(); static Lock lock2 = new ReentrantLock(); public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(new ThreadDemo(lock1, lock2));//该线程先获取锁1,再获取锁2 Thread thread1 = new Thread(new ThreadDemo(lock2, lock1));//该线程先获取锁2,再获取锁1 thread.start(); thread1.start(); } static class ThreadDemo implements Runnable { Lock firstLock; Lock secondLock; public ThreadDemo(Lock firstLock, Lock secondLock) { this.firstLock = firstLock; this.secondLock = secondLock; } @Override public void run() { try { while(!lock1.tryLock()){ TimeUnit.MILLISECONDS.sleep(10); } while(!lock2.tryLock()){ lock1.unlock(); TimeUnit.MILLISECONDS.sleep(10); } } catch (InterruptedException e) { e.printStackTrace(); } finally { firstLock.unlock(); secondLock.unlock(); System.out.println(Thread.currentThread().getName()+"正常结束!"); } } } }
线程经过调用tryLock()
方法获取锁,第一次获取锁失败时会休眠10毫秒,而后从新获取,直到获取成功。第二次获取失败时,首先会释放第一把锁,再休眠10毫秒,而后重试直到成功为止。线程获取第二把锁失败时将会释放第一把锁,这是解决死锁问题的关键,避免了两个线程分别持有一把锁而后相互请求另外一把锁。
使用synchronized结合Object上的wait和notify方法能够实现线程间的等待通知机制。ReentrantLock结合Condition接口一样能够实现这个功能。并且相比前者使用起来更清晰也更简单。
Condition由ReentrantLock对象建立,而且能够同时建立多个
static Condition notEmpty = lock.newCondition(); static Condition notFull = lock.newCondition();
Condition接口在使用前必须先调用ReentrantLock的lock()方法得到锁。以后调用Condition接口的await()将释放锁,而且在该Condition上等待,直到有其余线程调用Condition的signal()方法唤醒线程。使用方式和wait,notify相似。
public class ConditionTest { static ReentrantLock lock = new ReentrantLock(); static Condition condition = lock.newCondition(); public static void main(String[] args) throws InterruptedException { lock.lock(); new Thread(new SignalThread()).start(); System.out.println("主线程等待通知"); try { condition.await(); } finally { lock.unlock(); } System.out.println("主线程恢复运行"); } static class SignalThread implements Runnable { @Override public void run() { lock.lock(); try { condition.signal(); System.out.println("子线程通知"); } finally { lock.unlock(); } } } }
阻塞队列是一种特殊的先进先出队列,它有如下几个特色
1.入队和出队线程安全
2.当队列满时,入队线程会被阻塞;当队列为空时,出队线程会被阻塞。
public class MyBlockingQueue<E> { int size;//阻塞队列最大容量 ReentrantLock lock = new ReentrantLock(); LinkedList<E> list=new LinkedList<>();//队列底层实现 Condition notFull = lock.newCondition();//队列满时的等待条件 Condition notEmpty = lock.newCondition();//队列空时的等待条件 public MyBlockingQueue(int size) { this.size = size; } public void enqueue(E e) throws InterruptedException { lock.lock(); try { while (list.size() ==size)//队列已满,在notFull条件上等待 notFull.await(); list.add(e);//入队:加入链表末尾 System.out.println("入队:" +e); notEmpty.signal(); //通知在notEmpty条件上等待的线程 } finally { lock.unlock(); } } public E dequeue() throws InterruptedException { E e; lock.lock(); try { while (list.size() == 0)//队列为空,在notEmpty条件上等待 notEmpty.await(); e = list.removeFirst();//出队:移除链表首元素 System.out.println("出队:"+e); notFull.signal();//通知在notFull条件上等待的线程 return e; } finally { lock.unlock(); } } }
public static void main(String[] args) throws InterruptedException { MyBlockingQueue<Integer> queue = new MyBlockingQueue<>(2); for (int i = 0; i < 10; i++) { int data = i; new Thread(new Runnable() { @Override public void run() { try { queue.enqueue(data); } catch (InterruptedException e) { } } }).start(); } for(int i=0;i<10;i++){ new Thread(new Runnable() { @Override public void run() { try { Integer data = queue.dequeue(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
ReentrantLock是可重入的独占锁。比起synchronized功能更加丰富,支持公平锁实现,支持中断响应以及限时等待等等。能够配合一个或多个Condition条件方便的实现等待通知机制。
1.3. Semaphore 1.3.1. 概述
共享锁 运行多个线程同时临界区
1.3.2. 主要接口
public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit) public void release()
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * description: * * @author: dawn.he QQ: 905845006 * @email: dawn.he@cloudwise.com * @email: 905845006@qq.com * @date: 2019/10/3 6:36 PM */ public class SemaphoreTest implements Runnable{ private Semaphore semaphore = new Semaphore(5);// 同步关键类,构造方法传入的数字是多少,则同一个时刻,只运行多少个进程同时运行制定代码 public static void main(String args[]) { SemaphoreTest semp = new SemaphoreTest(); ExecutorService exec = Executors.newFixedThreadPool(20); for(int i = 0;i<20;i++){ exec.submit(semp); } exec.shutdown(); } @Override public void run() { try { semaphore.acquire(); // 2 表示进入此代码,就会消耗2个通路,2个通路从6个中扣除 System.out.println(Thread.currentThread().getId() + ":doSomething start-" ); Thread.sleep(2000); System.out.println(Thread.currentThread().getId() + ":doSomething end-" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); // release 放到 finally 中 } } }
1.4. ReadWriteLock
1.4.1. 概述
ReadWriteLock是JDK5中提供的读写分离锁
1.4.2. 访问状况
读-读不互斥:读读之间不阻塞。
读-写互斥:读阻塞写,写也会阻塞读。
写-写互斥:写写阻塞。
1.4.3. 主要接口
private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
Java并发包中ReadWriteLock是一个接口,主要有两个方法,以下:
public interface ReadWriteLock { /** * Returns the lock used for reading. * * @return the lock used for reading */ Lock readLock(); /** * Returns the lock used for writing. * * @return the lock used for writing */ Lock writeLock(); }
ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁。
Java并发库中ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性。
在具体讲解ReetrantReadWriteLock的使用方法前,咱们有必要先对其几个特性进行一些深刻学习了解。
什么是可重入锁,不可重入锁呢?"重入"字面意思已经很明显了,就是能够从新进入。可重入锁,就是说一个线程在获取某个锁后,还能够继续获取该锁,即容许一个线程屡次获取同一个锁。好比synchronized内置锁就是可重入的,若是A类有2个synchornized方法method1和method2,那么method1调用method2是容许的。显然重入锁给编程带来了极大的方便。假如内置锁不是可重入的,那么致使的问题是:1个类的synchornized方法不能调用本类其余synchornized方法,也不能调用父类中的synchornized方法。与内置锁对应,JDK提供的显示锁ReentrantLock也是能够重入的,这里经过一个例子着重说下可重入锁的释放须要的事儿。
package test; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Test1 { public static void main(String[] args) throws InterruptedException { final ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); Thread t = new Thread(new Runnable() { @Override public void run() { lock.writeLock().lock(); System.out.println("Thread real execute"); lock.writeLock().unlock(); } }); lock.writeLock().lock(); lock.writeLock().lock(); t.start(); Thread.sleep(200); System.out.println("realse one once"); lock.writeLock().unlock(); } }
运行结果.png
从运行结果中,能够看到,程序并未执行线程的run方法,由此咱们可知,上面的代码会出现死锁,由于主线程2次获取了锁,可是却只释放1次锁,致使线程t永远也不能获取锁。一个线程获取多少次锁,就必须释放多少次锁。这对于内置锁也是适用的,每一次进入和离开synchornized方法(代码块),就是一次完整的锁获取和释放。
再次添加一次unlock以后的运行结果.png
要实现一个读写锁,须要考虑不少细节,其中之一就是锁升级和锁降级的问题。什么是升级和降级呢?ReadWriteLock的javadoc有一段话:
Can the write lock be downgraded to a read lock without allowing an intervening writer? Can a read lock be upgraded to a write lock, in preference to other waiting readers or writers?
翻译过来的结果是:在不容许中间写入的状况下,写入锁能够降级为读锁吗?读锁是否能够升级为写锁,优先于其余等待的读取或写入操做?简言之就是说,锁降级:从写锁变成读锁;锁升级:从读锁变成写锁,ReadWriteLock是否支持呢?让咱们带着疑问,进行一些Demo 测试代码验证。
/** *Test Code 1 **/ package test; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Test1 { public static void main(String[] args) { ReentrantReadWriteLock rtLock = new ReentrantReadWriteLock(); rtLock.readLock().lock(); System.out.println("get readLock."); rtLock.writeLock().lock(); System.out.println("blocking"); } }
TestCode1 Result.png
结论:上面的测试代码会产生死锁,由于同一个线程中,在没有释放读锁的状况下,就去申请写锁,这属于锁升级,ReentrantReadWriteLock是不支持的。
/** *Test Code 2 **/ package test; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Test2 { public static void main(String[] args) { ReentrantReadWriteLock rtLock = new ReentrantReadWriteLock(); rtLock.writeLock().lock(); System.out.println("writeLock"); rtLock.readLock().lock(); System.out.println("get read lock"); } }
TestCode2 Result.png
结论:ReentrantReadWriteLock支持锁降级,上面代码不会产生死锁。这段代码虽然不会致使死锁,但没有正确的释放锁。从写锁降级成读锁,并不会自动释放当前线程获取的写锁,仍然须要显示的释放,不然别的线程永远也获取不到写锁。
在使用ReetrantReadWriteLock实现锁机制前,咱们先看一下,多线程同时读取文件时,用synchronized实现的效果
package test; /** * * synchronized实现 * @author itbird * */ public class ReadAndWriteLockTest { public synchronized static void get(Thread thread) { System.out.println("start time:" + System.currentTimeMillis()); for (int i = 0; i < 5; i++) { try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(thread.getName() + ":正在进行读操做……"); } System.out.println(thread.getName() + ":读操做完毕!"); System.out.println("end time:" + System.currentTimeMillis()); } public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { get(Thread.currentThread()); } }).start(); new Thread(new Runnable() { @Override public void run() { get(Thread.currentThread()); } }).start(); } }
让咱们看一下运行结果:
synchronized实现的效果结果.png
从运行结果能够看出,两个线程的读操做是顺序执行的,整个过程大概耗时200ms。
package test; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * * ReetrantReadWriteLock实现 * @author itbird * */ public class ReadAndWriteLockTest { public static void get(Thread thread) { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); lock.readLock().lock(); System.out.println("start time:" + System.currentTimeMillis()); for (int i = 0; i < 5; i++) { try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(thread.getName() + ":正在进行读操做……"); } System.out.println(thread.getName() + ":读操做完毕!"); System.out.println("end time:" + System.currentTimeMillis()); lock.readLock().unlock(); } public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { get(Thread.currentThread()); } }).start(); new Thread(new Runnable() { @Override public void run() { get(Thread.currentThread()); } }).start(); } }
让咱们看一下运行结果:
ReetrantReadWriteLock实现.png
从运行结果能够看出,两个线程的读操做是同时执行的,整个过程大概耗时100ms。
经过两次实验的对比,咱们能够看出来,ReetrantReadWriteLock的效率明显高于Synchronized关键字。
经过上面的测试代码,咱们也能够延伸得出一个结论,ReetrantReadWriteLock读锁使用共享模式,即:同时能够有多个线程并发地读数据。可是另外一个问题来了,写锁之间是共享模式仍是互斥模式?读写锁之间是共享模式仍是互斥模式呢?下面让咱们经过Demo进行一一验证吧。
package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * * ReetrantReadWriteLock实现 * @author itbird * */ public class ReadAndWriteLockTest { public static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public static void main(String[] args) { //同时读、写 ExecutorService service = Executors.newCachedThreadPool(); service.execute(new Runnable() { @Override public void run() { readFile(Thread.currentThread()); } }); service.execute(new Runnable() { @Override public void run() { writeFile(Thread.currentThread()); } }); } // 读操做 public static void readFile(Thread thread) { lock.readLock().lock(); boolean readLock = lock.isWriteLocked(); if (!readLock) { System.out.println("当前为读锁!"); } try { for (int i = 0; i < 5; i++) { try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(thread.getName() + ":正在进行读操做……"); } System.out.println(thread.getName() + ":读操做完毕!"); } finally { System.out.println("释放读锁!"); lock.readLock().unlock(); } } // 写操做 public static void writeFile(Thread thread) { lock.writeLock().lock(); boolean writeLock = lock.isWriteLocked(); if (writeLock) { System.out.println("当前为写锁!"); } try { for (int i = 0; i < 5; i++) { try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(thread.getName() + ":正在进行写操做……"); } System.out.println(thread.getName() + ":写操做完毕!"); } finally { System.out.println("释放写锁!"); lock.writeLock().unlock(); } } }
运行结果:
运行结果.png
结论:读写锁的实现必须确保写操做对读操做的内存影响。换句话说,一个得到了读锁的线程必须能看到前一个释放的写锁所更新的内容,读写锁之间为互斥。
package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * * ReetrantReadWriteLock实现 * @author itbird * */ public class ReadAndWriteLockTest { public static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public static void main(String[] args) { //同时写 ExecutorService service = Executors.newCachedThreadPool(); service.execute(new Runnable() { @Override public void run() { writeFile(Thread.currentThread()); } }); service.execute(new Runnable() { @Override public void run() { writeFile(Thread.currentThread()); } }); } // 读操做 public static void readFile(Thread thread) { lock.readLock().lock(); boolean readLock = lock.isWriteLocked(); if (!readLock) { System.out.println("当前为读锁!"); } try { for (int i = 0; i < 5; i++) { try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(thread.getName() + ":正在进行读操做……"); } System.out.println(thread.getName() + ":读操做完毕!"); } finally { System.out.println("释放读锁!"); lock.readLock().unlock(); } } // 写操做 public static void writeFile(Thread thread) { lock.writeLock().lock(); boolean writeLock = lock.isWriteLocked(); if (writeLock) { System.out.println("当前为写锁!"); } try { for (int i = 0; i < 5; i++) { try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(thread.getName() + ":正在进行写操做……"); } System.out.println(thread.getName() + ":写操做完毕!"); } finally { System.out.println("释放写锁!"); lock.writeLock().unlock(); } } }
运行结果:
运行结果.png
1.Java并发库中ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性
2.ReetrantReadWriteLock读写锁的效率明显高于synchronized关键字
3.ReetrantReadWriteLock读写锁的实现中,读锁使用共享模式;写锁使用独占模式,换句话说,读锁能够在没有写锁的时候被多个线程同时持有,写锁是独占的
4.ReetrantReadWriteLock读写锁的实现中,须要注意的,当有读锁时,写锁就不能得到;而当有写锁时,除了得到写锁的这个线程能够得到读锁外,其余线程不能得到读锁
1.5. CountDownLatch
1.5.1. 概述
倒数计时器 一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,每每还要进行各项设备、仪器的检查。 只有等全部检查完毕后,引擎才能点火。这种场景就很是适合使用CountDownLatch。它可使得点火线程 ,等待全部检查线程所有完工后,再执行
1.5.2. 主要接口
static final CountDownLatch end = new CountDownLatch(10);
end.countDown();
end.await();
1.5.3 示意图
import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * description: * * @author: dawn.he QQ: 905845006 * @email: dawn.he@cloudwise.com * @email: 905845006@qq.com * @date: 2019/10/2 5:45 PM */ public class CountDownLatchDemo implements Runnable { static final CountDownLatch end = new CountDownLatch(10); static final CountDownLatchDemo demo = new CountDownLatchDemo(); @Override public void run() { try { //模拟检查任务 Thread.sleep(new Random().nextInt(10)*1000); System.out.println("check cpmplete"); end.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newFixedThreadPool(10); for(int i = 0;i<10;i++){ exec.submit(demo); } //等待检查 end.await(); //发射火箭 System.out.println("Fire!"); exec.shutdown(); } }
1.6. CyclicBarrier
1.6.1. 概述
循环栅栏 Cyclic意为循环,也就是说这个计数器能够反复使用。好比,假设咱们将计数器设置为10。那么凑齐第一批1 0个线程后,计数器就会归零,而后接着凑齐下一批10个线程
1.6.2. 主要接口
public CyclicBarrier(int parties, Runnable barrierAction)
barrierAction就是当计数器一次计数完成后,系统会执行的动做 await()
1.6.3. 示意图
import java.util.concurrent.CyclicBarrier; /** * description: * * @author: dawn.he QQ: 905845006 * @email: dawn.he@cloudwise.com * @email: 905845006@qq.com * @date: 2019/10/3 9:14 PM */ public class CyclicBarrierDemo { static class TaskThread extends Thread { CyclicBarrier barrier; public TaskThread(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { Thread.sleep(1000); System.out.println(getName() + " 到达栅栏 A"); barrier.await(); System.out.println(getName() + " 冲破栅栏 A"); Thread.sleep(2000); System.out.println(getName() + " 到达栅栏 B"); barrier.await(); System.out.println(getName() + " 冲破栅栏 B"); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) { int threadNum = 5; CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 完成最后任务"); } }); for(int i = 0; i < threadNum; i++) { new TaskThread(barrier).start(); } } }
Thread-3 到达栅栏 A Thread-1 到达栅栏 A Thread-4 到达栅栏 A Thread-0 到达栅栏 A Thread-2 到达栅栏 A Thread-2 完成最后任务 Thread-2 冲破栅栏 A Thread-1 冲破栅栏 A Thread-3 冲破栅栏 A Thread-0 冲破栅栏 A Thread-4 冲破栅栏 A Thread-2 到达栅栏 B Thread-3 到达栅栏 B Thread-4 到达栅栏 B Thread-1 到达栅栏 B Thread-0 到达栅栏 B Thread-0 完成最后任务 Thread-0 冲破栅栏 B Thread-2 冲破栅栏 B Thread-1 冲破栅栏 B Thread-4 冲破栅栏 B Thread-3 冲破栅栏 B
1.7. LockSupport 1.7.1. 概述
提供线程阻塞原语 1.7.2. 主要接口
LockSupport.park(); LockSupport.unpark(t1);
1.7.3. 与suspend()比较 不容易引发线程冻结
1.7.4. 中断响应 可以响应中断,但不抛出异常。
中断响应的结果是,park()函数的返回,能够从Thread.interrupted()获得中断标志
import java.util.concurrent.locks.LockSupport; /** * description: * * @author: dawn.he QQ: 905845006 * @email: dawn.he@cloudwise.com * @email: 905845006@qq.com * @date: 2019/10/3 9:36 PM */ public class ThreadParkTest { public static void main(String[] args) { MyThread mt = new MyThread(); mt.setName("mt"); mt.start(); try { Thread.currentThread().sleep(10); mt.park(); Thread.currentThread().sleep(10000); mt.unPark(); Thread.currentThread().sleep(10000); mt.park(); } catch (InterruptedException e) { e.printStackTrace(); } } static class MyThread extends Thread { private boolean isPark = false; public void run() { System.out.println(" Enter Thread running....."); while (true) { if (isPark) { System.out.println(Thread.currentThread().getName() + "Thread is Park....."); LockSupport.park(); } //do something System.out.println(Thread.currentThread().getName() + ">> is running"); try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public void park() { isPark = true; } public void unPark() { isPark = false; LockSupport.unpark(this); System.out.println("Thread is unpark....."); } } }
2.3. BlockingQueue
阻塞队列
java.util.concurrent 包里的 BlockingQueue是一个接口, 继承Queue接口,Queue接口继承 Collection
BlockingQueue----->Queue-->Collection
图:
队列的特色是:先进先出(FIFO)
BlockingQueue 具备 4 组不一样的方法用于插入、移除以及对队列中的元素进行检查。若是请求的操做不能获得当即执行的话,每一个方法的表现也不一样。这些方法以下:
抛出异常 | 特殊值 | 阻塞 | 超时 | |
插入 | add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
移除 | remove() |
poll() |
take() |
poll(time, unit) |
检查 | element() |
peek() |
不可用 | 不可用 |
四组不一样的行为方式解释:
1(异常)
若是试图的操做没法当即执行,抛一个异常。
2(特定值)
若是试图的操做没法当即执行,返回一个特定的值(经常是 true / false)。
3(阻塞)
若是试图的操做没法当即执行,该方法调用将会发生阻塞,直到可以执行。
4(超时)
若是试图的操做没法当即执行,该方法调用将会发生阻塞,直到可以执行,但等待时间不会超过给定值。返回一个特定值以告知该操做是否成功(典型的是 true / false)。
不能向BlockingQueue插入一个空对象,不然会抛出NullPointerException,相应的实现类校验代码
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
BlockingQueue :不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用做指示 poll 操做失败的警惕值。
BlockingQueue: 能够是限定容量的。它在任意给定时间均可以有一个 remainingCapacity,超出此容量,便没法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 老是报告Integer.MAX_VALUE 的剩余容量。
BlockingQueue :实现主要用于生产者-使用者队列,但它另外还支持 Collection
接口。所以,举例来讲,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操做一般不 会有效执行,只能有计划地偶尔使用,好比在取消排队信息时。
BlockingQueue :实现是线程安全的。全部排队方法均可以使用内部锁或其余形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操做(addAll、containsAll、retainAll 和removeAll)没有 必要自动执行,除非在实现中特别说明。所以,举例来讲,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。
BlockingQueue 实质上不 支持使用任何一种“close”或“shutdown”操做来指示再也不添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种经常使用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。
ArrayBlockingQueue
DelayQueue
LinkedBlockingQueue
PriorityBlockingQueue
SynchronousQueue
继承他的接口:
public interface BlockingDeque extends BlockingQueue, Deque 1.6新增
public interface TransferQueue extends BlockingQueue 1.7新增
BlockingQueue 一般用于一个线程生产对象,而另一个线程消费这些对象的场景。下图是对这个原理的阐述:
一个线程往里边放,另一个线程从里边取的一个 BlockingQueue。
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。若是该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。
负责消费的线程将会一直从该阻塞队列中拿出对象。若是消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。
public class BlockingQueueExample { public static void main(String[] args) throws Exception { BlockingQueue queue = new ArrayBlockingQueue(1024); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); new Thread(producer).start(); new Thread(consumer).start(); Thread.sleep(4000); } }
public class Producer implements Runnable{ protected BlockingQueue queue = null; public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { try { queue.put("1"); Thread.sleep(1000); queue.put("2"); Thread.sleep(1000); queue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } } }
public class Consumer implements Runnable{ protected BlockingQueue queue = null; public Consumer(BlockingQueue queue) { this.queue = queue; } public void run() { try { System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }
这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦建立了这样的缓存区,就不能再增长其容量。试图向已满队列中放入元素会致使操做受阻塞;试图从空队列中提取元素将致使相似阻塞。
此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认状况下,不保证是这种排序。然而,经过将公平性 (fairness) 设置为 true 而构造的队列容许按照 FIFO 顺序访问线程。公平性一般会下降吞吐量,但也减小了可变性和避免了“不平衡性”
BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");String string = queue.take();
Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。若是延迟都尚未期满,则队列没有头部,而且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即便没法使用 take 或 poll 移除未到期的元素,也不会将这些元素做为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不容许使用 null 元素
3. 链阻塞队列 LinkedBlockingQueue
LinkedBlockingQueue 类实现了 BlockingQueue 接口。
LinkedBlockingQueue 内部以一个链式结构(连接节点)对其元素进行存储。若是须要的话,这一链式结构能够选择一个上限。若是没有定义上限,将使用 Integer.MAX_VALUE 做为上限。
LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在全部元素之中是放入时间最久的那个,而尾元素则是最短的那个。
BlockingQueue unbounded = new LinkedBlockingQueue();
BlockingQueue bounded = new LinkedBlockingQueue(1024);bounded.put("Value");
String value = bounded.take();
System.out.println(value);
System.out.println(unbounded.remainingCapacity()==Integer.MAX_VALUE);//true
PriorityBlockingQueue 类实现了 BlockingQueue 接口。
一个无界阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,而且提供了阻塞获取操做。虽然此队列逻辑上是无界的,可是资源被耗尽时试图执行 add 操做也将失败(致使OutOfMemoryError)。此类不容许使用 null 元素。依赖天然顺序的优先级队列也不容许插入不可比较的对象(这样作会致使抛出 ClassCastException)。
此类及其迭代器能够实现 Collection 和 Iterator 接口的全部可选 方法。iterator() 方法中提供的迭代器并不 保证以特定的顺序遍历 PriorityBlockingQueue 的元素。若是须要有序地进行遍历,则应考虑使用 Arrays.sort(pq.toArray())。此外,可使用方法 drainTo 按优先级顺序移除 所有或部分元素,并将它们放在另外一个 collection 中。
在此类上进行的操做不保证具备同等优先级的元素的顺序。若是须要实施某一排序,那么能够定义自定义类或者比较器,比较器可以使用修改键断开主优先级值之间的联系。例如,如下是应用先进先出 (first-in-first-out) 规则断开可比较元素之间联系的一个类。要使用该类,则须要插入一个新的 FIFOEntry(anEntry) 来替换普通的条目对象。
SynchronousQueue 类实现了 BlockingQueue 接口。
SynchronousQueue 是一个特殊的队列,它的内部同时只可以容纳单个元素。若是该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另外一个线程将该元素从队列中抽走。一样,若是该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另外一个线程向队列中插入了一条新的元素。
据此,把这个类称做一个队列显然是夸大其词了。它更多像是一个汇合点。
java.util.concurrent 包里的 BlockingDeque 接口表示一个线程安放入和提取实例的双端队列。本小节我将给你演示如何使用 BlockingDeque。
BlockingDeque 类是一个双端队列,在不可以插入元素时,它将阻塞住试图插入元素的线程;在不可以抽取元素时,它将阻塞住试图抽取的线程。
deque(双端队列) 是 "Double Ended Queue" 的缩写。所以,双端队列是一个你能够从任意一端插入或者抽取元素的队列。
一个基于已连接节点的、任选范围的阻塞双端队列。
可选的容量范围构造方法参数是一种防止过分膨胀的方式。若是未指定容量,那么容量将等于 Integer.MAX_VALUE。只要插入元素不会使双端队列超出容量,每次插入后都将动态地建立连接节点。
大多数操做都以固定时间运行(不计阻塞消耗的时间)。异常包括 remove、removeFirstOccurrence、removeLastOccurrence、contains、iterator.remove() 以及批量操做,它们均以线性时间运行。
2.4. ConcurrentLinkedQueue
一个基于连接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。 新的元素插入到队列的尾部,队列获取操做从队列头部得到元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不容许使用 null 元素。
poll()
获取并移除此队列的头,若是此队列为空,则返回 null。
public static void main(String[] args) { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("哈哈哈"); System.out.println("offer后,队列是否空?" + queue.isEmpty()); System.out.println("从队列中poll:" + queue.poll()); System.out.println("pool后,队列是否空?" + queue.isEmpty()); }
offer是往队列添加元素,poll是从队列取出元素而且删除该元素
执行结果
offer后,队列是否空?false 从队列中poll:哈哈哈 pool后,队列是否空?true
ConcurrentLinkedQueue中的add() 和 offer() 彻底同样,都是往队列尾部添加元素
peek()
获取但不移除此队列的头;若是此队列为空,则返回 null
public static void main(String[] args) { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("哈哈哈"); System.out.println("offer后,队列是否空?" + queue.isEmpty()); System.out.println("从队列中peek:" + queue.peek()); System.out.println("从队列中peek:" + queue.peek()); System.out.println("从队列中peek:" + queue.peek()); System.out.println("pool后,队列是否空?" + queue.isEmpty()); }
执行结果:
offer后,队列是否空?false 从队列中peek:哈哈哈 从队列中peek:哈哈哈 从队列中peek:哈哈哈 pool后,队列是否空?false
remove(Object o)
从队列中移除指定元素的单个实例(若是存在)
public static void main(String[] args) { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("哈哈哈"); System.out.println("offer后,队列是否空?" + queue.isEmpty()); System.out.println("从队列中remove已存在元素 :" + queue.remove("哈哈哈")); System.out.println("从队列中remove不存在元素:" + queue.remove("123")); System.out.println("remove后,队列是否空?" + queue.isEmpty()); }
remove一个已存在元素,会返回true,remove不存在元素,返回false
执行结果:
offer后,队列是否空?false 从队列中remove已存在元素 :true 从队列中remove不存在元素:false remove后,队列是否空?true
size()
返回此队列中的元素数量
注意:
若是此队列包含的元素数大于 Integer.MAX_VALUE,则返回 Integer.MAX_VALUE。 须要当心的是,与大多数 collection 不一样,此方法不是 一个固定时间操做。因为这些队列的异步特性,肯定当前的元素数须要进行一次花费 O(n) 时间的遍历。 因此在须要判断队列是否为空时,尽可能不要用 queue.size()>0,而是用 !queue.isEmpty()
比较size()和isEmpty() 效率的示例:
场景:10000我的去饭店吃饭,10张桌子供饭,分别比较size() 和 isEmpty() 的耗时
public class Test01ConcurrentLinkedQueue { public static void main(String[] args) throws InterruptedException { int peopleNum = 10000;//吃饭人数 int tableNum = 10;//饭桌数量 ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); CountDownLatch count = new CountDownLatch(tableNum);//计数器 //将吃饭人数放入队列(吃饭的人进行排队) for(int i=1;i<=peopleNum;i++){ queue.offer("消费者_" + i); } //执行10个线程从队列取出元素(10个桌子开始供饭) System.out.println("-----------------------------------开饭了-----------------------------------"); long start = System.currentTimeMillis(); ExecutorService executorService = Executors.newFixedThreadPool(tableNum); for(int i=0;i<tableNum;i++) { executorService.submit(new Dinner("00" + (i+1), queue, count)); } //计数器等待,知道队列为空(全部人吃完) count.await(); long time = System.currentTimeMillis() - start; System.out.println("-----------------------------------全部人已经吃完-----------------------------------"); System.out.println("共耗时:" + time); //中止线程池 executorService.shutdown(); } private static class Dinner implements Runnable{ private String name; private ConcurrentLinkedQueue<String> queue; private CountDownLatch count; public Dinner(String name, ConcurrentLinkedQueue<String> queue, CountDownLatch count) { this.name = name; this.queue = queue; this.count = count; } @Override public void run() { //while (queue.size() > 0){ while (!queue.isEmpty()){ //从队列取出一个元素 排队的人少一个 System.out.println("【" +queue.poll() + "】----已吃完..., 饭桌编号:" + name); } count.countDown();//计数器-1 } } }
执行结果:
使用size耗时:757ms
使用isEmpty耗时:210
当数据量越大,这种耗时差距越明显。因此这种判断用isEmpty 更加合理
contains(Object o)
若是此队列包含指定元素,则返回 true
public static void main(String[] args) throws InterruptedException { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("123"); System.out.println(queue.contains("123")); System.out.println(queue.contains("234")); }
执行结果:
toArray
toArray()
返回以恰当顺序包含此队列全部元素的数组
toArray(T[] a)
返回以恰当顺序包含此队列全部元素的数组;返回数组的运行时类型是指定数组的运行时类型
public static void main(String[] args) throws InterruptedException { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(); queue.offer("123"); queue.offer("234"); Object[] objects = queue.toArray(); System.out.println(objects[0] + ", " + objects[1]); //将数据存储到指定数组 String[] strs = new String[2]; queue.toArray(strs); System.out.println(strs[0] + ", " + strs[1]); }
执行结果:
iterator()
返回在此队列元素上以恰当顺序进行迭代的迭代器
public static void main(String[] args) throws InterruptedException { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(); queue.offer("123"); queue.offer("234"); Iterator<String> iterator = queue.iterator(); while (iterator.hasNext()){ System.out.println(iterator.next()); } }
ConcurrentLinkedQueue文档说明:
构造方法摘要 |
---|
ConcurrentLinkedQueue() 建立一个最初为空的 ConcurrentLinkedQueue。 |
ConcurrentLinkedQueue(Collection<? extends E> c) 建立一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。 |
方法摘要 | ||
---|---|---|
boolean |
add(E e) 将指定元素插入此队列的尾部。 |
|
boolean |
contains(Object o) 若是此队列包含指定元素,则返回 true。 |
|
boolean |
isEmpty() 若是此队列不包含任何元素,则返回 true。 |
|
Iterator<E> |
iterator() 返回在此队列元素上以恰当顺序进行迭代的迭代器。 |
|
boolean |
offer(E e) 将指定元素插入此队列的尾部。 |
|
E |
peek() 获取但不移除此队列的头;若是此队列为空,则返回 null。 |
|
E |
poll() 获取并移除此队列的头,若是此队列为空,则返回 null。 |
|
boolean |
remove(Object o) 从队列中移除指定元素的单个实例(若是存在)。 |
|
int |
size() 返回此队列中的元素数量。 |
|
Object[] |
toArray() 返回以恰当顺序包含此队列全部元素的数组。 |
|
|
toArray(T[] a) 返回以恰当顺序包含此队列全部元素的数组;返回数组的运行时类型是指定数组的运行时类型。 |