从3.0 版本开始,MongoDB 开始提供异步方式的驱动(Java Async Driver),这为应用提供了一种更高性能的选择。
但实质上,使用同步驱动(Java Sync Driver)的项目也不在少数,或许是由于先入为主的缘由(同步Driver的文档说明更加的完善),又或者是为了兼容旧的 MongoDB 版本。
不管如何,因为 Reactive 的发展,将来使用异步驱动应该是一个趋势。php
在使用 Async Driver 以前,须要对 Reactive 的概念有一些熟悉。html
响应式(Reactive)是一种异步的、面向数据流的开发方式,最先是来自于.NET 平台上的 Reactive Extensions 库,随后被扩展为各类编程语言的实现。
在著名的 Reactive Manifesto(响应式宣言) 中,对 Reactive 定义了四个特征:java
在响应式宣言的所定义的这些系统特征中,无一不与响应式的流有若干的关系,因而乎就有了 2013年发起的 响应式流规范(Reactive Stream Specification)。react
https://www.reactive-streams.org/git
其中,对于响应式流的处理环节又作了以下定义:github
Java 平台则是在 JDK 9 版本上发布了对 Reactive Streams 的支持。mongodb
下面介绍响应式流的几个关键接口:数据库
当订阅成功后,可使用 Subscription 的 request(long n) 方法来请求发布者发布 n 条数据。发布者可能产生3种不一样的消息通知,分别对应 Subscriber 的另外3个回调方法。编程
数据通知:对应 onNext 方法,表示发布者产生的数据。
错误通知:对应 onError 方法,表示发布者产生了错误。
结束通知:对应 onComplete 方法,表示发布者已经完成了全部数据的发布。
在上述3种通知中,错误通知和结束通知都是终结通知,也就是在终结通知以后,不会再有其余通知产生。缓存
这几个接口的关系以下图所示:
图片出处:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html
MongoDB 的异步驱动为 mongo-java-driver-reactivestreams 组件,其实现了 Reactive Stream 的上述接口。
> 除了 reactivestream 以外,MongoDB 的异步驱动还包含 RxJava 等风格的版本,有兴趣的读者能够进一步了解
http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/
接下来,经过一个简单的例子来演示一下 Reactive 方式的代码风格:
org.mongodb mongodb-driver-reactivestreams 1.11.0
> 引入mongodb-driver-reactivestreams 将会自动添加 reactive-streams, bson, mongodb-driver-async组件
//服务器实例表 List servers = new ArrayList(); servers.add(new ServerAddress("localhost", 27018)); //配置构建器 MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder(); //传入服务器实例 settingsBuilder.applyToClusterSettings( builder -> builder.hosts(servers)); //构建 Client 实例 MongoClient mongoClient = MongoClients.create(settingsBuilder.build());
//得到数据库对象 MongoDatabase database = client.getDatabase(databaseName); //得到集合 MongoCollection collection = database.getCollection(collectionName); //异步返回Publisher FindPublisher publisher = collection.find(); //订阅实现 publisher.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { System.out.println("start..."); //执行请求 s.request(Integer.MAX_VALUE); } @Override public void onNext(Document document) { //得到文档 System.out.println("Document:" + document.toJson()); } @Override public void onError(Throwable t) { System.out.println("error occurs."); } @Override public void onComplete() { System.out.println("finished."); } });
注意到,与使用同步驱动不一样的是,collection.find()方法返回的不是 Cursor,而是一个 FindPublisher对象,这是Publisher接口的一层扩展。
并且,在返回 Publisher 对象时,此时并无产生真正的数据库IO请求。 真正发起请求须要经过调用 Subscription.request()方法。
在上面的代码中,为了读取由 Publisher 产生的结果,经过自定义一个Subscriber,在onSubscribe 事件触发时就执行 数据库的请求,以后分别对 onNext、onError、onComplete进行处理。
尽管这种实现方式是纯异步的,但在使用上比较繁琐。试想若是对于每一个数据库操做都要完成一个Subscriber 逻辑,那么开发的工做量是巨大的。
为了尽量复用重复的逻辑,能够对Subscriber的逻辑作一层封装,包含以下功能:
代码以下:
public class ObservableSubscriber implements Subscriber { //响应数据 private final List received; //错误信息 private final List errors; //等待对象 private final CountDownLatch latch; //订阅器 private volatile Subscription subscription; //是否完成 private volatile boolean completed; public ObservableSubscriber() { this.received = new ArrayList(); this.errors = new ArrayList(); this.latch = new CountDownLatch(1); } @Override public void onSubscribe(final Subscription s) { subscription = s; } @Override public void onNext(final T t) { received.add(t); } @Override public void onError(final Throwable t) { errors.add(t); onComplete(); } @Override public void onComplete() { completed = true; latch.countDown(); } public Subscription getSubscription() { return subscription; } public List getReceived() { return received; } public Throwable getError() { if (errors.size() > 0) { return errors.get(0); } return null; } public boolean isCompleted() { return completed; } /** * 阻塞必定时间等待结果 * * @param timeout * @param unit * @return * @throws Throwable */ public List get(final long timeout, final TimeUnit unit) throws Throwable { return await(timeout, unit).getReceived(); } /** * 一直阻塞等待请求完成 * * @return * @throws Throwable */ public ObservableSubscriber await() throws Throwable { return await(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } /** * 阻塞必定时间等待完成 * * @param timeout * @param unit * @return * @throws Throwable */ public ObservableSubscriber await(final long timeout, final TimeUnit unit) throws Throwable { subscription.request(Integer.MAX_VALUE); if (!latch.await(timeout, unit)) { throw new MongoTimeoutException("Publisher onComplete timed out"); } if (!errors.isEmpty()) { throw errors.get(0); } return this; } }
借助这个基础的工具类,咱们对于文档的异步操做就变得简单多了。
好比对于文档查询的操做能够改造以下:
ObservableSubscriber subscriber = new ObservableSubscriber(); collection.find().subscribe(subscriber); //结果处理 subscriber.get(15, TimeUnit.SECONDS).forEach( d -> { System.out.println("Document:" + d.toJson()); });
固然,这个例子还有能够继续完善,好比使用 List 做为缓存,则要考虑数据量的问题,避免将所有(或超量) 的文档一次性转入内存。