package sample;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/*
 * The Executors class offers a simple ThreadPoolExecutor setup through
 * ExecutorService, but ThreadPoolExecutor itself can do a lot more.
 * When creating a ThreadPoolExecutor instance we can choose the number of
 * active threads, cap the size of the pool, and plug in our own
 * RejectedExecutionHandler to deal with tasks that do not fit into the
 * work queue.
 */
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    /**
     * Reports a rejected task on standard output.
     *
     * <p>Prints the task's {@code toString()} value followed by
     * {@code " is rejected"}. The task is not retried or re-queued here.
     *
     * @param r        the task that was rejected
     * @param executor the executor that rejected the task (not used)
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        final String description = r.toString();
        System.out.println(description + " is rejected");
    }
}
package sample;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Periodically prints a one-line status snapshot of a {@link ThreadPoolExecutor}
 * to standard output: pool size, core pool size, active thread count, completed
 * task count, total task count, and the shutdown/terminated flags.
 *
 * <p>The loop ends when {@link #shutdown()} is called or when the thread running
 * it is interrupted.
 */
public class MonitorThread implements Runnable {

    private final ThreadPoolExecutor executor;

    /** Pause between two status lines, in seconds. */
    private final int seconds;

    // Written by the thread calling shutdown() and read by the monitoring thread;
    // volatile so the stop request is guaranteed to become visible to the loop.
    private volatile boolean run = true;

    /**
     * Creates a monitor for the given pool.
     *
     * @param executor the pool whose state is reported
     * @param delay    pause between two reports, in seconds
     */
    public MonitorThread(ThreadPoolExecutor executor, int delay) {
        this.executor = executor;
        this.seconds = delay;
    }

    /**
     * Asks the monitoring loop to stop. The loop notices the request the next
     * time it checks its condition, i.e. after the current sleep has finished.
     */
    public void shutdown() {
        this.run = false;
    }

    /**
     * Prints a status line, sleeps for the configured delay, and repeats until
     * {@link #shutdown()} is called or the running thread is interrupted.
     */
    @Override
    public void run() {
        while (run) {
            System.out.println(String.format(
                    "[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                    this.executor.getPoolSize(),
                    this.executor.getCorePoolSize(),
                    this.executor.getActiveCount(),
                    this.executor.getCompletedTaskCount(),
                    this.executor.getTaskCount(),
                    this.executor.isShutdown(),
                    this.executor.isTerminated()));
            try {
                // 1000L forces long arithmetic so large delays cannot overflow int.
                Thread.sleep(seconds * 1000L);
            } catch (InterruptedException e) {
                // Treat an interrupt as a request to stop: restore the flag for
                // whoever owns this thread and leave the loop instead of spinning.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
package sample;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demo driver: builds a small bounded thread pool, starts a monitor that
 * reports the pool state, submits ten workers, and then shuts everything down.
 */
public class WorkerPoolAdvanced {

    /**
     * Runs the demo.
     *
     * @param args command-line arguments (not used)
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        // Thread factory and rejection handler handed to the pool.
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandlerImpl();

        // Note: the pool is created with a core size of 2, a maximum size of 4,
        // a 10 second keep-alive and a work queue that holds only 2 tasks.
        // When the pool is saturated and further tasks arrive, the queue keeps
        // at most 2 of them and the remaining ones are passed to
        // RejectedExecutionHandlerImpl.
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);
        ThreadPoolExecutor executorPool = new ThreadPoolExecutor(
                2, 4, 10, TimeUnit.SECONDS, workQueue, threadFactory, rejectionHandler);

        // Start the monitoring thread; it reports every 3 seconds.
        MonitorThread monitor = new MonitorThread(executorPool, 3);
        new Thread(monitor).start();

        // Submit ten workers, numbered 1 through 10.
        for (int workerNo = 1; workerNo <= 10; workerNo++) {
            String label = "Worker Thread No: " + workerNo;
            executorPool.execute(new WorkerThread(label));
        }

        // Give the workers time to run, then shut the pool down.
        TimeUnit.SECONDS.sleep(30);
        executorPool.shutdown();

        // Let the monitor report the final pool state before stopping it.
        TimeUnit.SECONDS.sleep(5);
        monitor.shutdown();
    }
}