一、继承Threadjava
Thread01 thread = new Thread01(); thread.start();
二、实现Runable接口spring
Runable01 runable = new Runable(); new Thread(runable).start();
三、实现Callable接口 + FutureTask(能够拿到返回结果,能够处理异常)多线程
Callable01 callable01 = new Callable01(); FutureTask<Integer> futureTask = new FutureTask<>(new callable01()); new Thread(futureTask).start(); Integer result = futureTask.get(); // 阻塞等待,直到整个线程执行完成,得到返回结果
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } // Runnable也能够得到返回结果 public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
四、线程池 (最经常使用) ,应该将全部多线程异步任务都交给线程池执行app
Excutors(少用)框架
public static ExecutorService service = Executors.newFixedThreadPool(10);
7大参数源码less
/** * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached */ public ThreadPoolExecutor( int corePoolSize, // 核心线程数,建立好就准备就绪的线程数量,一直存在,空闲也不会被释放,除非设置allowCoreThreadTimeOut int maximumPoolSize, // 最大线程数量,用于控制资源,空闲线程超过指定的keepAliveTime时间会被释放 long keepAliveTime, // 存活时间。若是当前的线程数大于核心线程数,那么只要线程空闲时间大于指定keepAliveTime,就释放该线程 TimeUnit unit, // 存活时间的时间单位 BlockingQueue<Runnable> workQueue, // 阻塞式工做队列,若是任务不少,就会将多的任务放进该队列,只要有线程空闲就会去队列取新任务执行 ThreadFactory threadFactory, // 线程的建立工厂 RejectedExecutionHandler handler // 若是workQueue工做队列满了,按照指定的handler方法执行拒绝策略执行任务 )
其实java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。workerSet中的线程会不断的从workQueue中获取线程而后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取出来继续执行。异步
用户经过submit提交一个任务。线程池会执行以下流程:async
咱们先来看一下ThreadPoolExecutor中的几个关键属性。ide
//这个属性是用来存放 当前运行的worker数量以及线程池状态的 //int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //存听任务的阻塞队列 private final BlockingQueue<Runnable> workQueue; //worker的集合,用set来存放 private final HashSet<Worker> workers = new HashSet<Worker>(); //历史达到的worker数最大值 private int largestPoolSize; //当队列满了而且worker的数量达到maxSize的时候,执行具体的拒绝策略 private volatile RejectedExecutionHandler handler; //超出coreSize的worker的生存时间 private volatile long keepAliveTime; //常驻worker的数量 private volatile int corePoolSize; //最大worker的数量,通常当workQueue满了才会用到这个参数 private volatile int maximumPoolSize;
下面是execute方法的源码函数
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //workerCountOf(c)会获取当前正在运行的worker数量 if (workerCountOf(c) < corePoolSize) { //若是workerCount小于corePoolSize,就建立一个worker而后直接执行该任务 if (addWorker(command, true)) return; c = ctl.get(); } //isRunning(c)是判断线程池是否在运行中,若是线程池被关闭了就不会再接受任务 //后面将任务加入到队列中 if (isRunning(c) && workQueue.offer(command)) { //若是添加到队列成功了,会再检查一次线程池的状态 int recheck = ctl.get(); //若是线程池关闭了,就将刚才添加的任务从队列中移除 if (! isRunning(recheck) && remove(command)) //执行拒绝策略 reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //若是加入队列失败,就尝试直接建立worker来执行任务 else if (!addWorker(command, false)) //若是建立worker失败,就执行拒绝策略 reject(command); }
添加worker的方法addWorker源码
private boolean addWorker(Runnable firstTask, boolean core) { retry: //使用自旋+cas失败重试来保证线程竞争问题 for (;;) { //先获取线程池的状态 int c = ctl.get(); int rs = runStateOf(c); // 若是线程池是关闭的,或者workQueue队列非空,就直接返回false,不作任何处理 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //根据入参core 来判断能够建立的worker数量是否达到上限,若是达到上限了就拒绝建立worker if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //没有的话就尝试修改ctl添加workerCount的值。这里用了cas操做,若是失败了下一个循环会继续重试,直到设置成功 if (compareAndIncrementWorkerCount(c)) //若是设置成功了就跳出外层的那个for循环 break retry; //重读一次ctl,判断若是线程池的状态改变了,会再从新循环一次 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; //建立一个worker,将提交上来的任务直接交给worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //加锁,防止竞争 mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); //仍是判断线程池的状态 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //若是worker的线程已经启动了,会抛出异常 if (t.isAlive()) throw new IllegalThreadStateException(); //添加新建的worker到线程池中 workers.add(w); int s = workers.size(); //更新历史worker数量的最大值 if (s > largestPoolSize) largestPoolSize = s; //设置新增标志位 workerAdded = true; } } finally { mainLock.unlock(); } //若是worker是新增的,就启动该线程 if (workerAdded) { t.start(); //成功启动了线程,设置对应的标志位 workerStarted = true; } } } finally { //若是启动失败了,会触发执行相应的方法 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker是ThreadPoolExecutor内部定义的一个内部类。咱们先看一下Worker的继承关系
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
它实现了Runnable接口,因此能够拿来当线程用。同时它还继承了AbstractQueuedSynchronizer同步器类,主要用来实现一个不可重入的锁。
一些属性还有构造方法:
//运行的线程,前面addWorker方法中就是直接经过启动这个线程来启动这个worker final Thread thread; //当一个worker刚建立的时候,就先尝试执行这个任务 Runnable firstTask; //记录完成任务的数量 volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //建立一个Thread,将本身设置给他,后面这个thread启动的时候,也就是执行worker的run方法 this.thread = getThreadFactory().newThread(this); }
worker的run方法
public void run() { //这里调用了ThreadPoolExecutor的runWorker方法 runWorker(this); }
ThreadPoolExecutor的runWorker方法
final void runWorker(Worker w) { //获取当前线程 Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; //执行unlock方法,容许其余线程来中断本身 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //若是前面的firstTask有值,就直接执行这个任务 //若是没有具体的任务,就执行getTask()方法从队列中获取任务 //这里会不断执行循环体,除非线程中断或者getTask()返回null才会跳出这个循环 while (task != null || (task = getTask()) != null) { //执行任务前先锁住,这里主要的做用就是给shutdown方法判断worker是否在执行中的 //shutdown方法里面会尝试给这个线程加锁,若是这个线程在执行,就不会中断它 w.lock(); //判断线程池状态,若是线程池被强制关闭了,就立刻退出 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //执行任务前调用。预留的方法,可扩展 beforeExecute(wt, task); Throwable thrown = null; try { //真正的执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行任务后调用。预留的方法,可扩展 afterExecute(task, thrown); } } finally { task = null; //记录完成的任务数量 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
下面来看一下getTask()方法,这里面涉及到keepAliveTime的使用,从这个方法咱们能够看出先吃池是怎么让超过corePoolSize的那部分worker销毁的。
private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 若是线程池已经关闭了,就直接返回null, //若是这里返回null,调用的那个worker就会跳出while循环,而后执行完销毁线程 //SHUTDOWN状态表示执行了shutdown()方法 //STOP表示执行了shutdownNow()方法 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //获取当前正在运行中的worker数量 int wc = workerCountOf(c); // 若是设置了核心worker也会超时或者当前正在运行的worker数量超过了corePoolSize,就要根据时间判断是否要销毁线程了 //其实就是从队列获取任务的时候要不要设置超时间时间,若是超过这个时间队列尚未任务进来,就会返回null boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //若是上一次循环从队列获取到的未null,这时候timedOut就会为true了 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //经过cas来设置WorkerCount,若是多个线程竞争,只有一个能够设置成功 //最后若是没设置成功,就进入下一次循环,说不定下一次worker的数量就没有超过corePoolSize了,也就不用销毁worker了 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //若是要设置超时时间,就设置一下咯 //过了这个keepAliveTime时间尚未任务进队列就会返回null,那worker就会销毁 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //若是r为null,就设置timedOut为true timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
要添加一个有返回值的任务的实现也很简单。其实就是对任务作了一层封装,将其封装成Future,而后提交给线程池执行,最后返回这个future。
这里的 newTaskFor(task) 方法会将其封装成一个FutureTask类。
外部的线程拿到这个future,执行get()方法的时候,若是任务自己没有执行完,执行线程就会被阻塞,直到任务执行完。
下面是FutureTask的get方法
public V get() throws InterruptedException, ExecutionException { int s = state; //判断状态,若是任务还没执行完,就进入休眠,等待唤醒 if (s <= COMPLETING) s = awaitDone(false, 0L); //返回值 return report(s); }
FutureTask中经过一个state状态来判断任务是否完成。当run方法执行完后,会将state状态置为完成,同时唤醒全部正在等待的线程。咱们能够看一下FutureTask的run方法
public void run() { //判断线程的状态 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //执行call方法 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) //这个方法里面会设置返回内容,而且唤醒因此等待中的线程 set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
shutdown方法会将线程池的状态设置为SHUTDOWN,线程池进入这个状态后,就拒绝再接受任务,而后会将剩余的任务所有执行完
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //检查是否能够关闭线程 checkShutdownAccess(); //设置线程池状态 advanceRunState(SHUTDOWN); //尝试中断worker interruptIdleWorkers(); //预留方法,留给子类实现 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍历全部的worker for (Worker w : workers) { Thread t = w.thread; //先尝试调用w.tryLock(),若是获取到锁,就说明worker是空闲的,就能够直接中断它 //注意的是,worker本身自己实现了AQS同步框架,而后实现的相似锁的功能 //它实现的锁是不可重入的,因此若是worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
shutdownNow作的比较绝,它先将线程池状态设置为STOP,而后拒绝全部提交的任务。最后中断左右正在运行中的worker,而后清空任务队列。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //检测权限 advanceRunState(STOP); //中断全部的worker interruptWorkers(); //清空任务队列 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍历全部worker,而后调用中断方法 for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
无回调结果的异步方法:CompletableFuture.runAsync(Runnable runnable, Executor executor)
public static void main(String[] args) { System.out.println("main。。。start。。。"); CompletableFuture.runAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); }, executor); System.out.println("main。。。end。。。"); }
带有回调结果的异步方法:CompletableFuture.supplyAsyn(Supplier supplier, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("main。。。start。。。"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); return i; }, executor); Integer integer = future.get(); System.out.println("main。。。end。。。" + integer); }
方法完成后的感知 whenCompleteAsync 和 exceptionally
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("main。。。start。。。"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("运行结果:" + i); return i; }, executor).whenCompleteAsync((res, exception) -> { // 只能获得异常信息,没法修改返回数据 System.out.println("异步任务完成了,结果是:" + res + "异常是:" + exception); }).exceptionally(throwable -> { // 能够获取异常信息,同时能够返回默认值 return 10; // 修改返回值future.get() 结果为10 }); Integer integer = future.get(); System.out.println("main。。。end。。。" + integer); } /** 结果: main。。。start。。。 当前线程:11 异步任务完成了,结果是:null异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero main。。。end。。。10 **/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("运行结果:" + i); return i; }, executor).handle((res, thr) -> { if(res != null) { return res * 2; // 修改future返回结果 } if(thr != null) { // 异常 return 0 } return 0; });
一、thenRunAsync:不能获取到上一步执行结果,无返回值
CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("运行结果:" + i); return i; }, executor).thenRunAsync(() -> { // 没有返回值, 不能获取到上一步的执行结果 System.out.println("任务2启动了。。"); }, executor);
二、thenAcceptAsync:能接收到上一步结果但无返回值
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("运行结果:" + i); return i; }, executor).thenAcceptAsync((res) -> { // 没有返回值 System.out.println("任务2启动了。。" + res); }, executor);
三、thenApplAsyncy:能接收到上一步返回结果,也有返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("运行结果:" + i); return i; }, executor).thenApplyAsync((res) -> { // 没有返回值 System.out.println("任务2启动了。。" + res); return "hello" + res; }, executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("当前线程:" + Thread.currentThread().getId()); int i = 10 / 4; System.out.println("运行结果:" + i); return i; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { return "hello"; }, executor); future1.runAfterBothAsync(future2, () -> { // 没法感知前两个任务的结果 System.out.println("任务3开始。。"); }, executor); future1.thenAcceptBothAsync(future2, (f1, f2) -> { // 能获取到前两个任务的结果 System.out.println("任务3开始。。。以前的结果:" + f1 + "=>" +f2); },executor); // 既能获取到前两个任务的返回结果, 又能最终的返回结果future CompletableFuture<String> future = future1.thenCombineAsync(future2, (f1, f2) -> { return f1 + ":" + f2 + "-> haha"; }, executor); // 两个任务只要有一个完成,就执行任务3, 不能接受结果,没有返回值 future1.runAfterEitherAsync(future2, () -> { System.out.println("任务3开始。。。以前的结果:" + res); }, executor); // 两个任务只要有一个完成,就执行任务3, 感知结果,本身没有返回值 future1.acceptEitherAsync(future2, (res) -> { System.out.println("任务3开始。。。以前的结果:" + res); }, executor); // 两个任务只要有一个完成,就执行任务3, 既能感知结果,本身也有返回值 CompletableFuture<Object> future = future1.applyToEitherAsync(future2, res -> { System.out.println("任务3开始。。。以前的结果:" + res); }executor);
多任务组合操做
// 执行完全部任务才执行 allOf.get() CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3); System.out.println("main。。。end。。。" + allOf.get()); // 有一个执行完就执行 anyOf.get() CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3); System.out.println("main。。。end。。。" + anyOf.get());
@Test public void test0() throws Exception { System.out.println("main函数开始执行"); Thread thread=new Thread(new Runnable() { @Override public void run() { System.out.println("===task start==="); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("===task finish==="); } }); thread.start(); System.out.println("main函数执行结束"); }
jdk8以前的实现方式,在JUC下增长了Future,从字面意思理解就是将来的意思,但使用起来却着实有点鸡肋,并不能实现真正意义上的异步,获取结果时须要阻塞线程,或者不断轮询。
@Test public void test1() throws Exception { System.out.println("main函数开始执行"); ExecutorService executor = Executors.newFixedThreadPool(1); Future<Integer> future = executor.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println("===task start==="); Thread.sleep(5000); System.out.println("===task finish==="); return 3; } }); //这里须要返回值时会阻塞主线程,若是不须要返回值使用是OK的。倒也还能接收 //Integer result=future.get(); System.out.println("main函数执行结束"); System.in.read(); }
使用原生的CompletableFuture实现异步操做,加上对lambda的支持,能够说实现异步任务已经发挥到了极致。
@Test public void test2() throws Exception { System.out.println("main函数开始执行"); ExecutorService executor = Executors.newFixedThreadPool(2); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { System.out.println("===task start==="); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("===task finish==="); return 3; } }, executor); future.thenAccept(e -> System.out.println(e)); System.out.println("main函数执行结束"); }
使用spring实现异步须要开启注解,可使用xml方式或者java config的方式。
xml方式: <task:annotation-driven />
<task:annotation-driven executor="executor" /> <task:executor id="executor" pool-size="2" 线程池的大小 queue-capacity="100" 排队队列长度 keep-alive="120" 线程保活时间(单位秒) rejection-policy="CALLER_RUNS" 对拒绝的任务处理策略 />
java方式:
@EnableAsync public class MyConfig { @Bean public TaskExecutor executor(){ ThreadPoolTaskExecutor executor=new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); //核心线程数 executor.setMaxPoolSize(20); //最大线程数 executor.setQueueCapacity(1000); //队列大小 executor.setKeepAliveSeconds(300); //线程最大空闲时间 executor.setThreadNamePrefix("fsx-Executor-"); //指定用于新建立的线程名称的前缀。 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } }
@Test public void test3() throws Exception { System.out.println("main函数开始执行"); myService.longtime(); System.out.println("main函数执行结束"); } @Async public void longtime() { System.out.println("我在执行一项耗时任务"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("完成"); }
若是须要返回值,耗时方法返回值用AsyncResult包装。
@Test public void test4() throws Exception { System.out.println("main函数开始执行"); Future<Integer> future=myService.longtime2(); System.out.println("main函数执行结束"); System.out.println("异步执行结果:"+future.get()); } @Async public Future<Integer> longtime2() { System.out.println("我在执行一项耗时任务"); try { Thread.sleep(8000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("完成"); return new AsyncResult<>(3); }