只有光头才能变强。文本已收录至个人GitHub仓库,欢迎Star:https://github.com/ZhongFuCheng3y/3yhtml
本文知识点架构:前端
若是有关注我公众号文章的同窗就会发现,最近我不定时转发了一些比较好的WebFlux的文章,由于我最近在学。java
我以前也说过,学习一项技术以前,先要了解为何要学这项技术。其实此次学习WebFlux
也没有多大的原生动力,主要是在咱们组内会轮流作一次技术分享,而我又不知道分享什么比较好...react
以前在初学大数据相关的知识,可是这一块的时间线会拉得比较长,感受赶不及小组内分享(而组内的同窗又大部分都懂大数据,就只有我一个菜鸡,泪目)。因此,想的是:“要不我学点新东西搞搞?”。因而就花了点时间学WebFlux
啦~git
这篇文章主要讲解什么是WebFlux
,带领你们入个门,但愿对你们有所帮助(至少看完这篇文章,知道WebFlux是干吗用的)程序员
咱们从Spring
的官网拉下一点点就能够看到介绍WebFlux
的地方了github
从官网的简介中咱们能得出什么样的信息?web
Spring5
提供了一整套响应式(非阻塞)的技术栈供咱们使用(包括Web控制器、权限控制、数据访问层等等)。而左侧的图则是技术栈的对比啦;spring
总结起来,WebFlux只是响应式编程中的一部分(在Web控制端),因此通常咱们用它与SpringMVC来对比。编程
在上面提到了响应式编程(Reactive Programming),而WebFlux只是响应式编程的其中一个技术栈而已,因此咱们先来探讨一下什么是响应式编程
从维基百科里边咱们获得的定义:
reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change
响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式
在维基百科上也举了个小例子:
意思大概以下:
a=b+c
,这就意味着a
的值是由b
和c
计算出来的。若是b
或者c
后续有变化,不会影响到a
的值a:=b+c
,这就意味着a
的值是由b
和c
计算出来的。但若是b
或者c
的值后续有变化,会影响到a
的值我认为上面的例子已经能够帮助咱们理解变化传递(propagation of change)
那数据流(data stream)和声明式(declarative)怎么理解呢?那能够提一提咱们的Stream流了。以前写过Lambda表达式和Stream流的文章,你们能够先去看看:
Lambda的语法是这样的(Stream流的使用会涉及到不少Lambda表达式的东西,因此通常先学Lambda再学Stream流):
Stream流的使用分为三个步骤(建立Stream流、执行中间操做、执行最终操做):
执行中间操做实际上就是给咱们提供了不少的API去操做Stream流中的数据(求和/去重/过滤)等等
说了这么多,怎么理解数据流和声明式呢?实际上是这样的:
好比下面的代码;将数组中的数据变成数据流,经过显式声明调用.sum()
来处理数据流中的数据,获得最终的结果:
public static void main(String[] args) { int[] nums = { 1, 2, 3 }; int sum2 = IntStream.of(nums).parallel().sum(); System.out.println("结果为:" + sum2); }
如图下所示:
上面讲了响应式编程是什么:
响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式
也讲解了数据流/变化传递/声明式是什么意思,但说到响应式编程就离不开异步非阻塞。
从Spring官网介绍WebFlux的信息咱们就能够发现asynchronous, nonblocking
这样的字样,由于响应式编程它是异步的,也能够理解成变化传递它是异步执行的。
以下图,合计的金额会受其余的金额影响(更新的过程是异步的):
咱们的JDK8 Stream流是同步的,它就不适合用于响应式编程(但基础的用法是须要懂的,由于响应式流编程都是操做流嘛)
而在JDK9 已经支持响应式流了,下面咱们来看一下
响应式流的规范早已经被提出了:里面提到了:
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure -----> http://www.reactive-streams.org/
翻译再加点信息:
响应式流(Reactive Streams)经过定义一组实体,接口和互操做方法,给出了实现异步非阻塞 背压的标准。第三方遵循这个标准来实现具体的解决方案,常见的有Reactor,RxJava,Akka Streams,Ratpack等。
规范里头实际上就是定义了四个接口:
Java 平台直到 JDK 9才提供了对于Reactive的完整支持,JDK9也定义了上述提到的四个接口,在java.util.concurrent
包上
一个通用的流处理架构通常会是这样的(生产者产生数据,对数据进行中间处理,消费者拿到数据消费):
到这里咱们再看回响应式流的接口,咱们应该就能懂了:
在响应式流上提到了back pressure(背压)这么一个概念,其实很是好理解。在响应式流实现异步非阻塞是基于生产者和消费者模式的,而生产者消费者很容易出现的一个问题就是:生产者生产数据多了,就把消费者给压垮了。
而背压说白了就是:消费者能告诉生产者本身须要多少许的数据。这里就是Subscription接口所作的事。
下面咱们来看看JDK9接口的方法,或许就更加能理解上面所说的话了:
// 发布者(生产者) public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } // 订阅者(消费者) public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } // 用于发布者与订阅者之间的通讯(实现背压:订阅者可以告诉生产者须要多少数据) public interface Subscription { public void request(long n); public void cancel(); } // 用于处理发布者 发布消息后,对消息进行处理,再交由消费者消费 public interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
代码中有大量的注释,我就很少BB了,建议直接复制跑一下看看:
class MyProcessor extends SubmissionPublisher<String> implements Processor<Integer, String> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存订阅关系, 须要用它来给发布者响应 this.subscription = subscription; // 请求一个数据 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一个数据, 处理 System.out.println("处理器接受到数据: " + item); // 过滤掉小于0的, 而后发布出去 if (item > 0) { this.submit("转换后的数据:" + item); } // 处理完调用request再请求一个数据 this.subscription.request(1); // 或者 已经达到了目标, 调用cancel告诉发布者再也不接受数据了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出现了异常(例如处理数据的时候产生了异常) throwable.printStackTrace(); // 咱们能够告诉发布者, 后面不接受数据了 this.subscription.cancel(); } @Override public void onComplete() { // 所有数据处理完了(发布者关闭了) System.out.println("处理器处理完了!"); // 关闭发布者 this.close(); } } public class FlowDemo2 { public static void main(String[] args) throws Exception { // 1. 定义发布者, 发布的数据类型是 Integer // 直接使用jdk自带的SubmissionPublisher SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>(); // 2. 定义处理器, 对数据进行过滤, 并转换为String类型 MyProcessor processor = new MyProcessor(); // 3. 发布者 和 处理器 创建订阅关系 publiser.subscribe(processor); // 4. 定义最终订阅者, 消费 String 类型数据 Subscriber<String> subscriber = new Subscriber<String>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存订阅关系, 须要用它来给发布者响应 this.subscription = subscription; // 请求一个数据 this.subscription.request(1); } @Override public void onNext(String item) { // 接受到一个数据, 处理 System.out.println("接受到数据: " + item); // 处理完调用request再请求一个数据 this.subscription.request(1); // 或者 已经达到了目标, 调用cancel告诉发布者再也不接受数据了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出现了异常(例如处理数据的时候产生了异常) throwable.printStackTrace(); // 咱们能够告诉发布者, 后面不接受数据了 this.subscription.cancel(); } @Override public void onComplete() { // 所有数据处理完了(发布者关闭了) System.out.println("处理完了!"); } }; // 5. 处理器 和 最终订阅者 创建订阅关系 processor.subscribe(subscriber); // 6. 生产数据, 并发布 publiser.submit(-111); publiser.submit(111); // 7. 结束后 关闭发布者 // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭 publiser.close(); // 主线程延迟中止, 不然数据没有消费就退出 Thread.currentThread().join(1000); } }
输出的结果以下:
流程实际上很是简单的:
参考资料:
Java 8 的 Stream 主要关注在流的过滤,映射,合并,而 Reactive Stream 更进一层,侧重的是流的产生与消费,即流在生产与消费者之间的协调
说白了就是:响应式流是异步非阻塞+流量控制的(能够告诉生产者本身须要多少的量/取消订阅关系)
展望响应式编程的场景应用:
好比一个日志监控系统,咱们的前端页面将再也不须要经过“命令式”的轮询的方式不断向服务器请求数据而后进行更新,而是在创建好通道以后,数据流从系统源源不断流向页面,从而展示实时的指标变化曲线;再好比一个社交平台,朋友的动态、点赞和留言不是手动刷出来的,而是当后台数据变化的时候自动体现到界面上的。
扯了一大堆,终于回到WebFlux了。通过上面的基础,咱们如今已经可以得出一些结论的了:
咱们再回来看官网的图:
Spring官方为了让咱们更加快速/平滑到WebFlux上,以前SpringMVC那套都是支持的。也就是说:咱们能够像使用SpringMVC同样使用着WebFlux。
WebFlux使用的响应式流并非用JDK9平台的,而是一个叫作Reactor响应式流库。因此,入门WebFlux其实更可能是了解怎么使用Reactor的API,下面咱们来看看~
Reactor是一个响应式流,它也有对应的发布者(Publisher
),Reactor的发布者用两个类来表示:
而订阅者则是Spring框架去完成
下面咱们来看一个简单的例子(基于WebFlux环境构建):
// 阻塞5秒钟 private String createStr() { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { } return "some string"; } // 普通的SpringMVC方法 @GetMapping("/1") private String get1() { log.info("get1 start"); String result = createStr(); log.info("get1 end."); return result; } // WebFlux(返回的是Mono) @GetMapping("/2") private Mono<String> get2() { log.info("get2 start"); Mono<String> result = Mono.fromSupplier(() -> createStr()); log.info("get2 end."); return result; }
首先,值得说明的是,咱们构建WebFlux环境启动时,应用服务器默认是Netty的:
咱们分别来访问一下SpringMVC的接口和WebFlux的接口,看一下有什么区别:
SpringMVC:
WebFlux:
从调用者(浏览器)的角度而言,是感知不到有什么变化的,由于都是得等待5s才返回数据。可是,从服务端的日志咱们能够看出,WebFlux是直接返回Mono对象的(而不是像SpringMVC一直同步阻塞5s,线程才返回)。
这正是WebFlux的好处:可以以固定的线程来处理高并发(充分发挥机器的性能)。
WebFlux还支持服务器推送(SSE - >Server Send Event),咱们来看个例子:
/** * Flux : 返回0-n个元素 * 注:须要指定MediaType * @return */ @GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE) private Flux<String> flux() { Flux<String> result = Flux .fromStream(IntStream.range(1, 5).mapToObj(i -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } return "flux data--" + i; })); return result; }
效果就是每秒会给浏览器推送数据:
很是感谢人才们能看到这里,若是这个文章写得还不错,以为「三歪」我有点东西的话 求点赞 求关注️ 求分享👥 求留言💬 对暖男我来讲真的 很是有用!!!
WebFlux我还没写完,这篇写了WebFlux支持SpringMVC那套注解来开发,下篇写写如何使用WebFlux另外一种模式(Functional Endpoints)来开发以及一些常见的问题还须要补充一下~
本已收录至个人GitHub精选文章,欢迎Star: https://github.com/ZhongFuCheng3y/3y乐于输出干货的Java技术公众号:Java3y。公众号内有300多篇原创技术文章、海量视频资源、精美脑图,关注便可获取!
创做不易,各位的支持和承认,就是我创做的最大动力,咱们下篇文章见! 求点赞 求关注️ 求分享👥 求留言💬