当客户端向服务层同时提交多个任务(Task)时,为了充分发挥服务器的处理计算能力,普通单线程已经不能知足咱们的需求了,须要采用多线程或多进程的技术来并发处理task,服务器在处理并发任务时须要知足一下几个要求:java
Client.java服务器
package com.ngsky.async; /** * @Description TODO * @Author daxiong * @Date 7/7/2018 10:48 AM **/ public class AsyncTaskTest { public static void main(String[] args){ AsyncExecutorService executorService = new AsyncExecutorService(5, 10, 30, TimeUnit.SECONDS, 2, 5); long beginTime = System.currentTimeMillis(); for(int i = 0; i < 1000;i++){ MockTask mockTask = new MockTask(); mockTask.setName("task" + i); mockTask.setRetryTimes(3); mockTask.setRetryWaitTime(2000); executorService.execute(mockTask); } long endTime = System.currentTimeMillis(); System.out.println("The task spend on " + ((endTime - beginTime) / 60L) + " minutes"); } } // every task spend 20s, (1000 * 2) / 60 = 33 minutes class MockTask extends AsyncTask{ public void doWork() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
AsyncTask.java多线程
package com.ngsky.async; /** * @Description TODO * @Author daxiong * @Date 7/7/2018 10:14 AM **/ public abstract class AsyncTask implements Runnable { private String name; // task name private boolean successDone; // whether the task completed successful private int retryTimes; // if the task was't finished successful, system allow retry to do the task for retrytimes private int retryWaitTime; // system cant't retry to do work immediately, but execute after retryWaitTime public void run() { System.out.println("(task) " + getName() + " beginning execute...."); long beginTime = System.currentTimeMillis(); int currentRetryTimes = 1; try { doWork(); System.out.println("(task) " + getName() + " completed successful!"); this.setSuccessDone(true); } catch (Exception e) { System.out.println("(task) " + " execute filed..., message " + e); this.setSuccessDone(false); } if (getRetryTimes() <= 0) return; while (!isSuccessDone() && currentRetryTimes <= getRetryTimes()) { System.out.println("(task) " + "Executing retry " + currentRetryTimes + "th!" ); if (getRetryWaitTime() > 0) { try { Thread.sleep(getRetryWaitTime()); } catch (InterruptedException e) { e.printStackTrace(); } } try { doWork(); System.out.println("(task) " + getName() + " completed successful!"); this.setSuccessDone(true); } catch (Exception e) { System.out.println("(task) " + getName() + " was failed, unknown reason! Please try again!"); this.setSuccessDone(false); currentRetryTimes++; } } long endTime = System.currentTimeMillis(); System.out.println("(task) " + " spend on " + (endTime - beginTime) + "ms and result is " + (this.isSuccessDone() ? "successful!" : "failed,Please check your task!")); } public abstract void doWork() throws Exception; public String getName() { return name; } public void setName(String name) { this.name = name; } public boolean isSuccessDone() { return successDone; } public void setSuccessDone(boolean successDone) { this.successDone = successDone; } public int getRetryTimes() { return retryTimes; } public void setRetryTimes(int retryTimes) { this.retryTimes = retryTimes; } public int getRetryWaitTime() { return retryWaitTime; } public void setRetryWaitTime(int retryWaitTime) { this.retryWaitTime = retryWaitTime; } }
AsyncExecutorService.java并发
package com.ngsky.async; import java.util.concurrent.*; /** * @Description async service interface * @Author daxiong * @Date 7/7/2018 9:16 AM **/ public class AsyncExecutorService { private int corePoolSize; private int maximumPoolSize; private long keepLiveTime; private TimeUnit timeUnit; private int retryTimes; private int retryWaitTime; private LinkedBlockingQueue<Runnable> blockingQueue; private ThreadPoolExecutor executor; public AsyncExecutorService(int corePoolSize, int maximumPoolSize, long keepLiveTimes, TimeUnit timeUnit, int retryTimes, int retryWaitTimes){ this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.keepLiveTime = keepLiveTimes; this.timeUnit = timeUnit; this.retryTimes = retryTimes; this.retryWaitTime = retryWaitTimes; init(); } private void init(){ System.out.println("Async executor initializing..."); blockingQueue = new LinkedBlockingQueue<Runnable>(); executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepLiveTime, timeUnit, blockingQueue); } // init task information private AsyncTask initTask(AsyncTask task){ System.out.println("(task) " + task.getName() + " initializing..."); if(retryTimes > 0) task.setRetryTimes(retryTimes); if(retryWaitTime > 0) task.setRetryWaitTime(retryWaitTime); return task; } public void execute(AsyncTask task){ task = initTask(task); executor.execute(task); } public <T> Future<T> submit(Callable<T> job){ return executor.submit(job); } public void shutdown(){ if(executor != null) executor.shutdown(); } public int getCorePoolSize() { return corePoolSize; } public void setCorePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; } public int getMaximumPoolSize() { return maximumPoolSize; } public void setMaximumPoolSize(int maximumPoolSize) { this.maximumPoolSize = maximumPoolSize; } public long getKeepLiveTime() { return keepLiveTime; } public void setKeepLiveTime(long keepLiveTime) { this.keepLiveTime = keepLiveTime; } public TimeUnit getTimeUnit() { return timeUnit; } public void setTimeUnit(TimeUnit timeUnit) { this.timeUnit = timeUnit; } public int getRetryTimes() { return retryTimes; } public void setRetryTimes(int retryTimes) { this.retryTimes = retryTimes; } public int getRetryWaitTime() { return retryWaitTime; } public void setRetryWaitTime(int retryWaitTime) { this.retryWaitTime = retryWaitTime; } public LinkedBlockingQueue<Runnable> getBlockingQueue() { return blockingQueue; } public void setBlockingQueue(LinkedBlockingQueue<Runnable> blockingQueue) { this.blockingQueue = blockingQueue; } public ThreadPoolExecutor getExecutor() { return executor; } public void setExecutor(ThreadPoolExecutor executor) { this.executor = executor; } }
执行结果 异步
异步任务调度相关的理论和实现还有许多地方没有涉及到,这里作一个抛砖引玉,好比任务执行过程当中的拦截操做,线程池初始化时大小如何选择等等。async
任务调度针对的是高并发任务请求,要求保证系统性能,以最快的速度处理任务,提升处理效率和成功率。高并发
选择的技术:多线程,线程池,并发处理,任务调度,资源分配,重试机制。性能
再接再砺,写好每一篇博客,不要求天天写一篇,但要求每一篇都要是精华,都要饱含质量!this