常常出如今等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 自己是并没有关系的,能够同时进行执行的。咱们但愿可以两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。java
由此能够扩展,在不少任务下,咱们须要执行两个任务但这两个任务并无先后的关联关系,咱们也但愿两个任务可以同时执行,而后再将执行结果汇聚就能够了。数据库
future 经过提交一个 callable 任务给线程池,线程池后台启动其余线程去执行,而后再调用 get() 方法获取结果编程
private void test() {
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(() -> sleep(1));
try {
Integer integer = future.get(3, TimeUnit.SECONDS);
System.out.println(integer);
} catch (InterruptedException e) {
// 当前线在等待中被中断
e.printStackTrace();
} catch (ExecutionException e) {
// 任务执行中的异常
e.printStackTrace();
} catch (TimeoutException e) {
// 超时
e.printStackTrace();
}
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
复制代码
该方式存在的问题,若是 sleep 执行超过 3 秒钟,future 将没法拿到返回结果。固然,Future 提供了一个无参的get 方法,能够一直等待结果。不过仍是建议使用带超时参数的 get 方法,同时定义好超时的处理方法。缓存
调用某个方法,调用方在被调用方运行的过程当中会等待,直到被调用方运行结束后返回,调用方取得被调用方的返回值并继续运行。即便调用方和被调用方不在同一个线程中运行,调用方仍是须要等待被调用方结束才运行,这就是阻塞式调用。并发
异步 API 调用后会直接返回,将计算任务交给其余线程来进行。其余线程执行完成后,再将结果返回给调用方。less
使用异步 API异步
public void test(){
CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
new Thread(() -> {
int sleep = sleep(1);
completableFuture.complete(sleep);
}).start();
CompletableFuture<Integer> completableFuture1 = new CompletableFuture<>();
new Thread(() -> {
int sleep = sleep(2);
completableFuture1.complete(sleep);
}).start();
Integer integer = null;
Integer integer1 = null;
try {
integer = completableFuture.get();
integer1 = completableFuture1.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println(integer + "....CompletableFuture.." + integer1);
Instant end = Instant.now();
Duration duration = Duration.between(start, end);
long l = duration.toMillis();
System.err.println(l);
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return timeout;
}
复制代码
异步处理async
上面代码的问题是,若是在线程内发生了异常,如何在外部的调用中被发现,同时去处理呢?正常的状况是,线程内发生异常,会直接被封锁在线程内,而最终线程会被杀死,那么 get 方法一直会阻塞。测试
此时就不该该使用 get() 方法,而是使用带有超时参数的 get 方法,而且在线程内,将异常传递回调用方。ui
new Thread(() -> {
try {
int sleep = sleep(2);
completableFuture1.complete(sleep);
} catch (Exception e) {
completableFuture1.completeExceptionally(e);
}
}).start();
复制代码
completableFuture1.completeExceptionally(e);
将异常传递出来,在 ExecutionException
中会被捕获,而后对其进行处理便可。
try {
integer = completableFuture.get();
integer1 = completableFuture1.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
复制代码
示例:
public void test(){
CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
new Thread(() -> {
try {
throw new RuntimeException("故意抛出的异常...");
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
}).start();
Integer integer = null;
try {
integer = completableFuture.get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println(integer + "....CompletableFuture.." );
Instant end = Instant.now();
Duration duration = Duration.between(start, end);
long l = duration.toMillis();
System.err.println(l);
}
复制代码
此时会收到的异常:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 故意抛出的异常...
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at com.example.demo.me.sjl.service.UserService.test(UserService.java:92)
at com.example.demo.me.sjl.controller.UserController.test(UserController.java:20)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at ....
复制代码
supplyAsync
建立 CompletableFutureCompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> sleep(1));
复制代码
相比于 new 的方式,更加优雅、简洁,而且不用显式的建立线程(new Thread) 操做。默认会交由 ForkJoinPoll
池中的某个执行线程运行,同时也提供了重载的方法,指定 Executor 。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
复制代码
如何肯定默认的线程数量:
若是配置了系统属性 java.util.concurrent.ForkJoinPool.common.parallelism
则取该值,转换成 int 做为线程数量
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
if (pp != null)
parallelism = Integer.parseInt(pp);
复制代码
没有配置该值,则取 Runtime.getRuntime().availableProcessors()
值做为线程数量
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
复制代码
parallelism 初始值为 -1
调整线程池的大小
其中:$$N_{cppu}$$ 是处理器的核的数目,能够经过 Runtime.getRuntime().availableProcessors
获得
上面是一个参考公式《Java并发编程实战》(mng.bz/979c )
这是计算出的理论值,不过咱们在使用时,须要考虑实际状况,好比我有 5 个并行任务,那么我须要开启 5 个线程来分别进行执行,多了会千万浪费,少了达不到并发的效果。此时咱们须要 5 个线程。
public void save(){
CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
UserEntity entity = UserEntity.builder()
.id(1112)
.userName("施杰灵")
.password("abc1213")
.birthday("2018-08-08")
.createUser("1")
.createTime(LocalDateTime.now())
.updateUser("2")
.updateTime(LocalDateTime.now())
.build();
return userRepository.save(entity);
});
CompletableFuture<UserEntity> completableFuture1 = CompletableFuture.supplyAsync(() -> {
UserEntity entity = UserEntity.builder()
.id(223)
.userName("施杰灵1")
.password("abc12131")
.birthday("2018-08-18")
.createUser("11")
.createTime(LocalDateTime.now())
.updateUser("21")
.updateTime(LocalDateTime.now())
.build();
if (true) {
throw new RuntimeException("故意抛出的异常...");
}
return userRepository.save(entity);
});
System.out.println(completableFuture.join());
System.out.println(completableFuture1.join());
}
复制代码
测试结果,上面那条数据正常插入到数据库中,下面的数据插入失败。事务并无回滚。
将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果
public void test(){
CompletableFuture<Integer> compose = CompletableFuture.supplyAsync(() -> sleep(2))
.thenCompose(
(x) -> CompletableFuture.supplyAsync(() -> sleep(x))
);
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return timeout;
}
复制代码
从上面的代码中,能够看到在进行计算的时候,是使用到了前面的返回值 x
,整个任务的运行时间是 4 秒。
public void test() {
CompletableFuture<Integer> combine = CompletableFuture.supplyAsync(() -> sleep(2))
.thenCombine(
CompletableFuture.supplyAsync(() -> sleep(1)),
(t1, t2) -> t1 + t2
);
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return timeout;
}
复制代码
两个方法接收的参数是一致的,区别在于他们接收的第二个参数:BiFunction
是否会在提交到线程池中,由另一个任务以异步的方式执行。thenCombine
不会以异步方式执行 BiFunction
而 thenCombineAsync
会以异步的方式执行。
什么时候使用 Async 后缀的方法?
当咱们进行合并的方法是一个耗时的方法时,就尽量的考虑使用 Async 后缀的方法。
咱们日常的操做是,插入数据库时,若是两个操做中,其中一个操做发生异常,是否会回滚?
@Transactional(rollbackFor = Exception.class)
public void save() {
CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
UserEntity entity = UserEntity.builder()
.id(111)
.userName("施杰灵")
.password("abc1213")
.birthday("2018-08-08")
.createUser("1")
.createTime(LocalDateTime.now())
.updateUser("2")
.updateTime(LocalDateTime.now())
.build();
return userRepository.save(entity);
}).thenCombine(CompletableFuture.supplyAsync(() -> {
UserEntity entity = UserEntity.builder()
.id(222)
.userName("施杰灵1")
.password("abc12131")
.birthday("2018-08-18")
.createUser("11")
.createTime(LocalDateTime.now())
.updateUser("21")
.updateTime(LocalDateTime.now())
.build();
return userRepository.save(entity);
}), (a, b) -> {
System.out.println(a);
System.out.println(b);
return a;
});
UserEntity join = completableFuture.join();
System.out.println(join);
}
复制代码
通过实际测试,第二个任务抛出异常,是会回滚的。
Java 8的CompletableFuture 经过thenAccept 方法提供了这一功能,它接收CompletableFuture 执行完毕后的返回值作参数。
public void test() {
CompletableFuture.supplyAsync(() -> sleep(2))
.thenCombineAsync(
CompletableFuture.supplyAsync(() -> sleep(1)),
(t1, t2) -> t1 + t2
).thenAccept((t) -> System.out.println(t + "------"));
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return timeout;
}
复制代码
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) ;
复制代码
int corePoolSize : 核心池的大小,这个参数与后面讲述的线程池的实现原理有很是大的关系。在建立了线程池后,默认状况下,线程池中并无任何线程,而是等待有任务到来才建立线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就能够看出,是预建立线程的意思,即在没有任务到来以前就建立corePoolSize个线程或者一个线程。默认状况下,在建立了线程池后,线程池中的线程数为0,当有任务来以后,就会建立一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
int maximumPoolSize : 线程池最大线程数,它表示在线程池中最多能建立多少个线程;
long keepAliveTime : 表示线程没有任务执行时最多保持多久时间会终止。默认状况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起做用,直到线程池中的线程数不大于corePoolSize:即当线程池中的线程数大于corePoolSize时,若是一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize;可是若是调用了**allowCoreThreadTimeOut(boolean)**方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起做用,直到线程池中的线程数为0;
TimeUnit unit : 参数keepAliveTime的时间单位
BlockingQueue<Runnable> workQueue : 一个阻塞队列,用来存储等待执行的任务,这个参数的选择会对线程池的运行过程产生重大影响,通常来讲,这里的阻塞队列有如下几种选择
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue
ArrayBlockingQueue和PriorityBlockingQueue使用较少,通常使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
ThreadFactory threadFactory
RejectedExecutionHandler handler : 实现RejectedExecutionHandler接口,可自定义处理器