Vert.x 用RxJava响应式编程 译<十一>

TIP:相应的代码在step-8文件夹中(https://github.com/vert-x3/vertx-guide-for-java-devs)java

到目前为止,咱们已经探讨了Vert.x的多个领域,使用基于APIs的回调方法,这种编程模型在多钟编程语言中能够良好实现。而后,它会变得有的冗长乏味,尤为是处理多个时间源或者多个数据流。git

这就是RxJava闪亮的地方,Vert.x能够与之无缝链接。github

使用RxJava APIssql

除了回调的API,Vert.x模块还提供了一个"Rxified"API,须要咱们在maven的pom.xml中加入依赖:数据库

而后咱们把Verticles修改一下,须要将原来类中继承的io.vertx.core.AbstractVerticle修改为io.vertx.rxjava.core.AbstractVerticle,这有什么不一样呢?前面的类继承了后者而且加入io.vertx.rxjava.core.Vertx属性。编程

io.vertx.rxjava.core.Vertx定义了额外的rxSomething(…​)方法,等同于callback-based的存在。微信

让咱们看看MainVerticle,来找一找在实践中更好的实现:异步

rxDeploy方法没有把Handler<AsyncResult<String>>做为一个final的参数,并放回一个Single<String>maven

以及,这个操做并非在方法被调用执行,在你订阅这个Single执行,当这个操做完成,它返回部署的 id 或者异常抛出的信息。编程语言

依次部署verticles

重构MainVerticle,咱们须要肯定部署操做触发有序:

    1.flatMap中执行dbVerticleDeployment方法的结果,HttpServerVerticle部署的任务平常调度。

    2.当被订阅的时候执行,成功或者失败,MainVerticle的future会是完成或者失败。

Rxifying" HttpServerVerticle

若是你按序看这个guide,编辑这个代码,那么你的HttpServerVerticle使用的仍是基于callback-based的API。在使用RxJava API执行异步的操做以前,你须要先重构HttpServerVerticle

Import RxJava versions of Vert.x classes

    1.咱们的backupHandler()方法依旧使用HttpResponse类,所以这是必须导入的。RxJava版本中提供的HttpResponse在某些状况下能够被替换,在step-8文件夹中的没有导入这个类,由于经过lambda表达式推到的缘由。

委托"Rxified" vertx实例

当你有一个io.vertx.rxjava.core.Vertx的时候能够调用一个io.vertx.core.Vertx实例,须要调整Verticle’s srart()建立WikiDatabaseService实例的方法:

同时执行受权查询

在前面的例子中,咱们看到使用RxJava operators 和 Rxified Vert.x API 来依次执行异步操做。可是有时候这是不须要的,或者你只是想让他们同时运行出于性能的缘由。

对于这样的情景,HttpServerVerticle中的JWT 贴可能生成程序是个好的例子。为了建立一个token,咱们须要权限查询完成,可是查询是独立的:

    1.建立了三个Single 对象,表明了不一样的权限查询

    2.当三个操做完成,zip操做带着results回调。

数据库链接

从链接池中获取一个数据库链接,须要作的事情就是JDBCClient调用rxGetConnection

Single<SQLConnection> connection = dbClient.rxGetConnection();

这个方法返回Single<Connection>,这可使你容易的sql查询

Single<ResultSet> resultSet = dbClient.rxQueryWithParams(
  sqlQueries.get(SqlQuery.GET_PAGE_BY_ID), new JsonArray().add(id));

当SQLConnection没有用的时候,咱们怎么释放这个链接呢?一个简单方便的方式是调用关闭close方法当

Single<SQLConnection>没有被订阅:

    1.当链接被请求的时候,咱们把它放入Single对象中

    2.当没有被订阅的时候,Single调用close方法

如今当咱们须要的额时候就可使用getConnection来执行sql查询

为callbacks和RxJava之间搭桥

这个时候,你可能混淆了RxJava 代码和callback-based API,例如service proxy 接口定义为callbacks,可是实现使用了Vert.x Rxified API。

在这个例子中,io.vertx.rx.java.RxHelper能够适配Handler<AsyncResult<T>>到RxJava Subscriber<T>:

1.fetchAllPagesData是一个一个异步的service proxy操做,在Handler<AsyncResult<List<JsonObject>>>调用中定义

2.toSubscriber适配resultHandler到Subscriber<List<JsonObject>>

数据流

RxJava不只仅在组合不一样的时间源作的很好,对于数据流也是很是好的,不像Vert.x或者JDK中的fututre,Observable能够发布事件流,而不是单个,它配备了一套普遍的数据操做,咱们可使用其中的把一部分来重构咱们的fetchAllPages database verticle 方法:

    1.利用flatMapObservable咱们能够从Single<Result>发起中建立Observable

    2.from方法将数据库results迭代入Observable

    3.由于咱们只须要网页名称,能够映射的每一个JSONObject到第一列

    4.客户端指望数据是按字母顺序排列

    5.event bus service由一个JsonArray组成,collect用来JsonArray::new建立一个新的,而后添加JsonArray::add

 

 

原文连接:http://vertx.io/docs/guide-for-java-devs/

个人微信公众号:

相关文章
相关标签/搜索