最近能够进行个税申报了,尚未申报的同窗能够赶忙去试试哦。不过我反正是从上午到下午一直都没有成功的进行申报,一进行申报
就返回“当前访问人数过多,请稍后再试”。为何有些人就可以申报成功,有些人就直接返回失败。这很明显申报处理资源是有限的,
只能等别人处理完了在来处理你的,你若是运气好可能重试几回就轮到你了,若是运气很差可能重试一天也可能轮不到你。
我反正已是放弃了,等到夜深人静的时候再来试试。做为一个程序员咱们确定知道这是个税申请app的限流操做,若是还有不懂什么
是限流操做的能够参考下这个文章《高并发系统三大利器之限流》。
好比个税申报系统每台机器只最多分别只能处理1000
个请求,再多的请求就会把机器打挂。若是是多余的请求就把这些请求拒绝掉。直接给你返回一句舒适提示:“当前访问人数过多,请稍后再试”,若是要实现这个功能你们想一想能够经过哪些方法算法来实现。java
学习semaphore
以前咱们必需要先了解下什么是共享锁。在上一篇文章《Java高并发编程基础之AQS》咱们介绍了公平锁于非公平锁的区别。node
独占锁:也有人把它叫作“独享锁”,它是是独占的,排他的,只能被一个线程可持有,
当独占锁已经被某个线程持有时,其余线程只能等待它被释放后,才能去争锁,而且同一时刻只有一个线程能争锁成功。git
在《Java并发编程艺术》(微信搜【java金融】回复电子书能够免费获取PDF版本)这一书中是这么说的:程序员
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它经过协调各个线程,以保证合理的使用公共资源。不少年以来,我都以为从字面上很难理解Semaphore所表达的含义,只能把它比做是控制流量的红绿灯,好比XX马路要限制流量,只容许同时有一百辆车在这条路上行使,其余的都必须在路口等待,因此前一百辆车会看到绿灯,能够开进这条马路,后面的车会看到红灯,不能驶入XX马路,可是若是前一百辆中有五辆车已经离开了XX马路,那么后面就容许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。github
Semaphore
机制是提供给线程抢占式获取许可,因此他能够实现公平或者非公平,相似于ReentrantLock
。
说了这么多咱们来个实际的例子看一看,好比咱们去停车场停车,停车场总共只有5
个车位,可是如今有8
辆汽车来停车,剩下的3
辆汽车要么等其余汽车开走后进行停车,或者去找别的停车位?面试
/** * @author: 公众号【Java金融】 */ public class SemaphoreTest { public static void main(String[] args) throws InterruptedException { // 初始化五个车位 Semaphore semaphore = new Semaphore(5); // 等全部车子 final CountDownLatch latch = new CountDownLatch(8); for (int i = 0; i < 8; i++) { int finalI = i; if (i == 5) { Thread.sleep(1000); new Thread(() -> { stopCarNotWait(semaphore, finalI); latch.countDown(); }).start(); continue; } new Thread(() -> { stopCarWait(semaphore, finalI); latch.countDown(); }).start(); } latch.await(); log("总共还剩:" + semaphore.availablePermits() + "个车位"); } private static void stopCarWait(Semaphore semaphore, int finalI) { String format = String.format("车牌号%d", finalI); try { semaphore.acquire(1); log(format + "找到车位了,去停车了"); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(1); log(format + "开走了"); } } private static void stopCarNotWait(Semaphore semaphore, int finalI) { String format = String.format("车牌号%d", finalI); try { if (semaphore.tryAcquire()) { log(format + "找到车位了,去停车了"); Thread.sleep(10000); log(format + "开走了"); semaphore.release(); } else { log(format + "没有停车位了,不在这里等了去其余地方停车去了"); } } catch (Exception e) { e.printStackTrace(); } } public static void log(String content) { // 格式化 DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 当前时间 LocalDateTime now = LocalDateTime.now(); System.out.println(now.format(fmTime) + " "+content); } }
2021-03-01 18:54:57 车牌号0找到车位了,去停车了 2021-03-01 18:54:57 车牌号3找到车位了,去停车了 2021-03-01 18:54:57 车牌号2找到车位了,去停车了 2021-03-01 18:54:57 车牌号1找到车位了,去停车了 2021-03-01 18:54:57 车牌号4找到车位了,去停车了 2021-03-01 18:54:58 车牌号5没有停车位了,不在这里等了去其余地方停车去了 2021-03-01 18:55:07 车牌号7找到车位了,去停车了 2021-03-01 18:55:07 车牌号6找到车位了,去停车了 2021-03-01 18:55:07 车牌号2开走了 2021-03-01 18:55:07 车牌号0开走了 2021-03-01 18:55:07 车牌号3开走了 2021-03-01 18:55:07 车牌号4开走了 2021-03-01 18:55:07 车牌号1开走了 2021-03-01 18:55:17 车牌号7开走了 2021-03-01 18:55:17 车牌号6开走了 2021-03-01 18:55:17 总共还剩:5个车位
从输出结果咱们能够看到车牌号5
这辆车看见没有车位了,就不在这个地方傻傻的等了,而是去其余地方了,可是车牌号6
和车牌号7
分别须要等到车库开出两辆车空出两个车位后才停进去。这就体现了Semaphore
的acquire
方法若是没有获取到凭证它就会阻塞,而tryAcquire
方法若是没有获取到凭证不会阻塞的。算法
在Dubbo
中能够给Provider
配置线程池大小来控制系统提供服务的最大并行度,默认是200
。apache
<dubbo:provider threads="200"/>
好比我如今这个订单系统有三个接口,分别为创单、取消订单、修改订单。这三个接口加起来的并发是200可是创单接口是核心接口,我想让它多分点线程来执行
让它能够有最大150
个线程,取消订单和修改订单分别最大25
个线程执行就能够了。dubbo
提供了executes
这一属性来实现这个功能编程
<dubbo:service interface="cn.javajr.service.CreateOrderService" executes="150"/> <dubbo:service interface="cn.javajr.service.CancelOrderService" executes="25"/> <dubbo:service interface="cn.javajr.service.EditOrderService" executes="25"/>
咱们能够看看dubbo
内部是如何来executes
的,具体实现是在ExecuteLimitFilter
这个类咱们能够segmentfault
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); Semaphore executesLimit = null; boolean acquireResult = false; int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); if (max > 0) { RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); // 若是当前使用的线程数量已经大于等于设置的阈值,那么直接抛出异常 // if (count.getActive() >= max) { // throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service // using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); /** * http://manzhizhen.iteye.com/blog/2386408 * use semaphore for concurrency control (to limit thread number) */ executesLimit = count.getSemaphore(max); if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); } } long begin = System.currentTimeMillis(); boolean isSuccess = true; // 计数器+1 RpcStatus.beginCount(url, methodName); try { Result result = invoker.invoke(invocation); return result; } catch (Throwable t) { isSuccess = false; if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } finally { // 计数器-1 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); if(acquireResult) { executesLimit.release(); } } }
从上述代码咱们也能够看出早期这个是没有采用Semaphore
来实现的,而是直接采用被注释的 if (count.getActive() >= max)
这个来来实现的,因为这个count.getActive() >= max 和这个计数加1不是原子性的,因此会有问题,具体bug号能够看https://github.com/apache/dubbo/pull/582后面才采用上述代码用Semaphore
来修复非原子性问题。具体更详细的分析能够参见代码的连接。不过如今最新版本(2.7.9)我看是采用采用自旋加上和CAS
来实现的。
上面就是对Semaphore
一个简单的使用以及dubbo
中用到的例子,说句实话Semaphore在工做中用的仍是比较少的,不过面试又有可能会被问到,因此仍是有必要来一块儿学习一下它。咱们前面《Java高并发编程基础之AQS》经过ReentrantLock 一块儿学习了下AQS,其实Semaphore一样也是经过AQS来是实现的,咱们能够一块儿来对照下独占锁的方法,基本上都是有方法一一相对应的。
这里有两点稍微须要注意的地方:
在共享锁模式下,当一个节点获取到了共享锁,咱们在获取成功后就能够唤醒后继节点了,而不须要等到该节点释放锁的时候,这是由于共享锁能够被多个线程同时持有,一个锁获取到了,则后继的节点均可以直接来获取。所以,在共享锁模式下,在获取锁和释放锁结束时,都会唤醒后继节点。
咱们一样仍是经过非公平锁的模式来老获取凭证
咱们能够看下acquire的核心方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } // 主要看下这个方法,这个方法返回的值也就是tryAcquireShared返回的值,由于tryAcquireShared->nonfairTryAcquireShared final int nonfairTryAcquireShared(int acquires) { //自旋 for (;;) { //Semaphore用AQS的state变量的值表明可用许可数 int available = getState(); //可用许可数减去本次须要获取的许可数即为剩余许可数 int remaining = available - acquires; //若是剩余许可数小于0或者CAS将当前可用许可数设置为剩余许可数成功,则返回成功许可数 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; }
tryAcquireShared
获取返回许可书小于0时说明获取许可失败须要进入doAcquireSharedInterruptibly
这个方法去休眠。tryAcquireShared
获取返回许可书小于0时说明获取许可成功直接结束。 ```java private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 独占锁的acquireQueued调用的是addWaiter(Node.EXCLUSIVE), // 而共享锁调用的是addWaiter(Node.SHARED),代表了该节点处于共享模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
这个方法是否是跟咱们上篇文章讲的AQS
的独占锁的acquireQueued
很像,不过独占锁它是直接调用了用了setHead(node)
方法,而共享锁调用的是setHeadAndPropagate(node, r)
这个方法除了调用setHead
里面还调用了doReleaseShared
(唤醒后继节点)
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
其余的方法基本上是和ReentrantLock
来实现的独占锁差很少,我相信你们对源码分析感兴趣的应该也很少,其余更多细节问题仍是须要本身亲自动手去看源码的。
Semaphore
初始化设置许可证为1 时,它也能够看成互斥锁使用。其中0、1就至关于它的状态,当=1时表示其余线程能够获取,当=0时,排他,即其余线程必需要等待。Semaphore
是JUC
包中的一个很简单的工具类,用来实现多线程下对于资源的同一时刻的访问线程数限制Semaphore
中存在一个【许可】的概念,即访问资源以前,先要得到许可,若是当前许可数量为0
,那么线程阻塞,直到得到许可Semaphore
内部使用AQS
实现,由抽象内部类Sync
继承了AQS
。由于Semaphore
天生就是共享的场景,因此其内部实际上相似于共享锁的实现semaphore
来进行限流的话会产生突刺现象。
指在必定时间内的一小段时间内就用完了全部资源,后大部分时间中无资源可用。
好比在限流方法中的计算器算法,设置1s内的最大请求数为100,在前100ms已经永远了100个请求,则后面900ms将没法处理请求,这就是突刺现象结束