Java 9 Reactive Streams容许咱们实现非阻塞异步流处理。这是将响应式编程模型应用于核心java编程的重要一步。java
若是您对响应式编程不熟悉,请阅读Reactive Manifesto并阅读Reactive Streams的简短说明。RxJava和Akka Streams一直是十分优秀的响应流实现库。如今java 9已经经过java.util.concurrent.Flow
API 引入了响应流支持。react
Reactive Streams是关于流的异步处理,所以应该有一个发布者(Publisher)和一个订阅者(Subscriber)。发布者发布数据流,订阅者使用数据。git
有时咱们必须在Publisher和Subscriber之间转换数据。处理器(Processor)是位于最终发布者和订阅者之间的实体,用于转换从发布者接收的数据,以便订阅者能理解它。咱们能够拥有一系列(chain )处理器。github
从上面的图中能够清楚地看出,Processor既能够做为订阅者也能够做为发布者。数据库
Java 9 Flow API实现了Reactive Streams规范。Flow API是Iterator和Observer模式的组合。Iterator在pull
模型上工做,用于应用程序从源中拉取项目;而Observer在push
模型上工做,并在item
从源推送到应用程序时做出反应。编程
Java 9 Flow API订阅者能够在订阅发布者时请求N个项目。而后将项目从发布者推送到订阅者,直到推送玩全部项目或遇到某些错误。
app
让咱们快速浏览一下Flow API类和接口。框架
java.util.concurrent.Flow
:这是Flow API的主要类。该类封装了Flow API的全部重要接口。这是一个final类,咱们不能扩展它。java.util.concurrent.Flow.Publisher
:这是一个功能接口,每一个发布者都必须实现它的subscribe
方法,并添加相关的订阅者以接收消息。java.util.concurrent.Flow.Subscriber
:每一个订阅者都必须实现此接口。订阅者中的方法以严格的顺序进行调用。此接口有四种方法:
onSubscribe
:这是订阅者订阅了发布者后接收消息时调用的第一个方法。一般咱们调用subscription.request
开始从处理器(Processor)接收项目。onNext
:当从发布者收到项目时调用此方法,这是咱们实现业务逻辑以处理流,而后从发布者请求更多数据的方法。onError
:当发生不可恢复的错误时调用此方法,咱们能够在此方法中执行清理操做,例如关闭数据库链接。onComplete
:这就像finally
方法,而且在发布者没有发布其余项目或发布者关闭时调用。咱们能够用它来发送流成功处理的通知。java.util.concurrent.Flow.Subscription
:这用于在发布者和订阅者之间建立异步非阻塞连接。订阅者调用其request
方法来向发布者请求项目。它还有cancel
取消订阅的方法,即关闭发布者和订阅者之间的连接。java.util.concurrent.Flow.Processor
:此接口同时扩展了Publisher
和Subscriber
接口,用于在发布者和订阅者之间转换消息。java.util.concurrent.SubmissionPublisher
:一个Publisher实现,它将提交的项目异步发送给当前订阅者,直到它关闭为止。它使用Executor框架,咱们将在响应流示例中使用该类来添加订阅者,而后向其提交项目。让咱们从一个简单的例子开始,咱们将实现Flow API Subscriber接口并使用SubmissionPublisher来建立发布者和发送消息。异步
假设咱们有一个Employee类,用于建立从发布者发送到订阅者的流消息。ide
package com.journaldev.reactive.beans; public class Employee { private int id; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Employee(int i, String s) { this.id = i; this.name = s; } public Employee() { } @Override public String toString() { return "[id="+id+",name="+name+"]"; } }
咱们还有一个实用程序类来为咱们的示例建立一个员工列表。
package com.journaldev.reactive_streams; import java.util.ArrayList; import java.util.List; import com.journaldev.reactive.beans.Employee; public class EmpHelper { public static List<Employee> getEmps() { Employee e1 = new Employee(1, "Pankaj"); Employee e2 = new Employee(2, "David"); Employee e3 = new Employee(3, "Lisa"); Employee e4 = new Employee(4, "Ram"); Employee e5 = new Employee(5, "Anupam"); List<Employee> emps = new ArrayList<>(); emps.add(e1); emps.add(e2); emps.add(e3); emps.add(e4); emps.add(e5); return emps; } }
package com.journaldev.reactive_streams; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import com.journaldev.reactive.beans.Employee; public class MySubscriber implements Subscriber<Employee> { private Subscription subscription; private int counter = 0; @Override public void onSubscribe(Subscription subscription) { System.out.println("Subscribed"); this.subscription = subscription; this.subscription.request(1); //requesting data from publisher System.out.println("onSubscribe requested 1 item"); } @Override public void onNext(Employee item) { System.out.println("Processing Employee "+item); counter++; this.subscription.request(1); } @Override public void onError(Throwable e) { System.out.println("Some error happened"); e.printStackTrace(); } @Override public void onComplete() { System.out.println("All Processing Done"); } public int getCounter() { return counter; } }
Subscription
变量以保持引用,以即可以在onNext
方法中进行请求。counter
变量以记录处理的项目数,请注意它的值在onNext方法中增长。这将在咱们的main方法中用于在结束主线程以前等待执行完成。onSubscribe
方法中调用订阅请求以开始处理。另请注意,onNext
在处理项目后再次调用方法,要求对下一个从发布者发布的项目进行处理。onError
和onComplete
在例子中没有太多逻辑,但在实际状况中,它们应该用于在发生错误时执行纠正措施或在处理成功完成时清理资源。 咱们将使用SubmissionPublisherPublisher
做为示例,让咱们看一下响应流实现的测试程序。
package com.journaldev.reactive_streams; import java.util.List; import java.util.concurrent.SubmissionPublisher; import com.journaldev.reactive.beans.Employee; public class MyReactiveApp { public static void main(String args[]) throws InterruptedException { // Create Publisher SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>(); // Register Subscriber MySubscriber subs = new MySubscriber(); publisher.subscribe(subs); List<Employee> emps = EmpHelper.getEmps(); // Publish items System.out.println("Publishing Items to Subscriber"); emps.stream().forEach(i -> publisher.submit(i)); // logic to wait till processing of all messages are over while (emps.size() != subs.getCounter()) { Thread.sleep(10); } // close the Publisher publisher.close(); System.out.println("Exiting the app"); } }
在上述代码中,最重要的部分是发布者subscribe
和submit
方法的调用。咱们应该始终关闭发布者以免任何内存泄漏。
执行上述程序时,咱们将获得如下输出。
Subscribed Publishing Items to Subscriber onSubscribe requested 1 item Processing Employee [id=1,name=Pankaj] Processing Employee [id=2,name=David] Processing Employee [id=3,name=Lisa] Processing Employee [id=4,name=Ram] Processing Employee [id=5,name=Anupam] Exiting the app All Processing Done
请注意,若是咱们在处理全部项目以前,主线程已经退出了,那么咱们将获得不想要的结果。
处理器用于在发布者和订阅者之间转换消息。假设咱们有另外一个用户但愿处理不一样类型的消息。假设这个新的消息类型是Freelancer
。
package com.journaldev.reactive.beans; public class Freelancer extends Employee { private int fid; public int getFid() { return fid; } public void setFid(int fid) { this.fid = fid; } public Freelancer(int id, int fid, String name) { super(id, name); this.fid = fid; } @Override public String toString() { return "[id="+super.getId()+",name="+super.getName()+",fid="+fid+"]"; } }
咱们有一个新订阅者使用Freelancer流数据。
package com.journaldev.reactive_streams; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import com.journaldev.reactive.beans.Freelancer; public class MyFreelancerSubscriber implements Subscriber<Freelancer> { private Subscription subscription; private int counter = 0; @Override public void onSubscribe(Subscription subscription) { System.out.println("Subscribed for Freelancer"); this.subscription = subscription; this.subscription.request(1); //requesting data from publisher System.out.println("onSubscribe requested 1 item for Freelancer"); } @Override public void onNext(Freelancer item) { System.out.println("Processing Freelancer "+item); counter++; this.subscription.request(1); } @Override public void onError(Throwable e) { System.out.println("Some error happened in MyFreelancerSubscriber"); e.printStackTrace(); } @Override public void onComplete() { System.out.println("All Processing Done for MyFreelancerSubscriber"); } public int getCounter() { return counter; } }
代码重要的部分是实现Processor
接口。因为咱们想要使用它SubmissionPublisher
,咱们会扩展它并在适合的地方使用它。
package com.journaldev.reactive_streams; import java.util.concurrent.Flow.Processor; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; import java.util.function.Function; import com.journaldev.reactive.beans.Employee; import com.journaldev.reactive.beans.Freelancer; public class MyProcessor extends SubmissionPublisher<Freelancer> implements Processor<Employee, Freelancer> { private Subscription subscription; private Function<Employee,Freelancer> function; public MyProcessor(Function<Employee,Freelancer> function) { super(); this.function = function; } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(Employee emp) { submit((Freelancer) function.apply(emp)); subscription.request(1); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("Done"); } }
Function
将用于将Employee对象转换为Freelancer对象。onNext
方法中的Freelancer消息,而后使用SubmissionPublisher
submit
方法将其发送给订阅者。package com.journaldev.reactive_streams; import java.util.List; import java.util.concurrent.SubmissionPublisher; import com.journaldev.reactive.beans.Employee; import com.journaldev.reactive.beans.Freelancer; public class MyReactiveAppWithProcessor { public static void main(String[] args) throws InterruptedException { // Create End Publisher SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>(); // Create Processor MyProcessor transformProcessor = new MyProcessor(s -> { return new Freelancer(s.getId(), s.getId() + 100, s.getName()); }); //Create End Subscriber MyFreelancerSubscriber subs = new MyFreelancerSubscriber(); //Create chain of publisher, processor and subscriber publisher.subscribe(transformProcessor); // publisher to processor transformProcessor.subscribe(subs); // processor to subscriber List<Employee> emps = EmpHelper.getEmps(); // Publish items System.out.println("Publishing Items to Subscriber"); emps.stream().forEach(i -> publisher.submit(i)); // Logic to wait for messages processing to finish while (emps.size() != subs.getCounter()) { Thread.sleep(10); } // Closing publishers publisher.close(); transformProcessor.close(); System.out.println("Exiting the app"); } }
阅读程序中的注释以正确理解它,最重要的变化是发布者 - 处理器 - 订阅者链的建立。执行上述程序时,咱们将获得如下输出。
Subscribed for Freelancer Publishing Items to Subscriber onSubscribe requested 1 item for Freelancer Processing Freelancer [id=1,name=Pankaj,fid=101] Processing Freelancer [id=2,name=David,fid=102] Processing Freelancer [id=3,name=Lisa,fid=103] Processing Freelancer [id=4,name=Ram,fid=104] Processing Freelancer [id=5,name=Anupam,fid=105] Exiting the app All Processing Done for MyFreelancerSubscriber Done
咱们能够使用Subscription cancel
方法中止在订阅者中接收消息。
请注意,若是咱们取消订阅,则订阅者将不会收到
onComplete
或onError
信号。
如下是一个示例代码,其中订阅者只消费3条消息,而后取消订阅。
@Override public void onNext(Employee item) { System.out.println("Processing Employee "+item); counter++; if(counter==3) { this.subscription.cancel(); return; } this.subscription.request(1); }
请注意,在这种状况下,咱们在处理全部消息以前中止主线程的逻辑将进入无限循环。咱们能够为此场景添加一些额外的逻辑,若是订阅者已中止处理或取消订阅,就使用一些全局变量来标志该状态。
当发布者以比订阅者消费更快的速度生成消息时,会产生背压。Flow API不提供任何关于背压或处理它的信号的机制。但咱们能够设计本身的策略来处理它,例如微调用户或下降信息产生率。您能够阅读RxJava deals with Back Pressure。
Java 9 Flow API是响应式编程和建立异步非阻塞应用程序的良好举措。可是,只有在全部系统API都支持它时,才能建立真正的响应式应用程序。
原文地址:Java 9 Reactive Streams written by Pankaj
完整代码:Github