双向队列(Deque),是Queue的一个子接口,双向队列是指该队列两端的元素既能入队(offer)也能出队(poll)。使用场景好比工做窃取,好比限流。ide
使用deque来限流,其中timeIntervalInMs为事件窗口,maxLimit为该事件窗口的最大值。测试
public class MyRateLimiter { private static final Logger LOGGER = LoggerFactory.getLogger(DemoRateLimiter.class); private final Deque<Long> queue; private long timeIntervalInMs; public MyRateLimiter(long timeIntervalInMs, int maxLimit) { this.timeIntervalInMs = timeIntervalInMs; this.queue = new LinkedBlockingDeque<Long>(maxLimit); } public boolean incrAndReachLimit(){ long currentTimeMillis = System.currentTimeMillis(); boolean success = queue.offerFirst(currentTimeMillis); if(success){ //没有超过maxLimit return false; } synchronized (this){ //queue is full long last = queue.getLast(); //还在时间窗口内,超过maxLimit if (currentTimeMillis - last < timeIntervalInMs) { return true; } LOGGER.info("time window expired,current:{},last:{}",currentTimeMillis,last); //超过期间窗口了,超过maxLimit的状况下,重置时间窗口 queue.removeLast(); queue.addFirst(currentTimeMillis); return false; } } }
测试this
@Test public void testDeque() throws InterruptedException { DemoRateLimiter limiter = new DemoRateLimiter(5*1000,3); Callable<Void> test = new Callable<Void>(){ @Override public Void call() throws Exception { for(int i=0;i<1000;i++){ LOGGER.info("result:{}",limiter.incrAndReachLimit()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } return null; } }; ExecutorService pool = Executors.newFixedThreadPool(10); pool.invokeAll(Arrays.asList(test,test,test,test,test)); Thread.sleep(100000); }
这里使用了Deque的容量来做为时间窗口的限流大小,利用两端来判断时间窗口,相对来说有点巧妙。code