Stream你们应该都很熟悉了,java8中为全部的集合类都引入了Stream的概念。优雅的链式操做,流式处理逻辑,相信用过的人都会爱不释手。java
每一个数据流都有一个生产者一个消费者。生产者负责产生数据,而消费者负责消费数据。若是是同步系统,生产一个消费一个没什么问题。可是若是在异步系统中,就会产生问题。react
由于生产者没法感知消费者的状态,不知道消费者究竟是繁忙状态仍是空闲状态,是否有能力去消费更多的数据。编程
通常来讲数据队列的长度都是有限的,即便没有作限制,可是系统的内存也是有限的。当太多的数据没有被消费的话,会致使内存溢出或者数据得不到即便处理的问题。异步
这时候就须要back-pressure了。编程语言
若是消息接收方消息处理不过来,则能够通知消息发送方,告知其正在承受压力,须要下降负载。back-pressure是一种消息反馈机制,从而使系统得以优雅地响应负载, 而不是在负载下崩溃。工具
而reactive stream的目的就是用来管理异步服务的流数据交换,并可以让接收方自主决定接受数据的频率。back-pressure就是reactive stream中不可或缺的一部分。测试
更多内容请访问 www.flydean.com
上面咱们讲到了reactive stream的做用,你们应该对reactive stream有了一个基本的了解。这里咱们再给reactive stream作一个定义:spa
reactive stream就是一个异步stream处理的标准,它的特色就是非阻塞的back pressure。code
reactive stream只是一个标准,它定义了实现非阻塞的back pressure的最小区间的接口,方法和协议。blog
因此reactive stream其实有不少种实现的,不只仅是java可使用reactive stream,其余的编程语言也能够。
reactive stream只是定义了最基本的功能,各大实如今实现了基本功能的同时能够自由扩展。
目前reactive stream最新的java版本是1.0.3,是在2019年8月23发布的。它包含了java API,协议定义文件,测试工具集合和具体的实现例子。
在介绍java版本的reactive stream以前,咱们先回顾一下reactive stream须要作哪些事情:
为了实现这4个功能,reactive stream定义了4个接口,Publisher,Subscriber,Subscription,Processor。这四个接口其实是一个观察者模式的实现。接下来咱们详细来分析一下各个接口的做用和约定。
先看下Publisher的定义:
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
Publisher就是用来生成消息的。它定义了一个subscribe方法,传入一个Subscriber。这个方法用来将Publisher和Subscriber进行链接。
一个Publisher能够链接多个Subscriber。
每次调用subscribe创建链接,都会建立一个新的Subscription,Subscription和subscriber是一一对应的。
一个Subscriber只可以subscribe一次Publisher。
若是subscribe失败或者被拒绝,则会出发Subscriber.onError(Throwable)方法。
先看下Subscriber的定义:
public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
Subscriber就是消息的接收者。
在Publisher和Subscriber创建链接的时候会触发onSubscribe(Subscription s)方法。
当调用Subscription.request(long)方法时,onNext(T t)会被触发,根据request请求参数的大小,onNext会被触发一次或者屡次。
在发生异常或者结束时会触发onError(Throwable t)或者onComplete()方法。
先看下Subscription的定义:
public interface Subscription { public void request(long n); public void cancel(); }
Subscription表明着一对一的Subscriber和Publisher之间的Subscribe关系。
request(long n)意思是向publisher请求多少个events,这会触发Subscriber.onNext方法。
cancel()则是请求Publisher中止发送信息,并清除资源。
先看下Processor的定义:
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
Processor便是Subscriber又是Publisher,它表明着一种处理状态。
在JDK中java.util.concurrent.Flow就是reactive stream语义的一种实现。
Flow从JDK9就开始有了。咱们看下它的结构:
从上图咱们能够看到在JDK中Flow是一个final class,而Subscriber,Publisher,Subscription,Processor都是它的内部类。
咱们会在后面的文章中继续讲解JDK中Flow的使用。敬请期待。
reactive stream的出现有效的解决了异步系统中的背压问题。只不过reactive stream只是一个接口标准或者说是一种协议,具体的实现还须要本身去实现。
本文做者:flydean程序那些事本文连接:http://www.flydean.com/reactive-stream-protocol/
本文来源:flydean的博客
欢迎关注个人公众号:程序那些事,更多精彩等着您!