Reactive Stream (响应式流/反应流) 是JDK9引入的一套标准,是一套基于发布/订阅模式的数据处理规范。响应式流从2013年开始,做为提供非阻塞背压的异步流处理标准的倡议。 它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不须要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。更确切地说,Reactive流目的是“找到最小的一组接口,方法和协议,用来描述必要的操做和实体以实现这样的目标:以非阻塞背压方式实现数据的异步流”。java
“背压(反压)back pressure”概念很关键。首先异步消费者会向生产者订阅接收消息,而后当有新的信息可用时,消费者会经过以前订阅时提供的回调函数被再次激活调用。若是生产者发出的信息比消费者可以处理消息最大量还要多,消费者可能会被迫一直在抓消息,耗费愈来愈多的资源,埋下潜在的崩溃风险。为了防止这一点,须要有一种机制使消费者能够通知生产者,下降消息的生成速度。生产者能够采用多种策略来实现这一要求,这种机制称为背压。数组
响应式流模型很是简单——订阅者向发布者发送多个元素的异步请求,发布者向订阅者异步发送多个或稍少的元素。响应式流会在pull模型和push模型流处理机制之间动态切换。 当订阅者较慢时,它使用pull模型,当订阅者更快时使用push模型。并发
简单来讲,在响应式流下订阅者能够与发布者沟通,若是使用JMS就应该知道,订阅者只能被动接收发布者所产生的消息数据。这就比如没有水龙头的水管同样,我只能被动接收水管里流过来的水,没法关闭也没法减小。而响应式流就至关于给水管加了个水龙头,在消费者这边能够控制水流的增长、减小及关闭。框架
响应式流模型图:异步
发布者(Publisher)是潜在的无限数量的有序元素的生产者。发布者可能有多个来自订阅者的待处理请求。ide
订阅者(Subscriber)从发布者那里订阅并接收元素。订阅者能够请求更多的元素。函数
JDK9 经过java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 类来实现响应式流。在JDK9里Reactive Stream的主要接口声明在Flow类里,Flow 类中定义了四个嵌套的静态接口,用于创建流量控制的组件,发布者在其中生成一个或多个供订阅者使用的数据项:this
Flow类结构以下:.net
Publisher
是可以发出元素的发布者,Subscriber
是接收元素并作出响应的订阅者。当执行Publisher
里的subscribe
方法时,发布者会回调订阅者的onSubscribe
方法,这个方法中,一般订阅者会借助传入的Subscription
向发布者请求n个数据。而后发布者经过不断调用订阅者的onNext
方法向订阅者发出最多n个数据。若是数据所有发完,则会调用onComplete
告知订阅者流已经发完;若是有错误发生,则经过onError
发出错误数据,一样也会终止流。线程
其中,Subscription
至关因而链接Publisher
和Subscriber
的“纽带”。由于当发布者调用subscribe
方法注册订阅者时,会经过订阅者的回调方法onSubscribe
传入Subscription
对象,以后订阅者就可使用这个Subscription
对象的request
方法向发布者“要”数据了。背压机制正是基于此来实现的。
以下图:
Processor
则是集Publisher
和Subscriber
于一身,至关因而发布者与订阅者之间的一个”中间人“,能够经过Processor
进行一些中间操做:
/** * A component that acts as both a Subscriber and Publisher. * * @param <T> the subscribed item type * @param <R> the published item type */ public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
以下图:
参考:
1.如下代码简单演示了SubmissionPublisher 和这套发布-订阅框架的基本使用方式:
package com.example.demo; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; /** * @program: demo * @description: Flow Demo * @author: 01 * @create: 2018-10-04 13:25 **/ public class FlowDemo { public static void main(String[] args) throws Exception { // 1. 定义发布者, 发布的数据类型是 Integer // 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口 SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>(); // 2. 定义订阅者 Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { // 保存订阅关系, 须要用它来给发布者响应 this.subscription = subscription; // 请求一个数据 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一个数据, 处理 System.out.println("接受到数据: " + item); // 处理完调用request再请求一个数据 this.subscription.request(1); // 或者已经达到了目标, 能够调用cancel告诉发布者再也不接受数据了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出现了异常(例如处理数据的时候产生了异常) throwable.printStackTrace(); // 咱们能够告诉发布者, 后面不接受数据了 this.subscription.cancel(); } @Override public void onComplete() { // 所有数据处理完了(发布者关闭了) System.out.println("处理完了!"); } }; // 3. 发布者和订阅者 创建订阅关系 publiser.subscribe(subscriber); // 4. 生产数据, 并发布 // 这里忽略数据生产过程 for (int i = 0; i < 3; i++) { System.out.println("生成数据:" + i); // submit是个block方法 publiser.submit(i); } // 5. 结束后 关闭发布者 // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭 publiser.close(); // 主线程延迟中止, 不然数据没有消费就会退出 Thread.currentThread().join(1000); // debug的时候, 下面这行须要有断点 // 不然主线程结束没法debug System.out.println(); } }
运行结果以下:
上文中提到过能够调节发布者的数据产出速度,那么这个速度是如何调节的呢?关键就在于submit方法,该方法是一个阻塞方法。须要先说明的是SubmissionPublisher里有一个数据缓冲区,用于缓冲发布者产生的数据,而这个缓冲区是利用一个Object数组实现的,缓冲区最大长度为256。咱们能够在onSubscribe方法里打上断点,查看到这个缓冲区:
当这个缓冲区的数据满了以后,submit方法就会进入阻塞状态,发布者数据的产生速度就会变慢,以此实现调节发布者的数据产出速度。
2.第二个例子演示告终合Processor的使用方式,代码以下:
package com.example.demo; import java.util.concurrent.Flow.Processor; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; /** * Processor, 须要继承SubmissionPublisher并实现Processor接口 * * 输入源数据 integer, 过滤掉小于0的, 而后转换成字符串发布出去 */ class MyProcessor extends SubmissionPublisher<String> implements Processor<Integer, String> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存订阅关系, 须要用它来给发布者响应 this.subscription = subscription; // 请求一个数据 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一个数据, 处理 System.out.println("处理器接受到数据: " + item); // 过滤掉小于0的, 而后发布出去 if (item > 0) { this.submit("转换后的数据:" + item); } // 处理完调用request再请求一个数据 this.subscription.request(1); // 或者 已经达到了目标, 调用cancel告诉发布者再也不接受数据了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出现了异常(例如处理数据的时候产生了异常) throwable.printStackTrace(); // 咱们能够告诉发布者, 后面不接受数据了 this.subscription.cancel(); } @Override public void onComplete() { // 所有数据处理完了(发布者关闭了) System.out.println("处理器处理完了!"); // 关闭发布者 this.close(); } } /** * 带 process 的 flow demo * @author 01 */ public class FlowDemo2 { public static void main(String[] args) throws Exception { // 1. 定义发布者, 发布的数据类型是 Integer // 直接使用jdk自带的SubmissionPublisher SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>(); // 2. 定义处理器, 对数据进行过滤, 并转换为String类型 MyProcessor processor = new MyProcessor(); // 3. 发布者 和 处理器 创建订阅关系 publiser.subscribe(processor); // 4. 定义最终订阅者, 消费 String 类型数据 Subscriber<String> subscriber = new Subscriber<>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存订阅关系, 须要用它来给发布者响应 this.subscription = subscription; // 请求一个数据 this.subscription.request(1); } @Override public void onNext(String item) { // 接受到一个数据, 处理 System.out.println("接受到数据: " + item); // 处理完调用request再请求一个数据 this.subscription.request(1); // 或者 已经达到了目标, 调用cancel告诉发布者再也不接受数据了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出现了异常(例如处理数据的时候产生了异常) throwable.printStackTrace(); // 咱们能够告诉发布者, 后面不接受数据了 this.subscription.cancel(); } @Override public void onComplete() { // 所有数据处理完了(发布者关闭了) System.out.println("处理完了!"); } }; // 5. 处理器 和 最终订阅者 创建订阅关系 processor.subscribe(subscriber); // 6. 生产数据, 并发布 // 这里忽略数据生产过程 publiser.submit(-111); publiser.submit(111); // 7. 结束后 关闭发布者 // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭 publiser.close(); // 主线程延迟中止, 不然数据没有消费就退出 Thread.currentThread().join(1000); } }
运行结果以下: