上期回顾:html
上次博客咱们主要说了咱们juc并发包下面的ReetrantLock的一些简单使用和底层的原理,是如何实现公平锁、非公平锁的。内部的双向链表究竟是什么意思,prev和next究竟是什么,为何要引入heap和tail来值向null的Node节点。高并发时候是如何保证state来记录重入锁的,在咱们的上次博客都作了详细的说明。此次咱们来聊一些简单易懂且实用的AQS中的工具类。node
Semaphore信号量:多线程
这个东西很简单,别看字面意思,什么信号量,我也不懂得那个术语什么意思,Semaphore你能够这样来理解,咱们要去看电影,并且是3D电影(必须戴3D眼镜才能够进入),可是比较不巧的是咱们电影院只有两个3D眼镜了,也就是说,咱们每次只能进去两我的看电影,而后等待这两我的看完电影之后把眼镜还回来,后面的两我的才能继续观看,就是说每次只容许最多进去两我的,每次进入到线程获取锁,须要你获得前置的票据,才能够进行后续的流程。能够理解为一个简单的限流吧。咱们来一下代码示例。并发
public class Test { public static void main(String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(2); for (int i = 0; i < 5; i++) { new Thread(new Task(semaphore,"xiaocaijishu"+i)).start(); } } static class Task extends Thread{ Semaphore semaphore; public Task(Semaphore semaphore,String tname){ this.semaphore = semaphore; this.setName(tname); } public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"拿着3D眼镜进去了,时间是"+System.currentTimeMillis()); Thread.sleep(1000); semaphore.release(); System.out.println(Thread.currentThread().getName()+"出来了,将3D眼镜还给了服务人员,时间是"+System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行结果就是这样的高并发
咱们来解释一下运行结果,线程1和线程3同一时间去看电影了,而后1出来了,这时线程9立刻拿着咱们的3D眼镜进去了,过了一会线程3也看完电影了,出来了还了3D眼镜,线程7又在同一时间拿着3D眼镜进去看电影了,后续线程都是如此执行的,每次只是进入两个线程。工具
简单的使用看到了,咱们来看看底层的源码设计吧。开始的时候咱们是建立一个Semaphore内部票据数目给予的是2。ui
//1.建立初始票据是2的Semaphore Semaphore semaphore = new Semaphore(2); //2.进入Semaphore,查看数据2是如何存储的. public Semaphore(int permits) { sync = new NonfairSync(permits); } //3.底层仍是基于sync 建立了一个对象,但不一样于过去ReetrantLock的是,此次是一个非公平的锁对象,咱们再次进入NonfairSync看看那个数字2到底放在哪里了. Sync(int permits) { setState(permits); } //4.咱们能够看到底层仍是用State来存储的.
此次没有把全部代码所有粘出来,感受那样像是凑篇幅同样。this
经过上述代码,咱们能够看到,咱们的初始票据数,是上一次那个state来存的。spa
后续咱们调用了acquire方法来尝试获取票据,acquire方法也能够传入获取票据数目的好比semaphore.acquire(2);也是能够的。咱们进入acquire方法来看看究竟是如何获取的。线程
//从new Semaphore(2);点击进入后续方法 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //咱们能够看到,当咱们没有传须要获取多少票据的时候,会默认给予1这个参数,咱们来继续看后续流程 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //Thread.interrupted()判断当前线程是否已经中断,若是中断我直接抛出异常,电影都演完了,我拿3D眼镜还有毛线用. //tryAcquireShared(arg)尝试获取票据,arg是1,刚才给予的默认1 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //内部有实现关系,因此调用的是Semaphore类nonfairTryAcquireShared方法,咱们来解读一下 //直接就是一个死循环, int available = getState();获取一下当前还有多少票据 // int remaining = available - acquires;计算出当前票据减去所需票据的一个剩余值 //if (remaining < 0 || compareAndSetState(available, remaining))咱们现有2个票据,拿走1个,剩余1个,因此remaining < 0 必定是false的 //再来看另外一半compareAndSetState,用原子计算(上次博客说过为何要原子计算)方式来修改剩余票据,这个是能够修改为功的.因此知足条件能够返回一个2-1 也就是返回一个正数1
是否是有点看懵圈了,不少小伙伴感受if (remaining < 0 ||compareAndSetState(available, remaining))前面的remaining<0,这个或判断貌似没用啊,来张图解释一下。
有没有感受好点了,本身能够跟着源代码走一走,获取的过程就差一个doAcquireSharedInterruptibly尚未看了,若是获取超过了票据数,也就是不该该让返回负数时运行doAcquireSharedInterruptibly方法,咱们来看一下。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//以共享方式添加节点 boolean failed = true; try { for (;;) { final Node p = node.predecessor();//判断前驱节点是否为空 if (p == head) { int r = tryAcquireShared(arg);//再次尝试获取票据 if (r >= 0) {//>= 0表示获取票据成功 setHeadAndPropagate(node, r);//更改头节点 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && //剔除不可用的Node节点 parkAndCheckInterrupt()) //阻塞当前线程 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
通过两次以上的尝试,咱们将该线程阻塞了,不至于一直for循环在运行,也就这样,票据发放完毕了。
过程差很少就是这样的,咱们能够再仔细看一下是如何添加节点的,上次ReetrantLock说了一些,咱们此次再来看一下。咱们现已第一次塞节点为例,
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) {//第一次必定是空的,咱们如今已初始塞节点为例。 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node);//为空直接进入这个逻辑 return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //1.第一次必定是空 //二次循环不为空 进入else if (compareAndSetHead(new Node()))//2.建立一个空节点,而且做为head节点. tail = head;//3.tail指向那个head节点 } else { node.prev = t;//4. 将node节点的前驱指针指向 if (compareAndSetTail(t, node)) {//5.原子计算方式将node节点后驱节点指向tail t.next = node;//6.将t节点(空节点)的后驱指针指向node节点 return t; } } } }
第一次循环只是一个内部的初始空节点,第二次循环才是移动指针塞入的过程。
节点唤醒是在释放票据时被唤醒的,代码超级简单,能够本身当作一份做业,本身去看一遍代码吧~!提示流程就是先还票据,而后唤醒。Semaphore差很少就这些知识点,我也带着你们简单的看了一遍源码。咱们再来继续看一下后面AQS的一些工具类。
CountDownLaunch的基本使用
CountDownLaunch很好理解,也是比较实用的,咱们干王者农药的时候就是一个很好的栗子,游戏选完人物你们一块儿加载地图等游戏资料,有的人慢,有的人快,这时就印出来了CountDownLaunch,至关于咱们5个玩家同时开启5个线程,而后一块儿执行,执行完毕先等着,直到5个玩家所有执行完成时,才能够运行后续操做。咱们来看一下代码。
public class CountDownLaunchSample { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new playerOne(countDownLatch)).start(); new Thread(new playerTwo(countDownLatch)).start(); countDownLatch.await(); System.out.println("所有加载完成"); } static class playerOne implements Runnable { CountDownLatch countDownLatch; public playerOne(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public void run() { try { System.out.println("玩家1开始加载..."); Thread.sleep(2000); System.out.println("玩家1加载完成"); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (countDownLatch != null) countDownLatch.countDown(); } } } static class playerTwo implements Runnable { CountDownLatch countDownLatch; public playerTwo(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public void run() { try { System.out.println("玩家2开始加载..."); Thread.sleep(10000); System.out.println("玩家2加载完成"); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (countDownLatch != null) countDownLatch.countDown(); } } } }
实际项目中若是遇到读取excel多个sheet页签而后汇总数据的状况也能够采用CountDownLanch。注意最后final的countDownLatch.countDown()方法,也是一个相似上面票据增减的方法。
CyclicBarrier栅栏的简单使用:
CyclicBarrier和咱们上面的CountDownLanch差很少,都是开启多个任务一块儿去执行,不一样的是CountDownLanch须要支线任务执行完成而后CountDownLanch作一个汇总,而后继续运行后续程序。CyclicBarrier不须要作汇总。再就是CyclicBarrier是能够重复的。
public class CyclicBarrierTest implements Runnable { private CyclicBarrier cyclicBarrier; private int index ; public CyclicBarrierTest(CyclicBarrier cyclicBarrier, int index) { this.cyclicBarrier = cyclicBarrier; this.index = index; } public void run() { try { System.out.println("index: " + index); index--; cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() { public void run() { System.out.println("全部特工到达屏障,准备开始执行秘密任务"); } }); for (int i = 0; i < 10; i++) { new Thread(new CyclicBarrierTest(cyclicBarrier, i)).start(); } cyclicBarrier.await(); System.out.println("所有到达屏障...."); } }
这个须要注意的是CyclicBarrier cyclicBarrier = new CyclicBarrier(11, 这个11,就是说必定有11个线程执行完毕,我才能够执行后面的操做,咱们下面for循环是10,而咱们那里写的是11啊,别忘记还有一个主线程呢,因此说每次计算必定加一个主线程啊。
Exchanger的简单使用
最后就是咱们Exchanger,平时使用的很少,咱们了解一下就能够了,搂一眼代码,就是线程之间的变量交换。
public static void main(String []args) { final Exchanger<Integer> exchanger = new Exchanger<Integer>(); for(int i = 0 ; i < 4 ; i++) { final Integer num = i; new Thread() { public void run() { System.out.println("我是线程:Thread_" + this.getName() + "个人数据是:" + num); try { Integer exchangeNum = exchanger.exchange(num); Thread.sleep(1000); System.out.println("我是线程:Thread_" + this.getName() + "我原先的数据为:" + num + " , 交换后的数据为:" + exchangeNum); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); } }
总结:
此次咱们核心梳理了咱们的Semaphore的执行流程,内部是如何来实现咱们的票据计数,获取,归还等操做的,再就是咱们for无限循环会在两次之后自动阻塞的设计思想,还有咱们的CountDownLanch、CyclicBarrier、Executors的基本使用,并赋予你们简单的代码流程,今天就说到这,明天咱们继续来讲咱们的多线程。