实现限制执行频率的线程池

Java 中的线程池(ThreadPoolExecutor)咱们都知道(不知道请自行搜索),它的执行机制简单讲就是多个线程不停的从队列里面取任务执行。可是咱们可能遇到下面这样的场景:java

我有一批数据要经过线程池来处理,处理过程当中须要调用某个远程服务。但该服务存在调用频率限制,好比每秒钟最多调用 50 次,超过这个阈值将返回错误信息。安全

这是否意味着咱们不该该用多线程了呢?不是,在这个场景中,咱们要保证的是以间隔不低于 20ms 的频率发起请求,至于处理时间,无论是几百甚至几千毫秒,都不影响发起请求的频率,所以多线程是必要的。多线程

默认的线程池(ThreadPoolExecutor)没有按固定频率执行任务的特性,有的同窗可能会想到 ScheduledThreadPoolExecutor,可是很惋惜这个类也不能用,别看它名字里面带了计划任务的特性,但这个是用来反复执行同一个任务的,而咱们的场景是一个任务只执行一次。ide

固然也有的同窗会想到一种方案,依旧使用 ScheduledThreadPoolExecutor,可是将任务队列外部化(即不使用 ScheduledThreadPoolExecutor 的内部任务队列),而后 ScheduledThreadPoolExecutor 的任务自己就是从外部队列取任务执行。ui

这种方案是可行的,可是抛开实现起来过于复杂不说,线程池的执行机制也会遭到破坏,好比说咱们原本能够经过 shutdown()awaitTermination() 来等待线程池队列所有执行完,令线程池安全关闭;但若任务队列外部化,这点就作不到了,由于线程池会马上关闭,不会再处理外部队列中的剩余任务。this

这里有一个相对简单的解决方案。好在 ThreadPoolExecutor 给咱们提供了 beforeExecute() 这样一个扩展点,咱们能够经过继承 ThreadPoolExecutor,覆写这个方法来实现执行频率的限制:线程

  1. 使每一个线程在执行任务前延迟一段时间;
  2. 使用一个信号量来同步这段延迟,这样每一个线程在执行任务前被这个信号量锁住,拿到锁后延迟一段时间再释放锁,而后再执行任务。

因而可知,这样的设计既实现了执行频率限制,又保持了任务执行自己的并行性,同时线程池的执行机制没有受到影响。设计

代码实现起来不复杂,以下:code

public class FundThreadPoolExecutor extends ThreadPoolExecutor {

    private int fixedRateMillis;

    private final Semaphore fixedRateSemaphore = new Semaphore(1);

    // 设置执行频率限制的延迟时间(ms)
    public void setFixedRateMillis(int fixedRateMillis) {
        this.fixedRateMillis = fixedRateMillis;
    }

    public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if (this.fixedRateMillis > 0) {
            try {
                this.fixedRateSemaphore.acquire();
                Thread.sleep(this.fixedRateMillis);
            } catch (InterruptedException e) {
                // ignore this
            } finally {
                this.fixedRateSemaphore.release();
            }
        }
    }
}
相关文章
相关标签/搜索