有且只有一个抽象方法的接口被称为函数式接口,函数式接口适用于函数式编程的场景,Lambda 就是 Java 中函数式编程的体现,可使用Lambda表达式建立一个函数式接口的对象,必定要确保接口中有且只有一个抽象方法,这样Lambda才能顺利的进行推导。 php
函数式接口里除了抽象方法以外,还容许包含默认方法和静态方法。css
与@Override 注解的做用相似,Java 8中专门为函数式接口引入了一个新的注解:@FunctionalInterface 。该注解用于编译级错误检查,加上该注解,当你写的接口不符合函数式接口定义的时候,编译器会报错。 。可是这个注解不是必须的,只要符合函数式接口的定义,那么这个接口就是函数式接口。java
在 java.util.function
包下定义了内置核心四大函数式接口,可使用 lambda 表达式。web
关于这四个接口的介绍以下图所示:算法
函数型接口,有一个输入,有一个输出。编程
public static void main(String[] args) {
// Function function = new Function<String, Integer>() {
// @Override
// public Integer apply(String s) {
// return s.length();
// }
// };
//使用lambda表达式
Function<String, Integer> function = s -> {
return s.length();
};
System.out.println(function.apply("xxx"));
}
复制代码
判定型接口,有一个输入参数,返回只有布尔值。api
public static void main(String[] args) {
//判断字符串是否为空,空返回true
// Predicate predicate = new Predicate<String>() {
// @Override
// public boolean test(String s) {
// return s.isEmpty();
// }
// };
Predicate<String> predicate = str ->{return str.isEmpty();};
System.out.println(predicate.test("ff"));
}
复制代码
消费型接口,有一个输入参数,没有返回值。数组
public static void main(String[] args) {
// Consumer<String> consumer = new Consumer<String>() {
// @Override
// public void accept(String s) {
// System.out.println(s);
// }
// };
Consumer<String> consumer = Str ->{System.out.println(Str);};
consumer.accept("fjdskf");
}
复制代码
供给型接口,没有输入参数,只有返回参数。多线程
public static void main(String[] args) {
// Supplier<String> supplier = new Supplier<String>() {
// @Override
// public String get() {
// return "hresh";
// }
// };
Supplier<String> supplier = () -> {
return "hresh";
};
System.out.println(supplier.get());
}
复制代码
官网文档定义以下:并发
关于流的方法能够去官网看详细介绍。
是数据渠道,用于操做数据源(集合、数组等)所生成的元素序列。
集合存储数据,流讲的是计算!
特色:
一、新建一个实体类 User
@Data
@AllArgsConstructor
public class User {
private int id;
private String name;
private int age;
}
复制代码
二、流式计算
/**
* 题目:请按照给出数据,找出同时知足如下条件的用户
* 也即如下条件:
* 一、所有知足偶数ID
* 二、年龄大于24
* 三、用户名转为大写
* 四、用户名字母倒排序
* 五、只输出一个用户名字 limit
*/
public class Test {
public static void main(String[] args) {
User u1 = new User(1,"a",22);
User u2 = new User(2,"b",23);
User u3 = new User(3,"c",24);
User u4 = new User(4,"d",25);
User u5 = new User(6,"e",26);
List<User> list = Arrays.asList(u1,u2,u3,u4,u5);
list.stream().filter(u->{return u.getAge()>23;})
.filter(u->{return u.getId() %2 ==0;})
.map(u->{return u.getName().toUpperCase();})
.sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
.limit(1)
.forEach(System.out::println);
List<Integer> list2 = null;
list2 = list.stream().map(u -> {return u.getAge()+2;}).collect(Collectors.toList());
list2.forEach(System.out::println);
}
}
复制代码
使用流式计算,代码看起来更加简洁,效率相应也会有所提高。
从 JDK1.7开始,Java 提供 Fork/Join 框架用于并行执行任务。ForkJoin 的框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值进行分解成多个计算,而后将各个计算结果进行汇总。相应的 ForkJoin 将复杂的计算当作一个任务。而分解的多个计算则是当作一个子任务。
主要有两步:
它的模型大体是这样的:线程池中的每一个线程都有本身的工做队列(PS:这一点和 ThreadPoolExecutor 不一样,ThreadPoolExecutor 是全部线程共用一个工做队列,全部线程都从这个工做队列中取任务),当本身队列中的任务都完成之后,会从其它线程的工做队列中偷一个任务执行,这样能够充分利用资源。
另外,forkjoin 有一个工做窃取的概念。简单理解,就是一个工做线程下会维护一个包含多个子任务的双端队列。而对于每一个工做线程来讲,会从头部到尾部依次执行任务。这时,总会有一些线程执行的速度较快,很快就把全部任务执行完了。空闲下来的线程不会闲置下来,而是随机选择一个其余的线程从队列的尾巴上“偷走”一个任务。这个过程会一直继续下去,知道全部的任务都执行完毕。
工做窃取(work-stealing)算法是指某个线程从其余队列里窃取任务来执行。工做窃取的运行流程图以下:
工做窃取算法的优势是充分利用线程进行并行计算,并减小了线程间的竞争,其缺点是在某些状况下仍是存在竞争,好比双端队列里只有一个任务时。而且消耗了更多的系统资源,好比建立多个线程和多个双端队列。
ForkJoinPool
在官方文档中有以下定义:
ForkJoinPool
执行任务的线程池,继承了 AbstractExecutorService
类,该线程池是经过DefaultForkJoinWorkerThreadFactory
或者 InnoCuousForkJoinWorkerThreadFactory
线程工厂产生的工做线程 。
ForkJoinPool
主要经过 execute
、invoke
和 submit
这三个方法来处理任务 ForkJoinTask
。查看方法详细介绍可知:execute
方法异步执行给定任务,无返回值;invoke
方法执行给定的任务,在完成后返回其结果,结果类型与 ForkJoinTask
中的 V 类型一致;submit
方法执行任务 ForkJoinTask
并返回一个结果任务 ForkJoinTask
。
查看上述三个方法,实质上都执行的是 externalPush
方法,在该方法中有个任务队列 WorkQueue
,它是 ForkJoinPool
的内部类, WorkQueue
中有执行任务的线程(ForkJoinWorkerThread
owner
),还有这个线程须要处理的任务(ForkJoinTask<?>[] array
),新提交的任务就是加到 array 中。
ForkJoinWorkerThread
执行任务的工做线程,即 ForkJoinPool
线程池里面的线程,每一个线程都维护者一个双端队列 WorkQueue
,用于存放内部任务。
ForkJoinTask
ForkJoinTask
表明运行在 ForkJoinPool
中的任务。主要方法:
fork()
在当前线程运行的线程池中安排一个异步执行。简单的理解就是再建立一个子任务。join()
当任务完成的时候返回计算结果。invoke()
开始执行任务,若是必要,等待计算完成。子类: Recursive:递归
RecursiveAction
一个递归无结果的 ForkJoinTask
(没有返回值)RecursiveTask
一个递归有结果的 ForkJoinTask
(有返回值)RecursiveTask
实现类
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start; //起始值
private Long end; //结束值
public static final Long temp = 10000L;//临界值
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
Long length = end - start;
//判断是否拆分完毕
if(length <= temp){
Long sum = 0L;
//若是拆分完毕就相加
for (Long i = start; i <= end; i++) {
sum+=i;
}
return sum;
}else{
Long middle = (start+end)/2;
ForkJoinDemo task1 = new ForkJoinDemo(start,middle);
task1.fork();//拆分,并压入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end);
task2.fork();
//合并结果
return task1.join()+task2.join();
}
}
}
复制代码
测试代码
public class ForkJoinTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Long start = 0L;
Long end = 1000000000L;//10亿
work1(start,end); //5687
// work2(start,end); //4360
// work3(start,end); //195
}
//普通线程计算
public static void work1(Long start,Long end){
long startTime = System.currentTimeMillis();
Long sum=0L;
for (Long i = start; i<= end; i++) {
sum+=i;
}
long endTime = System.currentTimeMillis();
System.out.println("result="+sum+",time="+(endTime-startTime));
}
//ForkJoin实现
public static void work2(Long start,Long end) throws ExecutionException, InterruptedException {
long startTime = System.currentTimeMillis();
Long sum=0L;
ForkJoinPool pool = new ForkJoinPool();//实现ForkJoin 就必须有ForkJoinPool的支持
ForkJoinTask task = new ForkJoinDemo(start,end);
// ForkJoinTask result = pool.submit(task);
// sum = (Long) task.get();
sum = (Long) pool.invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("result="+sum+",time="+(endTime-startTime));
}
//并行流进行大数值运算
public static void work3(Long start,Long end) {
long startTime = System.currentTimeMillis();
Long sum=0L;
sum = LongStream.rangeClosed(start,end).parallel().reduce(0,Long::sum);
long endTime = System.currentTimeMillis();
System.out.println("result="+sum+",time="+(endTime-startTime));
}
}
复制代码
咱们前面讲并发编程一直都着重于多线程同步调用,除了同步线程,还存在异步线程。在此以前咱们来回顾一下同步和异步的定义。
同步:就是当任务A依赖于任务B的执行时,必须等待任务B执行完毕以后任务A才继续执行,此过程任务A被阻塞。任务要么都成功,要么都失败!想想咱们打电话的情景便可! 异步:任务A调用任务B,任务A不须要等到任务B执行完毕,任务B只是返回一个虚拟的结果给任务A,使得任务A可以继续作其余事情,等到任务B执行完成以后再通知任务A(回调)或者是任务A主动去请求任务B要结果。
Future 模式的核心思想是可以让主线程将原来须要同步等待的这段时间用来作其余的事情。(由于能够异步得到执行结果,因此不用一直同步等待去得到执行结果)
上图简单描述了普通模式和使用Future的区别,普通模式下,客户端访问服务端,等待结果返回很是耗时,此时客户端只能等待没法去作其余任务。而 Future 模式下,客户端向服务端发送完请求以后,先获得一个虚拟结果,真实的结果在将来某个时刻完成以后返回给客户端,而客户端在此期间能够去作其余任务。
Future的优势:比更底层的 Thread
更易用。要使用 Future
,一般只须要将耗时的操做封装在一个 Callable
对象中,再将它提交给 ExecutorService
。
ExecutorService executor = Executors.newFixedThreadPool(4);
// 定义任务:
Callable<String> task = new Task();
// 提交任务并得到Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞
复制代码
当咱们提交一个Callable
任务后,咱们会同时得到一个Future
对象,而后,咱们在主线程某个时刻调用Future
对象的get()
方法,就能够得到异步执行的结果。在调用get()
时,若是异步任务已经完成,咱们就直接得到结果。若是异步任务尚未完成,那么get()
会阻塞,直到任务完成后才返回结果。
一个Future
接口表示一个将来可能会返回的结果,它定义的方法有:
get()
:获取结果(可能会等待)get(long timeout, TimeUnit unit)
:获取结果,但只等待指定的时间;cancel(boolean mayInterruptIfRunning)
:取消当前任务;isDone()
:判断任务是否已完成。使用Future
得到异步执行结果时,要么调用阻塞方法get()
,要么轮询看isDone()
是否为true
,这两种方法都不是很好,由于主线程也会被迫等待。
从Java 8开始引入了CompletableFuture
,它针对Future
作了改进,能够传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
CompletableFuture
能够指定异步处理流程:
runAsync()
返回无结果的CompletableFuture
;
supplyAsync()
返回无结果的CompletableFuture
;
whenComplete()
处理正常和异常结果;
thenAccept()
处理正常结果;
exceptional()
处理异常结果;
thenApplyAsync()
用于串行化另外一个CompletableFuture
;
anyOf()
和allOf()
用于并行化多个CompletableFuture
。
CompletableFuture.runAsync()
返回一个CompletableFuture
,它须要一个实现了Runnable
接口的对象 ,无返回值(此处说的无返回值指的是 CompletableFuture)。
public static void main(String[] args) throws ExecutionException, InterruptedException {
//没有返回值的异步回调
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
System.out.println("主线程优先执行");
completableFuture.get();
}
复制代码
执行结果为:
主线程优先执行
ForkJoinPool.commonPool-worker-1
复制代码
CompletableFuture.supplyAsync()
返回一个CompletableFuture
,它须要一个实现了Supplier
接口的对象 ,有返回值。
public class CompletableFutureTest {
public static void main(String[] args) throws InterruptedException {
//建立一个CompletableFuture
CompletableFuture<Double> cfture = CompletableFuture.supplyAsync(CompletableFutureTest::fetchPrice);//lambda语法简化方法调用
// cfture.thenAccept(result ->{// 若是执行成功
// System.out.println(result);
// }).exceptionally(e ->{// 若是执行异常
// e.printStackTrace();
// return null;
// });
cfture.whenComplete((r1,r2) ->{
System.out.println("执行结果为:"+r1); //输出执行成功的结果
System.out.println("异常信息:"+r2); //输出异常信息
}).exceptionally(e ->{// 若是执行异常
e.printStackTrace();
return null;
});
// 主线程不要马上结束,不然CompletableFuture默认使用的线程池会马上关闭
TimeUnit.SECONDS.sleep(2);
System.out.println("主线程执行完毕");
}
static Double fetchPrice() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (Math.random() < 0.3) {
throw new RuntimeException("fetch price failed!");
}
return 5 + Math.random() * 20;
}
}
复制代码
无异常时结果为:
执行结果为:6.110276836465158
异常信息:null
主线程执行完毕
复制代码
抛出异常结果为:
相比Future
,CompletableFuture
更强大的功能是,多个CompletableFuture
能够串行执行。
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
// 第一个任务:
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油");
});
cfQuery.thenAccept((result) -> {
System.out.println("query result: " + result);
});
// cfQuery成功后继续执行下一个任务:
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
return fetchPrice(code);
});
// cfFetch成功后打印结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要马上结束,不然CompletableFuture默认使用的线程池会马上关闭:
TimeUnit.SECONDS.sleep(2);
}
static String queryCode(String name) {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
}
return name;
}
static Double fetchPrice(String code) {
try {
TimeUnit.MILLISECONDS.sleep(600);
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
复制代码
除了串行执行外,多个CompletableFuture
还能够并行执行。例如,咱们考虑这样的场景:
同时重新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时重新浪和网易查询,只要任意一个返回结果,就完成操做。
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
// 两个CompletableFuture执行异步查询:
CompletableFuture<String> cfQueryFromBing = CompletableFuture.supplyAsync(() -> {
return queryName("hresh", "https://cn.bing.com/");
});
CompletableFuture<String> cfQueryFromBaidu = CompletableFuture.supplyAsync(() -> {
return queryName("hresh2", "https://cn.baidu.com/");
});
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromBing, cfQueryFromBaidu);
// 并行执行结果多是两个CompletableFuture中任意一个的返回结果
cfQuery.thenAccept((result) -> {
System.out.println("name: " + result);
});
// 主线程不要马上结束,不然CompletableFuture默认使用的线程池会马上关闭:
Thread.sleep(200);
}
static String queryName(String name, String url) {
System.out.println("query name from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return name;
}
}
复制代码