为了应对高并发场景下到服务端编程需求,微软最早提出了一种异步编程到方案Reactive Programming
,也就是反应式编程。react
以后在Java社区就出现了RxJava和Akka Stream等技术方案,让Java平台在反应式编程上有了多种选择。算法
函数式编程编程
反应式编程通常是基于函数式编程实现的,函数式编程有以下特色:安全
反应式编程是一种基于数据流,传递变化,声明式的编程范式。服务器
事件驱动网络
思想是组件之间交互经过松耦合的生产者和消费者来实现的,而且事件以异步,非阻塞方式进行发送和接收。数据结构
事件驱动是系统经过推模式实现的,也就是生产者在消息产生时推送数据给消费者进行处理,而不是让消费者不断轮询或等待数据实现的。并发
响应及时框架
因为反应式是异步的,好比进行数据处理的话,在交出任务以后就快速返回,而不是阻塞的等待任务执行完毕再返回。任务的执行给到后台线程执行,等任务处理完成以后返回,好比Java8的CompletableFuture。异步
事件弹性
事件驱动系统是松耦合的,上下游之间不是直接依赖,可是在Debug时成本更高一些。
Spring Reactor是Pivotal基于反应式编程实现的一种方案。是一种非阻塞,事件驱动的编程方案,使用函数式编程实现。
观察者模式
反应式编程和命令式编程在迭代器上的实现:
背压
若是Publisher发布消息太快,超过Subscriber处理速度该怎么办?响应式编程引入了背压概念,使得Subscriber可以控制消费消息的速度。
在Java生态中,Netflix的RxJava,TypeSafe的Scala,Akaka,pivatol的Sping,Reactor都是反应式编程的框架。
Stream不是集合元素,不是数据结构,也不保存数据,只是关于算法和计算,更像一种能够编程的迭代器。
Stream能够并行操做,迭代器只能命令式的,串型操做。 并行操做是将数据分红多段,每个在不一样线程中处理,最后将结果一块儿输出。这样能够大大利用硬件资源。
Reactor主要模块基于Netty实现:
核心类:
反应式编程概念总结:
Reactor使用方式上基本分为三步:
Reactor编程须要先建立出Mono或Flux。
同步调用结果建立对象
Mono<String> helloWorld = Mono.just("Hello World"); // 能够指定序列中包含的所有元素 Flux<String> fewWords = Flux.just("Hello","World"); Flux<String> manyWords = Flux.fromIterable(words);
这种方式通常用在通过一系列非IO型操做后,获得一个对应的对象,当须要将这个对象交给IO操做时,能够经过这种方式转换成Mono或Flux。
异步调用结果建立
若是异步获得结果,好比CompletableFuture能够建立一个Mono:
Mono.fromFuture(completableFuture);
若是这个异步调用不返回CompletableFuture,而有本身的回调方法,那么可使用:
static<T>Mono<T>create(Consumer<MonoSink<T>>callback)
Mono.create(sink ->{ ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url,String.class); entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>(){ @Override public void onSuccess(ResponseEntity<String> result){ sink.success(result.getBody()); } @Override public void onFailure(Throwable ex) { sink.error(ex); } }); });
在进行Mono和Flux处理阶段,通常使用filter
,map
,flatMap
,then
,zip
,reduce
等。
map,flatMap,then 三个频率使用比较高。
then
是下一步意思,表明执行顺序的下一步,不表示下一步依赖于上一步。then方法参数只是一个Mono,入参不是上一步的执行结果。
flatMap和map的参数是Function,是上一步执行的结果。
flatMap和map
传统的命令式编程
Object result1 = doStep1(params);
Object result2 = doStep2(result1);
Object result3 = doStep3(result2);
对应的反应式编程
Mono.just(params) .flatMap(v -> doStep1(v)) .flatMap(v -> doStep2(v)) .flatMap(v -> doStep3(v));
flatMap入参Function的返回值要求是Mono对象。map的入参Function只要求返回一个普通对象。 对于一些返回值是Mono的方法,想将调用串联起链式调用,必须使用flatMap,而不是map。
通常使用Mono.zip,Tuple2等。
传统编程方式并发执行是经过线程池+Future方式实现的。可是在作Future.get时是阻塞的。 Reactor中使用Mono和Flux中的zip方法以下:
Mono<CustomType1> item1Mono = ...; Mono<CustomType2> item2Mono = ...; Mono.zip(items -> { CustomType1 item1 = CustomType1.class.cast(items[0]); CustomType2 item2 = CustomType2.class.cast(items[1]); // Do merge return mergeResult; }, item1Mono, item2Mono);
这样item1Mono 和 item2Mono 过程是并行执行的。
使用zip方法时须要作类型强转换,类型强转换是不安全的
通常使用:Flux.fromIterable(),Flux.reduce()方法。
好比:
Data initData = ...; List<SubData> list = ...; Flux.fromIterable(list) .reduce(initData,(data,itemInList) -> { // Do something on data and itemInList return data; });
直接消费的Mono和Flux就是调用subscriber方法,其余的WebFlux接口能够直接返回框架的Response输出就能够了。
Serverlet3.1支持了异步处理方式,Servlet线程不须要一直阻塞的等待任务执行。Servlet在接收到请求后,将请求委托给业务线程完成,本身则直接返回继续接收新的请求。
因此Servlet3.1适用于那些业务处理很是耗时场景,这样能够减小服务器资源占用,能够提升并发处理速度,可是对于自己响应较为迅速的应用来讲收益不大。
WebFlux的异步处理是基于Reactor实现的,是将输入流适配成Mono或Flux进行统一处理。
在最新的Spring Cloud Gateway中也是基于Netty和WebFlux实现的。
Flux和Mono
Flux和Mono属于事件发布者,相似于生产者,为消费者提供订阅接口。在实现发生时,Flux和Mono会回调消费者对应的方法通知消费者处理事件。 Flux能够触发多个事件,Mono只触发一个事件。
Flux.fromIterable(getSomeLongList()) .mergeWith(Flux.interval(100)) .doOnNext(serviceA::someObserver) .map(d -> d * 2) .take(3) .onErrorResumeWith(errorHandler::fallback) .doAfterTerminate(serviceM::incrementTerminate) .subscribe(System.out::println ); Mono.fromCallable(System::currentTimeMillis) .flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time))) .timeout(Duration.ofSeconds(3), errorHandler::fallback) .doOnSuccess(r -> serviceM.incrementSuccess()) .subscribe(System.out::println);
选型注意
若是在框架中使用了WebFlux,他依赖的安全认证,数据访问都必须使用Reactive API,在存储层目前Reactive只支持MongoDB,Redis和Couchbase等几种不支持事务管理的NoSql,须要注意。
WebFlux并不能将接口耗时减小,只是能够减小线程扩展,提高系统的吞吐和伸缩能力。因为其为异步非阻塞Web框架,因此适用于IO密集型服务,好比咱们交易网关这种。
WebFlux支持两种编程模式:
能够经过Reactive Streams实现背压控制。
ServerRequest和ServerResponse是JDK8友好访问底层HTTP消息的不可变接口。彻底是响应式的。
在使用lambda写处理函数时,若是多个处理函数可能缺少可读性且不易于维护。能够将相关处理函数分组到一个处理程序或控制器类中。