Vert.x Blueprint 系列教程(三) | Micro-Shop 微服务应用实战

本文章是 Vert.x 蓝图系列 的第三篇教程。全系列:javascript

本系列已发布至Vert.x官网:Vert.x Blueprint Tutorialsgit


前言

欢迎回到Vert.x 蓝图系列!当今,微服务架构 变得愈来愈流行,开发者们都想尝试一下微服务应用的开发和架构设计。使人激动的是,Vert.x给咱们提供了一系列用于微服务开发的组件,包括 Service Discovery (服务发现)、Circuit Breaker (断路器) 以及其它的一些组件。有了Vert.x微服务组件的帮助,咱们就能够快速利用Vert.x搭建咱们的微服务应用。在这篇蓝图教程中,咱们一块儿来探索一个利用Vert.x的各个组件开发的 Micro-Shop 微服务应用~github

经过本教程,你将会学习到如下内容:web

  • 微服务架构redis

  • 如何利用Vert.x来开发微服务应用sql

  • 异步开发模式docker

  • 响应式、函数式编程

  • 事件溯源 (Event Sourcing)

  • 经过分布式 Event Bus 进行异步RPC调用

  • 各类各样的服务类型(例如REST、数据源、Event Bus服务等)

  • 如何使用服务发现模块 (Vert.x Service Discovery)

  • 如何使用断路器模块 (Vert.x Circuit Breaker)

  • 如何利用Vert.x实现API Gateway

  • 如何进行权限认证 (OAuth 2 + Keycloak)

  • 如何配置及使用 SockJS - Event Bus Bridge

以及其它的一些东西。。。

本教程是 Vert.x 蓝图系列 的第三篇教程,对应的Vert.x版本为 3.3.2 。本教程中的完整代码已托管至 GitHub

踏入微服务之门

哈~你必定对“微服务”这个词很熟悉——至少听起来很熟悉~愈来愈多的开发者开始拥抱微服务架构,那么微服务到底是什么呢?一句话总结一下:

Microservices are small, autonomous services that work together.

咱们来深刻一下微服务的各类特性,来看看微服务为什么如此出色:

  • 首先,微服务的重要的一点是“微”。每一个微服务都是独立的,每一个单独的微服务组件都注重某一特定的逻辑。在微服务架构中,咱们将传统的单体应用拆分红许多互相独立的组件。每一个组件都由其特定的“逻辑边界”,所以组件不会过于庞大。不过话又说回来了,每一个组件应该有多小呢?这个问题可很差回答,它一般取决与咱们的业务与负载。正如Sam Newman在其《Building
    Microservices》书中所讲的那样:

We seem to have a very good sense of what is too big, and so it could be argued that once a piece of code no longer feels too big, it’s probably small enough.

所以,当咱们以为每一个组件不是特别大的时候,组件的大小可能就刚恰好。

  • 在微服务架构中,组件之间能够经过任意协议进行通讯,好比 HTTPAMQP

  • 每一个组件是独立的,所以咱们能够在不一样的组件中使用不一样的编程语言,不一样的技术 —— 这就是所谓的 polyglot support (不错,Vert.x也是支持多语言的!)

  • 每一个组件都是独立开发、部署以及发布的,因此这减小了部署及发布的难度。

  • 微服务架构一般与分布式系统如影随行,因此咱们还须要考虑分布式系统中的方方面面,包括可用性、弹性以及可扩展性。

  • 微服务架构一般被设计成为 面向失败的,由于在分布式系统中失败的场景很是复杂,咱们须要有效地处理失败的手段。

虽然微服务有如此多的优势,可是不要忘了,微服务可不是银弹,由于它引入了分布式系统中所带来的各类问题,所以设计架构时咱们都要考虑这些状况。

服务发现

在微服务架构中,每一个组件都是独立的,它们都不知道其余组件的位置,可是组件之间又须要通讯,所以咱们必须知道各个组件的位置。然而,把位置信息写死在代码中显然很差,所以咱们须要一种机制能够动态地记录每一个组件的位置 —— 这就是 服务发现。有了服务发现模块,咱们就能够将服务位置发布至服务发现模块中,其它服务就能够从服务发现模块中获取想要调用的服务的位置并进行调用。在调用服务的过程当中,咱们不须要知道对应服务的位置,因此当服务位置或环境变更时,服务调用能够不受影响,这使得咱们的架构更加灵活。

Vert.x提供了一个服务发现模块用于发布和获取服务记录。在Vert.x 服务发现模块,每一个服务都被抽象成一个Record(服务记录)。服务提供者能够向服务发现模块中发布服务,此时Record会根据底层ServiceDiscoveryBackend的配置存储在本地Map、分布式Map或Redis中。服务消费者能够从服务发现模块中获取服务记录,而且经过服务记录获取对应的服务实例而后进行服务调用。目前Vert.x原生支持好几种服务类型,好比 Event Bus 服务(即服务代理)、HTTP 端点消息源 以及 数据源。固然咱们也能够实现本身的服务类型,能够参考相关的文档。在后面咱们还会详细讲述如何使用服务发现模块,这里先简单作个了解。

异步的、响应式的Vert.x

异步与响应式风格都很适合微服务架构,而Vert.x兼具这两种风格!异步开发模式相信你们已经了然于胸了,而若是你们读过前几篇蓝图教程的话,响应式风格你们必定不会陌生。有了基于Future以及基于RxJava的异步开发模式,咱们能够为所欲为地对异步过程进行组合和变换,这样代码能够很是简洁,很是优美!在本蓝图教程中,咱们会见到大量基于Future和RxJava的异步方法。

Mirco Shop 微服务应用

好啦,如今你们应该对微服务架构有了一个大体的了解了,下面咱们来说一下本蓝图中的微服务应用。这是一个简单的 Micro-Shop 微服务应用 (目前只完成了基本功能),人们能够进行网上购物以及交易。。。当前版本的微服务应用包含下列组件:

  • 帐户服务:提供用户帐户的操做服务,使用MySQL做为后端存储。

  • 商品服务:提供商品的操做服务,使用MySQL做为后端存储。

  • 库存服务:提供商品库存的操做服务,如查询库存、增长库存即减小库存。使用Redis做为后端存储。

  • 网店服务:提供网店的操做即管理服务,使用MongoDB做为后端存储。

  • 购物车服务:提供购物车事件的生成以及购物车操做(添加、删除商品以及结算)服务。咱们经过此服务来说述 事件溯源

  • 订单服务:订单服务从Event Bus接收购物车服务发送的订单请求,接着处理订单并将订单发送至下层服务(本例中仅仅简单地存储至数据库中)。

  • Micro Shop 前端:此微服务的前端部分(SPA),目前已整合至API Gateway组件中。

  • 监视仪表板:用于监视微服务系统的状态以及日志、统计数据的查看。

  • API Gateway:整个微服务的入口,它负责将收到的请求按照必定的规则分发至对应的组件的REST端点中(至关于反向代理)。它也负责权限认证与管理,负载均衡,心跳检测以及失败处理(使用Vert.x Circuit Breaker)。

Micro Shop 微服务架构

咱们来看一下Micro Shop微服务应用的架构:

Microservice Architecture

用户请求首先通过API Gateway,再经其处理并分发至对应的业务端点。

咱们再来看一下每一个基础组件内部的结构(基础组件即图中最下面的各个业务组件)。

组件结构

每一个基础组件至少有两个Verticle:服务Verticle以及REST Verticle。REST Vertice提供了服务对应的REST端点,而且也负责将此端点发布至服务发现层。而服务Verticle则负责发布其它服务(如Event Bus服务或消息源)而且部署REST Verticle。

每一个基础组件中都包含对应的服务接口,如商品组件中包含ProductService接口。这些服务接口都是Event Bus 服务,由@ProxyGen注解修饰。上篇蓝图教程中咱们讲过,Vert.x Service Proxy能够自动为@ProxyGen注解修饰的接口生成服务代理类,所以咱们能够很方便地在Event Bus上进行异步RPC调用而不用写额外的代码。很酷吧!而且有了服务发现组件之后,咱们能够很是方便地将Event Bus服务发布至服务发现层,这样其它组件能够更方便地调用服务。

Component structure

组件之间的通讯

咱们先来看一下咱们的微服务应用中用到的服务类型:

  • HTTP端点 (e.g. REST 端点以及API Gateway) - 此服务的位置用URL描述

  • Event Bus服务 - 此服务的位置用Event Bus上的一个特定地址描述

  • 事件源 - 事件源服务对应Event Bus上某个地址的事件消费者。此服务的位置用Event Bus上的一个特定地址描述

所以,咱们各个组件之间能够经过HTTP以及Event Bus(本质是TCP)进行通讯,例如:

Interaction

API Gateway与其它组件经过HTTP进行通讯。

让咱们开始吧!

好啦,如今开始咱们的微服务蓝图旅程吧!首先咱们从GitHub上clone项目:

git clone https://github.com/sczyh30/vertx-blueprint-microservice.git

在本蓝图教程中,咱们使用 Maven 做为构建工具。咱们首先来看一下pom.xml配置文件。咱们能够看到,咱们的蓝图应用由许多模块构成:

<modules>
  <module>microservice-blueprint-common</module>
  <module>account-microservice</module>
  <module>product-microservice</module>
  <module>inventory-microservice</module>
  <module>store-microservice</module>
  <module>shopping-cart-microservice</module>
  <module>order-microservice</module>
  <module>api-gateway</module>
  <module>cache-infrastructure</module>
  <module>monitor-dashboard</module>
</modules>

每一个模块表明一个组件。看着配置文件,彷佛有很多组件呢!不要担忧,咱们将会一一探究这些组件。下面咱们先来看一下全部组件的基础模块 - microservice-blueprint-common

微服务基础模块

microservice-blueprint-common模块提供了一些微服务功能相关的辅助类以及辅助Verticle。咱们先来看一下两个base verticles - BaseMicroserviceVerticleRestAPIVerticle

Base Microservice Verticle

BaseMicroserviceVerticle提供了与微服务相关的初始化函数以及各类各样的辅助函数。其它每个Verticle都会继承此Verticle,所以这个基础Verticle很是重要。

首先咱们来看一下其中的成员变量:

protected ServiceDiscovery discovery;
protected CircuitBreaker circuitBreaker;
protected Set<Record> registeredRecords = new ConcurrentHashSet<>();

discovery以及circuitBreaker分别表明服务发现实例以及断路器实例,而registeredRecords表明当前已发布的服务记录的集合,用于在结束Verticle时注销服务。

start函数中主要是对服务发现实例和断路器实例进行初始化,配置文件从config()中获取。它的实现很是简单:

@Override
public void start() throws Exception {
  // init service discovery instance
  discovery = ServiceDiscovery.create(vertx, new ServiceDiscoveryOptions().setBackendConfiguration(config()));

  // init circuit breaker instance
  JsonObject cbOptions = config().getJsonObject("circuit-breaker") != null ?
    config().getJsonObject("circuit-breaker") : new JsonObject();
  circuitBreaker = CircuitBreaker.create(cbOptions.getString("name", "circuit-breaker"), vertx,
    new CircuitBreakerOptions()
      .setMaxFailures(cbOptions.getInteger("max-failures", 5))
      .setTimeout(cbOptions.getLong("timeout", 10000L))
      .setFallbackOnFailure(true)
      .setResetTimeout(cbOptions.getLong("reset-timeout", 30000L))
  );
}

下面咱们还提供了几个辅助函数用于发布各类各样的服务。这些函数都是异步的,而且基于Future:

protected Future<Void> publishHttpEndpoint(String name, String host, int port) {
  Record record = HttpEndpoint.createRecord(name, host, port, "/",
    new JsonObject().put("api.name", config().getString("api.name", ""))
  );
  return publish(record);
}

protected Future<Void> publishMessageSource(String name, String address) {
  Record record = MessageSource.createRecord(name, address);
  return publish(record);
}

protected Future<Void> publishJDBCDataSource(String name, JsonObject location) {
  Record record = JDBCDataSource.createRecord(name, location, new JsonObject());
  return publish(record);
}

protected Future<Void> publishEventBusService(String name, String address, Class serviceClass) {
  Record record = EventBusService.createRecord(name, address, serviceClass);
  return publish(record);
}

以前咱们提到过,每一个服务记录Record表明一个服务,其中服务类型由记录中的type字段标识。Vert.x原生支持的各类服务接口中都包含着好几个createRecord方法所以咱们能够利用这些方法来方便地建立服务记录。一般状况下咱们须要给每一个服务都指定一个name,这样以后咱们就能够经过名称来获取服务了。咱们还能够经过setMetadata方法来给服务记录添加额外的元数据。

你可能注意到在publishHttpEndpoint方法中咱们就提供了含有api-name的元数据,以后咱们会了解到,API Gateway在进行反向代理时会用到它。

下面咱们来看一下发布服务的通用方法 —— publish方法:

private Future<Void> publish(Record record) {
  Future<Void> future = Future.future();
  // publish the service
  discovery.publish(record, ar -> {
    if (ar.succeeded()) {
      registeredRecords.add(record);
      logger.info("Service <" + ar.result().getName() + "> published");
      future.complete();
    } else {
      future.fail(ar.cause());
    }
  });
  return future;
}

publish方法中,咱们调用了服务发现实例discoverypublish方法来将服务发布至服务发现模块。它一样也是一个异步方法,当发布成功时,咱们将此服务记录存储至registeredRecords中,输出日志而后通知future操做已完成。最后返回对应的future

注意,在Vert.x Service Discovery当前版本(3.3.2)的设计中,服务发布者须要在必要时手动注销服务,所以当Verticle结束时,咱们须要将注册的服务都注销掉:

@Override
public void stop(Future<Void> future) throws Exception {
  // In current design, the publisher is responsible for removing the service
  List<Future> futures = new ArrayList<>();
  for (Record record : registeredRecords) {
    Future<Void> unregistrationFuture = Future.future();
    futures.add(unregistrationFuture);
    discovery.unpublish(record.getRegistration(), unregistrationFuture.completer());
  }

  if (futures.isEmpty()) {
    discovery.close();
    future.complete();
  } else {
    CompositeFuture.all(futures)
      .setHandler(ar -> {
        discovery.close();
        if (ar.failed()) {
          future.fail(ar.cause());
        } else {
          future.complete();
        }
      });
  }
}

stop方法中,咱们遍历registeredRecords集合而且尝试注销每个服务,并将异步结果future添加至futures列表中。以后咱们调用CompositeFuture.all(futures)来依次获取每一个异步结果的状态。all方法返回一个组合的Future,当列表中的全部Future都成功赋值时方为成功状态,反之只要有一个异步结果失败,它就为失败状态。所以,咱们给它绑定一个Handler,当全部服务都被注销时,服务发现模块就能够安全地关闭了,不然结束函数会失败。

REST API Verticle

RestAPIVerticle抽象类继承了BaseMicroserviceVerticle抽象类。从名字上就能够看出,它提供了诸多的用于REST API开发的辅助方法。咱们在其中封装了诸如建立服务端、开启Cookie和Session支持,开启心跳检测支持(经过HTTP),各类各样的路由处理封装以及用于权限验证的路由处理器。在以后的章节中咱们将会见到这些方法。

好啦,如今咱们已经了解了整个蓝图应用中的两个基础Verticle,下面是时候探索各个模块了!在探索逻辑组件以前,咱们先来看一下其中最重要的组件之一 —— API Gateway。

API Gateway

咱们把API Gateway的内容单独归为一篇教程,请见:Vert.x 蓝图 - Micro Shop 微服务实战 (API Gateway篇)

Event Bus 服务 - 帐户、网店及商品服务

在Event Bus上进行异步RPC

在以前的 Vert.x Kue 蓝图教程 中咱们已经介绍过Vert.x中的异步RPC(也叫服务代理)了,这里咱们再来回顾一下,而且说一说如何利用服务发现模块更方便地进行异步RPC。

传统的RPC有一个缺点:消费者须要阻塞等待生产者的回应。这是一种阻塞模型,和Vert.x推崇的异步开发模式不相符。而且,传统的RPC不是真正面向失败设计的。还好,Vert.x提供了一种高效的、响应式的RPC —— 异步RPC。咱们不须要等待生产者的回应,而只须要传递一个Handler<AsyncResult<R>>参数给异步方法。这样当收到生产者结果时,对应的Handler就会被调用,很是方便,这与Vert.x的异步开发模式相符。而且,AsyncResult也是面向失败设计的。

Vert.x Service Proxy(服务代理组件)能够自动处理含有@ProxyGen注解的服务接口,生成相应的服务代理类。生成的服务代理类能够帮咱们将数据封装好后发送至Event Bus、从Event Bus接收数据,以及对数据进行编码和解码,所以咱们能够省掉很多代码。咱们须要作的就是遵循@ProxyGen注解的一些限定

好比,这里有一个Event Bus服务接口:

@ProxyGen
public interface MyService {
  @Fluent
  MyService retrieveData(String id, Handler<AsyncResult<JsonObject>> resultHandler);
}

咱们能够经过Vert.x Service Proxy组件生成对应的代理类。而后咱们就能够经过ProxyHelper类的registerService方法将此服务注册至Event Bus上:

MyService myService = MyService.createService(vertx, config);
ProxyHelper.registerService(MyService.class, vertx, myService, SERVICE_ADDRESS);

有了服务发现组件以后,将服务发布至服务发现层就很是容易了。好比在咱们的蓝图应用中咱们使用封装好的方法:

publishEventBusService(SERVICE_NAME, SERVICE_ADDRESS, MyService.class)

OK,如今服务已经成功地发布至服务发现模块。如今咱们就能够经过EventBusService接口的getProxy方法来从服务发现层获取发布的Event Bus服务,而且像调用普通异步方法那样进行异步RPC:

EventBusService.<MyService>getProxy(discovery, new JsonObject().put("name", SERVICE_NAME), ar -> {
  if (ar.succeeded()) {
    MyService myService = ar.result();
    myService.retrieveData(...);
  }
});

几个服务模块的通用特性

在咱们的Micro Shop微服务应用中,帐户、网店及商品服务有几个通用的特性及约定。如今咱们来解释一下。

在这三个模块中,每一个模块都包含:

  • 一个Event Bus服务接口。此服务接口定义了对实体存储的各类操做

  • 服务接口的实现

  • REST API Verticle,用于建立服务端并将其发布至服务发现模块

  • Main Verticle,用于部署其它的verticles以及将Event Bus服务和消息源发布至服务发现层

其中,用户帐户服务以及商品服务都使用 MySQL 做为后端存储,而网店服务则以 MongoDB 做为后端存储。这里咱们只挑两个典型的服务介绍如何经过Vert.x操做不一样的数据库:product-microservicestore-microserviceaccount-microservice的实现与product-microservice很是类似,你们能够查阅 GitHub 上的代码。

基于MySQL的商品服务

商品微服务模块提供了商品的操做功能,包括添加、查询(搜索)、删除与更新商品等。其中最重要的是ProductService服务接口以及其实现了。咱们先来看一下此服务接口的定义:

@VertxGen
@ProxyGen
public interface ProductService {

  /**
   * The name of the event bus service.
   */
  String SERVICE_NAME = "product-eb-service";

  /**
   * The address on which the service is published.
   */
  String SERVICE_ADDRESS = "service.product";

  /**
   * Initialize the persistence.
   */
  @Fluent
  ProductService initializePersistence(Handler<AsyncResult<Void>> resultHandler);

  /**
   * Add a product to the persistence.
   */
  @Fluent
  ProductService addProduct(Product product, Handler<AsyncResult<Void>> resultHandler);

  /**
   * Retrieve the product with certain `productId`.
   */
  @Fluent
  ProductService retrieveProduct(String productId, Handler<AsyncResult<Product>> resultHandler);

  /**
   * Retrieve the product price with certain `productId`.
   */
  @Fluent
  ProductService retrieveProductPrice(String productId, Handler<AsyncResult<JsonObject>> resultHandler);

  /**
   * Retrieve all products.
   */
  @Fluent
  ProductService retrieveAllProducts(Handler<AsyncResult<List<Product>>> resultHandler);

  /**
   * Retrieve products by page.
   */
  @Fluent
  ProductService retrieveProductsByPage(int page, Handler<AsyncResult<List<Product>>> resultHandler);

  /**
   * Delete a product from the persistence
   */
  @Fluent
  ProductService deleteProduct(String productId, Handler<AsyncResult<Void>> resultHandler);

  /**
   * Delete all products from the persistence
   */
  @Fluent
  ProductService deleteAllProducts(Handler<AsyncResult<Void>> resultHandler);

}

正如咱们以前所提到的那样,这个服务接口是一个Event Bus服务,因此咱们须要给它加上@ProxyGen注解。这些方法都是异步的,所以每一个方法都须要接受一个Handler<AsyncResult<T>>参数。当异步操做完成时,对应的Handler会被调用。注意到咱们还给此接口加了@VertxGen注解。上篇蓝图教程中咱们提到过,这是为了开启多语言支持(polyglot language support)。Vert.x Codegen注解处理器会自动处理含有@VertxGen注解的类,并生成支持的其它语言的代码,如Ruby、JS等。。。这是很是适合微服务架构的,由于不一样的组件能够用不一样的语言进行开发!

每一个方法的含义都在注释中给出了,这里就不解释了。

商品服务接口的实现位于ProductServiceImpl类中。商品信息存储在MySQL中,所以咱们能够经过 Vert.x-JDBC 对数据库进行操做。咱们在 第一篇蓝图教程 中已经详细讲述过Vert.x JDBC的使用细节了,所以这里咱们就不过多地讨论细节了。这里咱们只关注如何减小代码量。由于一般简单数据库操做的过程都是千篇一概的,所以作个封装是颇有必要的。

首先来回顾一下每次数据库操做的过程:

  1. JDBCClient中获取数据库链接SQLConnection,这是一个异步过程

  2. 执行SQL语句,绑定回调Handler

  3. 最后不要忘记关闭数据库链接以释放资源

对于正常的CRUD操做来讲,它们的实现都很类似,所以咱们封装了一个JdbcRepositoryWrapper类来实现这些通用逻辑。它位于io.vertx.blueprint.microservice.common.service包中:

JdbcRepositoryWrapper class structure

咱们提供了如下的封装方法:

  • executeNoResult: 执行含参数的SQL语句 (经过updateWithParams方法)。执行结果是不须要的,所以只须要接受一个 Handler<AsyncResult<Void>> 类型的参数。此方法一般用于insert之类的操做。

  • retrieveOne: 执行含参数的SQL语句,用于获取某一特定实体(经过 queryWithParams方法)。此方法是基于Future的,它返回一个Future<Optional<JsonObject>>类型的异步结果。若是结果集为空,那么返回一个空的Optional monad。若是结果集不为空,则返回第一个结果并用Optional进行包装。

  • retrieveMany: 获取多个实体,返回Future<List<JsonObject>>做为异步结果。

  • retrieveByPage: 与retrieveMany 方法类似,但包含分页逻辑。

  • retrieveAll: similar to retrieveMany method but does not require query parameters as it simply executes statement such as SELECT * FROM xx_table.

  • removeOne and removeAll: remove entity from the database.

固然这与Spring JPA相比的不足之处在于SQL语句得本身写,本身封装也不是很方便。。。考虑到Vert.x JDBC底层也只是使用了Worker线程池包装了原生的JDBC(不是真正的异步),咱们也能够结合Spring Data的相关组件来简化开发。另外,Vert.x JDBC使用C3P0做为默认的数据库链接池,C3P0的性能我想你们应该都懂。。。所以换成性能更优的HikariCP是颇有必要的。

回到JdbcRepositoryWrapper中来。这层封装能够大大地减小代码量。好比,咱们的ProductServiceImpl实现类就能够继承JdbcRepositoryWrapper类,而后利用这些封装好的方法。看个例子 —— retrieveProduct方法的实现:

@Override
public ProductService retrieveProduct(String productId, Handler<AsyncResult<Product>> resultHandler) {
  this.retrieveOne(productId, FETCH_STATEMENT)
    .map(option -> option.map(Product::new).orElse(null))
    .setHandler(resultHandler);
  return this;
}

咱们惟一须要作的只是将结果变换成须要的类型。是否是很方便呢?

固然这不是惟一方法。在下面的章节中,咱们将会讲到一种更Reactive,更Functional的方法 —— 利用Rx版本的Vert.x JDBC。另外,用vertx-sync也是一种不错的选择(相似于async/await)。

好啦!看完服务实现,下面轮到REST API了。咱们来看看RestProductAPIVerticle的实现:

public class RestProductAPIVerticle extends RestAPIVerticle {

  public static final String SERVICE_NAME = "product-rest-api";

  private static final String API_ADD = "/add";
  private static final String API_RETRIEVE = "/:productId";
  private static final String API_RETRIEVE_BY_PAGE = "/products";
  private static final String API_RETRIEVE_PRICE = "/:productId/price";
  private static final String API_RETRIEVE_ALL = "/products";
  private static final String API_DELETE = "/:productId";
  private static final String API_DELETE_ALL = "/all";

  private final ProductService service;

  public RestProductAPIVerticle(ProductService service) {
    this.service = service;
  }

  @Override
  public void start(Future<Void> future) throws Exception {
    super.start();
    final Router router = Router.router(vertx);
    // body handler
    router.route().handler(BodyHandler.create());
    // API route handler
    router.post(API_ADD).handler(this::apiAdd);
    router.get(API_RETRIEVE).handler(this::apiRetrieve);
    router.get(API_RETRIEVE_BY_PAGE).handler(this::apiRetrieveByPage);
    router.get(API_RETRIEVE_PRICE).handler(this::apiRetrievePrice);
    router.get(API_RETRIEVE_ALL).handler(this::apiRetrieveAll);
    router.patch(API_UPDATE).handler(this::apiUpdate);
    router.delete(API_DELETE).handler(this::apiDelete);
    router.delete(API_DELETE_ALL).handler(context -> requireLogin(context, this::apiDeleteAll));

    enableHeartbeatCheck(router, config());

    // get HTTP host and port from configuration, or use default value
    String host = config().getString("product.http.address", "0.0.0.0");
    int port = config().getInteger("product.http.port", 8082);

    // create HTTP server and publish REST service
    createHttpServer(router, host, port)
      .compose(serverCreated -> publishHttpEndpoint(SERVICE_NAME, host, port))
      .setHandler(future.completer());
  }

  private void apiAdd(RoutingContext context) {
    try {
      Product product = new Product(new JsonObject(context.getBodyAsString()));
      service.addProduct(product, resultHandler(context, r -> {
        String result = new JsonObject().put("message", "product_added")
          .put("productId", product.getProductId())
          .encodePrettily();
        context.response().setStatusCode(201)
          .putHeader("content-type", "application/json")
          .end(result);
      }));
    } catch (DecodeException e) {
      badRequest(context, e);
    }
  }

  private void apiRetrieve(RoutingContext context) {
    String productId = context.request().getParam("productId");
    service.retrieveProduct(productId, resultHandlerNonEmpty(context));
  }

  private void apiRetrievePrice(RoutingContext context) {
    String productId = context.request().getParam("productId");
    service.retrieveProductPrice(productId, resultHandlerNonEmpty(context));
  }

  private void apiRetrieveByPage(RoutingContext context) {
    try {
      String p = context.request().getParam("p");
      int page = p == null ? 1 : Integer.parseInt(p);
      service.retrieveProductsByPage(page, resultHandler(context, Json::encodePrettily));
    } catch (Exception ex) {
      badRequest(context, ex);
    }
  }

  private void apiRetrieveAll(RoutingContext context) {
    service.retrieveAllProducts(resultHandler(context, Json::encodePrettily));
  }

  private void apiDelete(RoutingContext context) {
    String productId = context.request().getParam("productId");
    service.deleteProduct(productId, deleteResultHandler(context));
  }

  private void apiDeleteAll(RoutingContext context, JsonObject principle) {
    service.deleteAllProducts(deleteResultHandler(context));
  }

}

此Verticle继承了RestAPIVerticle,所以咱们能够利用其中诸多的辅助方法。首先来看一下启动过程,即start方法。首先咱们先调用super.start()来初始化服务发现组件,而后建立Router,绑定BodyHandler以便操做请求正文,而后建立各个API路由并绑定相应的处理函数。接着咱们调用enableHeartbeatCheck方法开启简单的心跳检测支持。最后咱们经过封装好的createHttpServer建立HTTP服务端,并经过publishHttpEndpoint方法将HTTP端点发布至服务发现模块。

其中createHttpServer方法很是简单,咱们只是把vertx.createHttpServer方法变成了基于Future的:

protected Future<Void> createHttpServer(Router router, String host, int port) {
  Future<HttpServer> httpServerFuture = Future.future();
  vertx.createHttpServer()
    .requestHandler(router::accept)
    .listen(port, host, httpServerFuture.completer());
  return httpServerFuture.map(r -> null);
}

至于各个路由处理逻辑如何实现,能够参考 Vert.x Blueprint - Todo Backend Tutorial 获取相信信息。

最后咱们打开此微服务模块中的Main Verticle - ProductVerticle类。正如咱们以前所提到的,它负责发布服务以及部署REST Verticle。咱们来看一下其start方法:

@Override
public void start(Future<Void> future) throws Exception {
  super.start();

  // create the service instance
  ProductService productService = new ProductServiceImpl(vertx, config()); // (1)
  // register the service proxy on event bus
  ProxyHelper.registerService(ProductService.class, vertx, productService, SERVICE_ADDRESS); // (2)
  // publish the service in the discovery infrastructure
  initProductDatabase(productService) // (3)
    .compose(databaseOkay -> publishEventBusService(ProductService.SERVICE_NAME, SERVICE_ADDRESS, ProductService.class)) // (4)
    .compose(servicePublished -> deployRestService(productService)) // (5)
    .setHandler(future.completer()); // (6)
}

首先咱们建立一个ProductService服务实例(1),而后经过registerService方法将服务注册至Event Bus(2)。接着咱们初始化数据库表(3),将商品服务发布至服务发现层(4)而后部署REST Verticle(5)。这是一系列的异步方法的组合操做,很溜吧!最后咱们将future.completer()绑定至组合后的Future上,这样当全部异步操做都OK的时候,Future会自动完成。

固然,不要忘记在配置里指定api.name。以前咱们在 API Gateway章节 提到过,API Gateway的反向代理部分就是经过对应REST服务的 api.name 来进行请求分发的。默认状况下api.nameproduct:

{
  "api.name": "product"
}

基于MongoDB的网店服务

网店服务用于网店的操做,如开店、关闭、更新数据。正常状况下,开店都须要人工申请,不过在本蓝图教程中,咱们把这一步简化掉了。网店服务模块的结构和商品服务模块很是类似,因此咱们就不细说了。咱们这里仅仅瞅一瞅如何使用Vert.x Mongo Client。

使用Vert.x Mongo Client很是简单,首先咱们须要建立一个MongoClient实例,过程相似于JDBCClient

private final MongoClient client;

public StoreCRUDServiceImpl(Vertx vertx, JsonObject config) {
  this.client = MongoClient.createNonShared(vertx, config);
}

而后咱们就能够经过它来操做Mongo了。好比咱们想执行存储(save)操做,咱们能够这样写:

@Override
public void saveStore(Store store, Handler<AsyncResult<Void>> resultHandler) {
  client.save(COLLECTION, new JsonObject().put("_id", store.getSellerId())
      .put("name", store.getName())
      .put("description", store.getDescription())
      .put("openTime", store.getOpenTime()),
    ar -> {
      if (ar.succeeded()) {
        resultHandler.handle(Future.succeededFuture());
      } else {
        resultHandler.handle(Future.failedFuture(ar.cause()));
      }
    }
  );
}

这些操做都是异步的,所以你必定很是熟悉这种模式!固然若是不喜欢基于回调的异步模式的话,你也能够选择Rx版本的API~

更多关于Vert.x Mongo Client的使用细节,请参考官方文档

基于Redis的商品库存服务

商品库存服务负责操做商品的库存数量,好比添加库存、减小库存以及获取当前库存数量。库存使用Redis来存储。

与以前的Event Bus服务不一样,咱们这里的商品库存服务是基于Future的,而不是基于回调的。因为服务代理模块不支持处理基于Future的服务接口,所以这里咱们就不用异步RPC了,只发布一个REST API端点,全部的调用都经过REST进行。

首先来看一下InventoryService服务接口:

public interface InventoryService {

  /**
   * Create a new inventory service instance.
   *
   * @param vertx  Vertx instance
   * @param config configuration object
   * @return a new inventory service instance
   */
  static InventoryService createService(Vertx vertx, JsonObject config) {
    return new InventoryServiceImpl(vertx, config);
  }

  /**
   * Increase the inventory amount of a certain product.
   *
   * @param productId the id of the product
   * @param increase  increase amount
   * @return the asynchronous result of current amount
   */
  Future<Integer> increase(String productId, int increase);

  /**
   * Decrease the inventory amount of a certain product.
   *
   * @param productId the id of the product
   * @param decrease  decrease amount
   * @return the asynchronous result of current amount
   */
  Future<Integer> decrease(String productId, int decrease);

  /**
   * Retrieve the inventory amount of a certain product.
   *
   * @param productId the id of the product
   * @return the asynchronous result of current amount
   */
  Future<Integer> retrieveInventoryForProduct(String productId);

}

接口定义很是简单,含义都在注释中给出了。接着咱们再看一下服务的实现类InventoryServiceImpl类。在Redis中,全部的库存数量都被存储在inventory:v1命名空间中,并以商品号productId做为标识。好比商品A123456会被存储至inventory:v1:A123456键值对中。

Vert.x Redis提供了incrbydecrby命令,能够很方便地实现库存增长和减小功能,代码相似。这里咱们只看库存增长功能:

@Override
public Future<Integer> increase(String productId, int increase) {
  Future<Long> future = Future.future();
  client.incrby(PREFIX + productId, increase, future.completer());
  return future.map(Long::intValue);
}

因为库存数量不会很是大,Integer就足够了,所以咱们须要经过Long::intValue方法引用来将Long结果变换成Integer类型的。

retrieveInventoryForProduct方法的实现也很是短小精悍:

@Override
public Future<Integer> retrieveInventoryForProduct(String productId) {
  Future<String> future = Future.future();
  client.get(PREFIX + productId, future.completer());
  return future.map(r -> r == null ? "0" : r)
    .map(Integer::valueOf);
}

咱们经过get命令来获取值。因为结果是String类型的,所以咱们须要自行将其转换为Integer类型。若是结果为空,咱们就认为商品没有库存,返回0

至于REST Verticle(在此模块中也为Main Verticle),其实现模式与前面的大同小异,这里就不展开说了。不要忘记在config.json中指定api.name:

{
  "api.name": "inventory",
  "redis.host": "redis",
  "inventory.http.address": "inventory-microservice",
  "inventory.http.port": 8086
}

事件溯源 - 购物车服务

好了,如今咱们与基础服务模块告一段落了。下面咱们来到了另外一个重要的服务模块 —— 购物车微服务。此模块负责购物车的获取、购物车事件的添加以及结算功能。与传统的实现不一样,这里咱们要介绍一种不一样的开发模式 —— 事件溯源(Event Sourcing)。

解道Event Sourcing

在传统的数据存储模式中,咱们一般直接将数据自己的状态存储至数据库中。这在通常场景中是没有问题的,但有些时候,咱们不只想获取到数据,还想获取数据操做的过程(即此数据是通过怎样的操做生成的),这时候咱们就能够利用事件溯源(Event Sourcing)来解决这个问题。

事件溯源保证了数据状态的变换都以一系列的事件的形式存储在数据库中。因此,咱们不只能够获取每一个变换的事件,并且能够经过过去的事件来组合出过去任意时刻的数据状态!这真是极好的~注意,有一点很重要,咱们不能更改已经保存的事件以及它们的序列 —— 也就是说,事件存储是只能添加而不能删除的,而且须要不可变。是否是感受和数据库事务日志的原理差很少呢?

在微服务架构中,事件溯源模式能够带来如下的好处:

  • 咱们能够从过去的事件序列中组建出任意时刻的数据状态

  • 每一个过去的事件都得以保存,所以这使得补偿事务成为可能

  • 咱们能够从事件存储中获取事件流,而且以异步、响应式风格对其进行变换和处理

  • 事件存储一样能够看成为数据日志

事件存储的选择也须要好好考虑。Apache Kafka很是适合这种场景,在此版本的Micro Shop微服务中,为了简化其实现,咱们简单地使用了MySQL做为事件存储。下个版本咱们将把Kafka整合进来。

注:在实际生产环境中,购物车一般被存储于Session或缓存内。本章节仅为介绍事件溯源而使用事件存储模式。

购物车事件

咱们来看一下表明购物车事件的CartEvent数据对象:

@DataObject(generateConverter = true)
public class CartEvent {

  private Long id;
  private CartEventType cartEventType;
  private String userId;
  private String productId;
  private Integer amount;

  private long createdAt;

  public CartEvent() {
    this.createdAt = System.currentTimeMillis();
  }

  public CartEvent(JsonObject json) {
    CartEventConverter.fromJson(json, this);
  }

  public CartEvent(CartEventType cartEventType, String userId, String productId, Integer amount) {
    this.cartEventType = cartEventType;
    this.userId = userId;
    this.productId = productId;
    this.amount = amount;
    this.createdAt = System.currentTimeMillis();
  }

  public static CartEvent createCheckoutEvent(String userId) {
    return new CartEvent(CartEventType.CHECKOUT, userId, "all", 0);
  }

  public static CartEvent createClearEvent(String userId) {
    return new CartEvent(CartEventType.CLEAR_CART, userId, "all", 0);
  }

  public JsonObject toJson() {
    JsonObject json = new JsonObject();
    CartEventConverter.toJson(this, json);
    return json;
  }

  public static boolean isTerminal(CartEventType eventType) {
    return eventType == CartEventType.CLEAR_CART || eventType == CartEventType.CHECKOUT;
  }
}

一个购物车事件存储着事件的类型、发生的时间、操做用户、对应的商品ID以及商品数量变更。在咱们的蓝图应用中,购物车事件一共有四种,它们用CartEventType枚举类表示:

public enum CartEventType {
  ADD_ITEM, // 添加商品至购物车
  REMOVE_ITEM, // 从购物车中删除商品
  CHECKOUT, // 结算并清空
  CLEAR_CART // 清空
}

其中CHECKOUTCLEAR_CART事件是对整个购物车实体进行操做,对应的购物车事件参数相似,所以咱们写了两个静态方法来建立这两种事件。

另外咱们还注意到一个静态方法isTerminal,它用于检测当前购物车事件是否为一个“终结”事件。所谓的“终结”,指的是到此就对整个购物车进行操做(结算或清空)。在从购物车事件流构建出对应的购物车状态的时候,此方法很是有用。

购物车实体

看完了购物车事件,咱们再来看一下购物车。购物车实体用ShoppingCart数据对象表示,它包含着一个商品列表表示当前购物车中的商品即数量:

private List<ProductTuple> productItems = new ArrayList<>();

其中ProductTuple数据对象包含着商品号、商品卖家ID、单价以及当前购物车中次商品的数目amount

为了方便,咱们还在ShoppingCart类中放了一个amountMap用于暂时存储商品数量:

private Map<String, Integer> amountMap = new HashMap<>();

因为它只是暂时存储,咱们不但愿在对应的JSON数据中看到它,因此把它的getter和setter方法都注解上@GenIgnore

在事件溯源模式中,咱们要从一系列的购物车事件构建对应的购物车状态,所以咱们须要一个incorporate方法将每一个购物车事件“合并”至购物车内以变动对应的商品数目:

public ShoppingCart incorporate(CartEvent cartEvent) {
  // 此事件必须为添加或删除事件
  boolean ifValid = Stream.of(CartEventType.ADD_ITEM, CartEventType.REMOVE_ITEM)
    .anyMatch(cartEventType ->
      cartEvent.getCartEventType().equals(cartEventType));

  if (ifValid) {
    amountMap.put(cartEvent.getProductId(),
      amountMap.getOrDefault(cartEvent.getProductId(), 0) +
        (cartEvent.getAmount() * (cartEvent.getCartEventType()
          .equals(CartEventType.ADD_ITEM) ? 1 : -1)));
  }

  return this;
}

实现却是比较简单,咱们首先来检查要合并的事件是否是添加商品或移除商品事件,若是是的话,咱们就根据事件类型以及对应的数量变动来改变当前购物车中该商品的数量(amountMap)。

使用Rx版本的Vert.x JDBC

咱们如今已经了解购物车微服务中的实体类了,下面该看看购物车事件存储服务了。

以前用callback-based API写Vert.x JDBC操做总感受心累,还好Vert.x支持与RxJava进行整合,而且几乎每一个Vert.x组件都有对应的Rx版本!是否是瞬间感受整我的都变得Reactive了呢~(⊙o⊙) 这里咱们就来使用Rx版本的Vert.x JDBC来写咱们的购物车事件存储服务。也就是说,里面全部的异步方法都将是基于Observable的,颇有FRP风格!

咱们首先定义了一个简单的CRUD接口SimpleCrudDataSource

public interface SimpleCrudDataSource<T, ID> {

  Observable<Void> save(T entity);

  Observable<T> retrieveOne(ID id);

  Observable<Void> delete(ID id);

}

接着定义了一个CartEventDataSource接口,定义了购物车事件获取的相关方法:

public interface CartEventDataSource extends SimpleCrudDataSource<CartEvent, Long> {

  Observable<CartEvent> streamByUser(String userId);

}

能够看到这个接口只有一个方法 —— streamByUser方法会返回某一用户对应的购物车事件流,这样后面咱们就能够对其进行流式变换操做了!

下面咱们来看一下服务的实现类CartEventDataSourceImpl。首先是save方法,它将一个事件存储至事件数据库中:

@Override
public Observable<Void> save(CartEvent cartEvent) {
  JsonArray params = new JsonArray().add(cartEvent.getCartEventType().name())
    .add(cartEvent.getUserId())
    .add(cartEvent.getProductId())
    .add(cartEvent.getAmount())
    .add(cartEvent.getCreatedAt() > 0 ? cartEvent.getCreatedAt() : System.currentTimeMillis());
  return client.getConnectionObservable()
    .flatMap(conn -> conn.updateWithParamsObservable(SAVE_STATEMENT, params))
    .map(r -> null);
}

看看咱们的代码,在对比对比普通的callback-based的Vert.x JDBC,是否是更加简洁,更加Reactive呢?咱们能够很是简单地经过getConnectionObservable方法获取数据库链接,而后组合updateWithParamsObservable方法执行对应的含参SQL语句。只须要两行有木有!而若是用callback-based的风格的话,你只能这么写:

client.getConnection(ar -> {
  if (ar.succeeded) {
    SQLConnection connection = ar.result();
    connection.updateWithParams(SAVE_STATEMENT, params, ar2 -> {
      // ...
    })
  } else {
    resultHandler.handle(Future.failedFuture(ar.cause()));
  }
})

所以,使用RxJava是很是愉快的一件事!固然vertx-sync也是一个不错的选择。

固然,不要忘记返回的Observablecold 的,所以只有在它被subscribe的时候,数据才会被发射。

不过话说回来了,Vert.x JDBC底层本质仍是阻塞型的调用,要实现真正的异步数据库操做,咱们能够利用 Vert.x MySQL / PostgreSQL Client 这个组件,底层使用Scala写的异步数据库操做库,不过目前还不是很稳定,你们能够本身尝尝鲜。

下面咱们再来看一下retrieveOne方法,它从数据存储中获取特定ID的事件:

@Override
public Observable<CartEvent> retrieveOne(Long id) {
  return client.getConnectionObservable()
    .flatMap(conn -> conn.queryWithParamsObservable(RETRIEVE_STATEMENT, new JsonArray().add(id)))
    .map(ResultSet::getRows)
    .filter(list -> !list.isEmpty())
    .map(res -> res.get(0))
    .map(this::wrapCartEvent);
}

很是简洁明了,就像以前咱们的基于Future的范式类似,所以这里就再也不详细解释了~

下面咱们来看一下里面最重要的方法 —— streamByUser方法:

@Override
public Observable<CartEvent> streamByUser(String userId) {
  JsonArray params = new JsonArray().add(userId).add(userId);
  return client.getConnectionObservable()
    .flatMap(conn -> conn.queryWithParamsObservable(STREAM_STATEMENT, params))
    .map(ResultSet::getRows)
    .flatMapIterable(item -> item) // list merge into observable
    .map(this::wrapCartEvent);
}

其核心在于它的SQL语句STREAM_STATEMENT

SELECT * FROM cart_event c
WHERE c.user_id = ? AND c.created_at > coalesce(
    (SELECT created_at FROM cart_event
       WHERE user_id = ? AND (`type` = "CHECKOUT" OR `type` = "CLEAR_CART")
     ORDER BY cart_event.created_at DESC
     LIMIT 1),
    0)
ORDER BY c.created_at ASC;

此SQL语句执行时会获取与当前购物车相关的全部购物车事件。注意到咱们有许多用户,每一个用户可能会有许多购物车事件,它们属于不一样时间的购物车,那么如何来获取相关的事件呢?方法是 —— 首先咱们获取最近一次“终结”事件发生对应的时间,那么当前购物车相关的购物车事件就是在此终结事件发生后全部的购物车事件。

明白了这一点,咱们再回到streamByUser方法中来。既然此方法是从数据库中获取一个事件列表,那么为何此方法返回Observable<CartEvent>而不是Observable<List<CartEvent>>呢?咱们来看看其中的奥秘 —— flatMapIterable算子,它将一个序列变换为一串数据流。因此,这里的Observable<CartEvent>与Vert.x中的Future以及Java 8中的CompletableFuture就有些不一样了。CompletableFuture更像是RxJava中的Single,它仅仅发送一个值或一个错误信息,而Observable自己则就像是一个数据流,数据源源不断地从发布者流向订阅者。以前retrieveOnesave方法中返回的Observable的使用更像是一个Single,可是在streamByUser方法中,Observable是真真正正的事件数据流。咱们将会在购物车服务ShoppingCartService中处理事件流。

哇!如今你必定又被Rx这种函数响应式风格所吸引了~在下面的部分中,咱们将探索购物车服务及其实现,基于Future,一样很是Reactive!

根据购物车事件序列构建对应的购物车状态

咱们首先来看一下ShoppingCartService —— 购物车服务接口,它也是一个Event Bus服务:

@VertxGen
@ProxyGen
public interface ShoppingCartService {

  /**
   * The name of the event bus service.
   */
  String SERVICE_NAME = "shopping-cart-eb-service";

  /**
   * The address on which the service is published.
   */
  String SERVICE_ADDRESS = "service.shopping.cart";

  @Fluent
  ShoppingCartService addCartEvent(CartEvent event, Handler<AsyncResult<Void>> resultHandler);

  @Fluent
  ShoppingCartService getShoppingCart(String userId, Handler<AsyncResult<ShoppingCart>> resultHandler);

}

这里咱们定义了两个方法:addCartEvent用于将购物车事件存储至事件存储中;getShoppingCart方法用于获取某个用户当前购物车的状态。

下面咱们来看一下其实现类 —— ShoppingCartServiceImpl。首先是addCartEvent方法,它很是简单:

@Override
public ShoppingCartService addCartEvent(CartEvent event, Handler<AsyncResult<Void>> resultHandler) {
  Future<Void> future = Future.future();
  repository.save(event).toSingle().subscribe(future::complete, future::fail);
  future.setHandler(resultHand
  return this;
}

正如以前咱们所提到的,这里save方法返回的Observable其实更像个Single,所以咱们将其经过toSingle方法变换为Single,而后经过subscribe(future::complete, future::fail)将其转化为Future以便于给其绑定一个Handler<AsyncResult<Void>>类型的处理函数。

getShoppingCart方法的逻辑位于aggregateCartEvents方法中,此方法很是重要,而且是基于Future的。咱们先来看一下代码:

private Future<ShoppingCart> aggregateCartEvents(String userId) {
  Future<ShoppingCart> future = Future.future();
  // aggregate cart events into raw shopping cart
  repository.streamByUser(userId) // (1)
    .takeWhile(cartEvent -> !CartEvent.isTerminal(cartEvent.getCartEventType())) // (2)
    .reduce(new ShoppingCart(), ShoppingCart::incorporate) // (3)
    .toSingle()
    .subscribe(future::complete, future::fail); // (4)

  return future.compose(cart ->
    getProductService() // (5)
      .compose(service -> prepareProduct(service, cart)) // (6) prepare product data
      .compose(productList -> generateCurrentCartFromStream(cart, productList)) // (7) prepare product items
  );
}

咱们来详细地解释一下。首先咱们先建立个Future,而后先经过repository.streamByUser(userId)方法获取事件流(1),而后咱们使用takeWhile算子来获取全部的ADD_ITEMREMOVE_ITEM类型的事件(2)。takeWhile算子在断定条件变为假时中止发射新的数据,所以当事件流遇到一个终结事件时,新的事件就再也不往外发送了,以前的事件将会继续被传递。

下面就是产生购物车状态的过程了!咱们经过reduce算子将事件流来“聚合”成购物车实体(3)。这个过程能够总结为如下几步:首先咱们先建立一个空的购物车,而后依次将各个购物车事件“合并”至购物车实体中。最后聚合而成的购物车实体应该包含一个完整的amountMap

如今此Observable已经包含了咱们想要的初始状态的购物车了。咱们将其转化为Single而后经过subscribe(future::complete, future::fail)转化为Future(4)。

如今咱们须要更多的信息以组件一个完整的购物车,因此咱们首先组合getProductService异步方法来从服务发现层获取商品服务(5),而后经过prepareProduct方法来获取须要的商品数据(6),最后经过generateCurrentCartFromStream异步方法组合出完整的购物车实体(7)。这里面包含了好几个组合过程,咱们来一一解释。

首先来看getProductService异步方法。它用于从服务发现层获取商品服务,而后返回其异步结果:

private Future<ProductService> getProductService() {
  Future<ProductService> future = Future.future();
  EventBusService.getProxy(discovery,
    new JsonObject().put("name", ProductService.SERVICE_NAME),
    future.completer());
  return future;
}

如今咱们获取到商品服务了,那么下一步天然是获取须要的商品数据了。这个过程经过prepareProduct异步方法实现:

private Future<List<Product>> prepareProduct(ProductService service, ShoppingCart cart) {
  List<Future<Product>> futures = cart.getAmountMap().keySet() // (1)
    .stream()
    .map(productId -> {
      Future<Product> future = Future.future();
      service.retrieveProduct(productId, future.completer());
      return future; // (2)
    })
    .collect(Collectors.toList()); // (3)
  return Functional.sequenceFuture(futures); // (4)
}

在此实现中,首先咱们从amountMap中获取购物车中全部商品的ID(1),而后咱们根据每一个ID异步调用商品服务的retrieveProduct方法而且以Future包装(2),而后将此流转化为List<Future<Product>>类型的列表(3)。咱们这里想得到的是全部商品的异步结果,即Future<List<Product>>,那么如何转换呢?这里我写了一个辅助函数sequenceFuture来实现这样的变换,它位于io.vertx.blueprint.microservice.common.functional包下的Functional类中:

public static <R> Future<List<R>> sequenceFuture(List<Future<R>> futures) {
  return CompositeFutureImpl.all(futures.toArray(new Future[futures.size()])) // (1)
    .map(v -> futures.stream()
        .map(Future::result) // (2)
        .collect(Collectors.toList()) // (3)
    );
}

此方法对于想将一个Future序列变换成单个Future的状况很是有用。这里咱们首先调用CompositeFutureImpl类的all方法(1),它返回一个组合的Future,当且仅当序列中全部的Future都成功完成时,它为成功状态,不然为失败状态。下面咱们就对此组合Future作变换:获取每一个Future对应的结果(由于all方法已经强制获取全部结果),而后归结成列表(3)。

回到以前的组合中来!如今咱们获得了咱们须要的商品信息列表List<Product>,接下来就根据这些信息来构建完整的购物车实体了!咱们来看一下generateCurrentCartFromStream方法的实现:

private Future<ShoppingCart> generateCurrentCartFromStream(ShoppingCart rawCart, List<Product> productList) {
  Future<ShoppingCart> future = Future.future();
  // check if any of the product is invalid
  if (productList.stream().anyMatch(e -> e == null)) { // (1)
    future.fail("Error when retrieve products: empty");
    return future;
  }
  // construct the product items
  List<ProductTuple> currentItems = rawCart.getAmountMap().entrySet() // (2)
    .stream()
    .map(item -> new ProductTuple(getProductFromStream(productList, item.getKey()), // (3)
      item.getValue())) // (4) amount value
    .filter(item -> item.getAmount() > 0) // (5) amount must be greater than zero
    .collect(Collectors.toList());

  ShoppingCart cart = rawCart.setProductItems(currentItems); // (6)
  return Future.succeededFuture(cart); // (7)
}

看起来很是混乱的样子。。。不要担忧,咱们慢慢来~注意这个方法自己不是异步的,但咱们须要表示此方法成功或失败两种状态(即AsyncResult),因此此方法仍然返回Future。首先咱们建立一个Future,而后经过anyMatch方法检查商品列表是否合法(1)。若不合法,返回一个失败的Future;若合法,咱们对每一个商品依次构建出对应的ProductTuple。在(3)中,咱们经过这个构造函数来构建ProductTuple:

public ProductTuple(Product product, Integer amount) {
  this.productId = product.getProductId();
  this.sellerId = product.getSellerId();
  this.price = product.getPrice();
  this.amount = amount;
}

其中第一个参数是对应的商品实体。为了从列表中获取对应的商品实体,咱们写了一个getProductFromStream方法:

private Product getProductFromStream(List<Product> productList, String productId) {
  return productList.stream()
    .filter(product -> product.getProductId().equals(productId))
    .findFirst()
    .get();
}

当每一个商品的ProductTuple都构建完毕的时候,咱们就能够将列表赋值给对应的购物车实体了(6),而且返回购物车实体结果(7)。如今咱们终于整合出一个完整的购物车了!

结算 - 根据购物车产生订单

如今咱们已经选好了本身喜好的商品,把购物车填的慢慢当当了,下面是时候进行结算了!咱们这里一样定义了一个结算服务接口CheckoutService,它只包含一个特定的方法:checkout

@VertxGen
@ProxyGen
public interface CheckoutService {

  /**
   * The name of the event bus service.
   */
  String SERVICE_NAME = "shopping-checkout-eb-service";

  /**
   * The address on which the service is published.
   */
  String SERVICE_ADDRESS = "service.shopping.cart.checkout";

  /**
   * Order event source address.
   */
  String ORDER_EVENT_ADDRESS = "events.service.shopping.to.order";

  /**
   * Create a shopping checkout service instance
   */
  static CheckoutService createService(Vertx vertx, ServiceDiscovery discovery) {
    return new CheckoutServiceImpl(vertx, discovery);
  }

  void checkout(String userId, Handler<AsyncResult<CheckoutResult>> handler);

}

接口很是简单,下面咱们来看其实现 —— CheckoutServiceImpl类。尽管接口只包含一个checkout方法,但咱们都知道结算过程可不简单。。。它包含库存检测、付款(这里暂时省掉了)以及生成订单的逻辑。咱们先来看看checkout方法的源码:

@Override
public void checkout(String userId, Handler<AsyncResult<CheckoutResult>> resultHandler) {
  if (userId == null) { // (1)
    resultHandler.handle(Future.failedFuture(new IllegalStateException("Invalid user")));
    return;
  }
  Future<ShoppingCart> cartFuture = getCurrentCart(userId); // (2)
  Future<CheckoutResult> orderFuture = cartFuture.compose(cart ->
    checkAvailableInventory(cart).compose(checkResult -> { // (3)
      if (checkResult.getBoolean("res")) { // (3)
        double totalPrice = calculateTotalPrice(cart); // (4)
        // 建立订单实体
        Order order = new Order().setBuyerId(userId) // (5)
          .setPayId("TEST")
          .setProducts(cart.getProductItems())
          .setTotalPrice(totalPrice);
        // 设置订单流水号,而后向订单组件发送订单并等待回应
        return retrieveCounter("order") // (6)
          .compose(id -> sendOrderAwaitResult(order.setOrderId(id))) // (7)
          .compose(result -> saveCheckoutEvent(userId).map(v -> result)); // (8)
      } else {
        // 库存不足,结算失败
        return Future.succeededFuture(new CheckoutResult()
          .setMessage(checkResult.getString("message"))); // (9)
      }
    })
  );

  orderFuture.setHandler(resultHandler); // (10)
}

好吧,咱们又看到了大量的compose。。。是的,这里咱们又组合了不少基于Future的异步方法。首先咱们先来判断给定的userId是否合法(1),若是不合法的话马上让Future失败掉;若用户合法,咱们就经过getCurrentCart方法获取给定用户的当前购物车状态(2)。这个过程是异步的,因此此方法返回Future<ShoppingCart>类型的异步结果:

private Future<ShoppingCart> getCurrentCart(String userId) {
  Future<ShoppingCartService> future = Future.future();
  EventBusService.getProxy(discovery,
    new JsonObject().put("name", ShoppingCartService.SERVICE_NAME),
    future.completer());
  return future.compose(service -> {
    Future<ShoppingCart> cartFuture = Future.future();
    service.getShoppingCart(userId, cartFuture.completer());
    return cartFuture.compose(c -> {
      if (c == null || c.isEmpty())
        return Future.failedFuture(new IllegalStateException("Invalid shopping cart"));
      else
        return Future.succeededFuture(c);
    });
  });
}

getCurrentCart方法中,咱们经过EventBusService接口的getProxy方法从服务发现层获取购物车服务;而后咱们调用购物车服务的getShoppingCart方法获取购物车。这里咱们还须要检验购物车是否为空,购物车不为空的话就返回异步结果,为空的话结算显然不合适,返回不合法错误。

你可能已经注意到了checkout方法会产生一个CheckoutResult类型的异步结果,这表明结算的结果:

@DataObject(generateConverter = true)
public class CheckoutResult {
  private String message; // 结算结果信息
  private Order order; // 若成功,此项为订单实体
}

回到咱们的checkout方法中来。如今咱们要从获取到的cartFuture进行一系列的操做,最终获得Future<CheckoutResult>类型的结算结果。那么进行哪些操做呢?首先咱们组合checkAvailableInventory异步方法,它用于获取商品库存检测数据,后面咱们讲详细讨论其实现。接着咱们检查获取到的商品库存数据,判断是否全部库存都充足(3)。若是不充足的话,咱们直接返回一个CheckoutResult并标记库存不足的信息(9)。若是库存充足,咱们就计算出此订单的总价(4)而后生成订单Order(5)。订单用Order数据对象表示,它包含如下信息:

  • 买家ID

  • 每一个所选商品的数量、单价以及卖家ID

  • 商品总价

生成初始订单以后,咱们须要从计数器服务中生成该订单的流水号(6),接着经过Event Bus向订单组件中发送订单数据,而且等待结帐结果CheckoutResult(7)。这些都作完之后,咱们向事件存储中添加购物车结算事件(8)。最后咱们向最终获得的orderFuture绑定resultHandler处理函数(10)。当结帐结果回复过来的时候,处理函数将会被调用。

下面咱们来解释一下上面出现过的一些异步过程。首先是最早提到的用于准备库存数据的checkAvailableInventory方法:

private Future<JsonObject> checkAvailableInventory(ShoppingCart cart) {
  Future<List<JsonObject>> allInventories = getInventoryEndpoint().compose(client -> { // (1)
    List<Future<JsonObject>> futures = cart.getProductItems() // (2)
      .stream()
      .map(product -> getInventory(product, client)) // (3)
      .collect(Collectors.toList());
    return Functional.sequenceFuture(futures); // (4)
  });
  return allInventories.map(inventories -> {
    JsonObject result = new JsonObject();
    // get the list of products whose inventory is lower than the demand amount
    List<JsonObject> insufficient = inventories.stream()
      .filter(item -> item.getInteger("inventory") - item.getInteger("amount") < 0) // (5)
      .collect(Collectors.toList());
    // insufficient inventory exists
    if (insufficient.size() > 0) {
      String insufficientList = insufficient.stream()
        .map(item -> item.getString("id"))
        .collect(Collectors.joining(", ")); // (6)
      result.put("message", String.format("Insufficient inventory available for product %s.", insufficientList))
        .put("res", false); // (7)
    } else {
      result.put("res", true); // (8)
    }
    return result;
  });
}

有点复杂呢。。。首先咱们经过getInventoryEndpoint方法来从服务发现层获取商品库存组件对应的REST端点(1)。这是对HttpEndpoint接口的getClient方法的简单封装:

private Future<HttpClient> getInventoryEndpoint() {
  Future<HttpClient> future = Future.future();
  HttpEndpoint.getClient(discovery,
    new JsonObject().put("name", "inventory-rest-api"), // service name
    future.completer());
  return future;
}

接着咱们又要组合另外一个Future。在这个过程当中,咱们从购物车中获取商品列表(2),而后将每一个ProductTuple都变换成对应的商品ID以及对应库存(3)。以前咱们已经获取到库存服务REST端点对应的HttpClient了,下面咱们就能够经过客户端来获取每一个商品的库存。获取库存的过程是在getInventory方法中实现的:

private Future<JsonObject> getInventory(ProductTuple product, HttpClient client) {
  Future<Integer> future = Future.future(); // (A)
  client.get("/" + product.getProductId(), response -> { // (B)
    if (response.statusCode() == 200) { // (C)
      response.bodyHandler(buffer -> {
        try {
          int inventory = Integer.valueOf(buffer.toString()); // (D)
          future.complete(inventory);
        } catch (NumberFormatException ex) {
          future.fail(ex);
        }
      });
    } else {
      future.fail("not_found:" + product.getProductId()); // (E)
    }
  })
    .exceptionHandler(future::fail)
    .end();
  return future.map(inv -> new JsonObject()
    .put("id", product.getProductId())
    .put("inventory", inv)
    .put("amount", product.getAmount())); // (F)
}

过程很是简洁明了。首先咱们先建立一个Future<Integer>来保存库存数量异步结果(A)。而后咱们调用clientget方法来发送获取库存的请求(B)。在对回应的处理逻辑responseHandler中,若是结果状态为 200 OK(C),咱们就能够经过bodyHandler来解析回应正文并将其转换为Integer类型(D)。这几个过程都完成后,对应的future会被赋值为对应的库存数量;若是结果状态不正常(好比400或404),那么咱们就能够认为获取失败,将future置为失败状态(E)。

只有库存数量是不够的(由于咱们不知道库存对应哪一个商品),所以为了方便起见,咱们将库存数量和对应的商品号以及购物车中选定的数量都塞进一个JsonObject中,最后将Future<Integer>变换为Future<JsonObject>类型的结果(F)。

再回到checkAvailableInventory方法中来。在(3)过程以后,咱们有的到了一个Future列表,因此咱们再次调用Functional.sequenceFuture方法将其变换成Future<List<JsonObject>>类型(4)。如今咱们能够来检查每一个库存是否都充足了!咱们建立了一个列表insufficient专门存储库存不足的商品,这是经过filter算子实现的(5)。若是库存不足的商品列表不为空,那就是说有商品库存不足,因此咱们须要获取每一个库存不足的商品ID并把其概括成一串信息。这里咱们经过collect算子实现的:collect(Collectors.joining(", ")) (6)。这个小trick仍是很好使的,好比列表[TST-0001, TST-0002, BK-16623]会被归结成 "TST-0001, TST-0002, BK-16623" 这样的字符串。生成库存不足商品的信息之后,咱们将此信息置于JsonObject中。同时,咱们在此JsonObject中用一个bool型的res来表示商品库存是否充足,所以这里咱们将res的值设为false(7)。

若是以前得到的库存不足的商品列表为空,那么就表明全部商品余额充足,咱们就将res的值设为true(8),最后返回异步结果future

再回到那一串组合中。咱们接着经过calculateTotalPrice方法来计算购物车中商品的总价,以便为订单生成提供信息。这个过程很简单:

return cart.getProductItems().stream()
  .map(p -> p.getAmount() * p.getPrice()) // join by product id
  .reduce(0.0d, (a, b) -> a + b);

正如以前在checkout方法中提到的那样,在建立原始订单以后,咱们会对结果进行三个组合:retrieveCounter -> sendOrderAwaitResult -> saveCheckoutEvent。咱们来看一下。

咱们首先从缓存组件的计数器服务中生成当前订单的流水号:

private Future<Long> retrieveCounter(String key) {
  Future<Long> future = Future.future();
  EventBusService.<CounterService>getProxy(discovery,
    new JsonObject().put("name", "counter-eb-service"),
    ar -> {
      if (ar.succeeded()) {
        CounterService service = ar.result();
        service.addThenRetrieve(key, future.completer());
      } else {
        future.fail(ar.cause());
      }
    });
  return future;
}

固然你也能够直接用数据库自带的AUTO INCREMENT计数器,不过当有多台数据库服务器的时候,咱们须要保证计数器在集群内的一致性。

接着咱们经过saveCheckoutEvent方法存储购物车结算事件,其实现和getCurrentCart方法很是相似。它们都是先从服务发现层中获取购物车服务,而后再异步调用对应的逻辑:

private Future<Void> saveCheckoutEvent(String userId) {
  Future<ShoppingCartService> future = Future.future();
  EventBusService.getProxy(discovery,
    new JsonObject().put("name", ShoppingCartService.SERVICE_NAME),
    future.completer());
  return future.compose(service -> {
    Future<Void> resFuture = Future.future();
    CartEvent event = CartEvent.createCheckoutEvent(userId);
    service.addCartEvent(event, resFuture.completer());
    return resFuture;
  });
}

向订单模块发送订单

生成订单流水号之后,如今咱们的订单实体已是完整的了,能够向下层订单服务组件发送了。咱们来看一下其实现 —— sendOrderAwaitResult方法:

private Future<CheckoutResult> sendOrderAwaitResult(Order order) {
  Future<CheckoutResult> future = Future.future();
  vertx.eventBus().send(CheckoutService.ORDER_EVENT_ADDRESS, order.toJson(), reply -> {
    if (reply.succeeded()) {
      future.complete(new CheckoutResult((JsonObject) reply.result().body()));
    } else {
      future.fail(reply.cause());
    }
  });
  return future;
}

咱们将订单实体发送至Event Bus上的一个特定地址中,这样在订单服务组件中,订单服务就可以从Event Bus上获取发送的订单并对其进行处理和分发。注意到咱们调用的send函数同时还接受一个Handler<AsyncResult<Message<T>>>类型的参数,这意味着咱们须要等待消息接收者发送回的回复消息。这实际上是一种相似于 请求/回复模式 的消息模式。若是咱们成功地接收到回复消息,咱们就将其转化为订单结果CheckoutResult而且给future赋值;若是咱们收到了失败的消息,或者接受消息超时,咱们就将future标记为失败。

好啦!在经历了一系列的“组合”过程以后,咱们终于完成了对checkout方法的探索。是否是感受很Reactive呢?

因为订单服务并不知道咱们发送的地址,咱们须要向服务发现层中发布一个 消息源,这里的消息源其实就是咱们将订单发送的位置。订单就能够经过服务发现层获取对应的消费者MessageConsumer,而后今后处接受订单。咱们将会在CartVerticle中发布此消息源,不过在看CartVerticle的实现以前,咱们先来瞥一眼购物车服务的REST Verticle。

购物车服务REST API

在购物车服务相关的REST Verticle里有三个主要的API:

  • GET /cart - 获取当前用户的购物车状态

  • POST /events - 向购物车事件存储中添加一个新的与当前用户相关的购物车事件

  • POST /checkout - 发出购物车结算请求

注意这三个API都须要权限(登陆用户),所以它们的路由处理函数都包装着requireLogin方法。这一点已经在以前的API Gateway章节中提到过:

// api route handler
router.post(API_CHECKOUT).handler(context -> requireLogin(context, this::apiCheckout));
router.post(API_ADD_CART_EVENT).handler(context -> requireLogin(context, this::apiAddCartEvent));
router.get(API_GET_CART).handler(context -> requireLogin(context, this::apiGetCart));

它们的路由函数实现却是很是简单,咱们这里只看一个apiAddCartEvent方法:

private void apiAddCartEvent(RoutingContext context, JsonObject principal) {
  String userId = Optional.ofNullable(principal.getString("userId"))
    .orElse(TEST_USER); // (1)
  CartEvent cartEvent = new CartEvent(context.getBodyAsJson()); // (2)
  if (validateEvent(cartEvent, userId)) {
    shoppingCartService.addCartEvent(cartEvent, resultVoidHandler(context, 201)); // (3)
  } else {
    context.fail(400); // (4)
  }
}

首先咱们从当前的用户凭证principal中获取用户ID。若是当前用户凭证中获取不到ID,那么咱们就暂时用TEST_USER来替代(1)。而后咱们根据请求正文来建立购物车事件CartEvent(2)。咱们同时须要验证购物车事件中的用户与当前做用域内的用户是否相符。若相符,则调用服务的addCartEvent方法将事件添加至事件存储中,并在成功时返回 201 状态(3)。若是请求正文中的购物车事件不合法,咱们就返回 400 Bad Request* 状态(4)。

Cart Verticle

CartVerticle是购物车服务组件的Main Verticle,用于发布各类服务。这里咱们会发布三个服务:

  • shopping-checkout-eb-service: 结算服务,这是一个 Event Bus 服务

  • shopping-cart-eb-service: 购物车服务,这是一个 Event Bus 服务

  • shopping-order-message-source: 发送订单的消息源,这是一个 消息源服务

同时咱们的CartVerticle也负责部署RestShoppingAPIVerticle。注意不要忘掉设置api.name:

{
  "api.name": "cart"
}

这是购物车部分的UI:

Cart Page

订单服务

好啦!如今咱们已经提交告终算请求,在底层订单已经发送至订单微服务组件中了。因此下一步天然就是订单服务的责任了 —— 分发订单以及处理订单。在当前版本的Micro Shop实现中,咱们仅仅将订单存储至数据库中并变动对应的商品库存数额。在正常的生产环境中,咱们一般会将订单push到消息队列中,而且在下层服务中从消息队列中pull订单并进行处理。

订单存储服务的实现与以前太相似了,所以这里就不讲OrderService及其实现的细节了。你们能够自行查看相关代码

咱们的订单处理逻辑写在RawOrderDispatcher这个verticle中,下面咱们就来看一下。

消费消息源发送来的数据

首先咱们须要从消息源中根据服务名称获取消息消费者,而后从消费者处获取发送来的订单。这能够经过MessageSource接口的getConsumer方法实现:

@Override
public void start(Future<Void> future) throws Exception {
  super.start();
  MessageSource.<JsonObject>getConsumer(discovery,
    new JsonObject().put("name", "shopping-order-message-source"),
    ar -> {
      if (ar.succeeded()) {
        MessageConsumer<JsonObject> orderConsumer = ar.result();
        orderConsumer.handler(message -> {
          Order wrappedOrder = wrapRawOrder(message.body());
          dispatchOrder(wrappedOrder, message);
        });
        future.complete();
      } else {
        future.fail(ar.cause());
      }
    });
}

获取到对应的MessageConsumer之后,咱们就能够经过handler方法给其绑定一个Handler<Message<T>>类型的处理函数,在此处理函数中咱们就能够对获取的消息进行各类操做。这里咱们的message body是JsonObject类型的,因此咱们首先将其转化为订单实体,而后就能够对其进行分发和处理了。对应的逻辑在dispatchOrder方法中。

“处理”订单

咱们来看一下dispatchOrder方法中的简单的“分发处理”逻辑:

private void dispatchOrder(Order order, Message<JsonObject> sender) {
  Future<Void> orderCreateFuture = Future.future();
  orderService.createOrder(order, orderCreateFuture.completer()); // (1)
  orderCreateFuture
    .compose(orderCreated -> applyInventoryChanges(order)) // (2)
    .setHandler(ar -> {
      if (ar.succeeded()) {
        CheckoutResult result = new CheckoutResult("checkout_success", order); // (3)
        sender.reply(result.toJson()); // (4)
        publishLogEvent("checkout", result.toJson(), true); // (5)
      } else {
        sender.fail(5000, ar.cause().getMessage()); // (6)
        ar.cause().printStackTrace();
      }
    });
}

首先咱们先建立一个Future表明向数据库中添加订单的异步结果。而后咱们调用订单服务的createOrder方法将订单存储至数据库中(1)。能够看到咱们给此方法传递的处理函数是orderCreateFuture.completer(),这样当添加操做结束时,对应的Future就会被赋值。下一步咱们组合一个异步方法 —— applyInventoryChanges方法,用于变动商品库存数量(2)。若是这两个过程都成功完成的话,咱们就建立一个表明结算成功的CheckoutResult实体(3),而后调用reply方法向消息发送者回复结算结果(4)。以后咱们向Event Bus发送结算事件来通知日志组件记录日志(5)。若是其中有过程失败的话,咱们须要对消息发送者sender调用fail方法来通知操做失败(6)。

很简单吧?下面咱们来看一下applyInventoryChanges方法的实现,看看如何变动商品库存数量:

private Future<Void> applyInventoryChanges(Order order) {
  Future<Void> future = Future.future();
  // 从服务发现层获取REST端点
  Future<HttpClient> clientFuture = Future.future();
  HttpEndpoint.getClient(discovery,
    new JsonObject().put("name", "inventory-rest-api"), // 服务名称
    clientFuture.completer());
  // 经过调用REST API来变动对应的库存
  return clientFuture.compose(client -> {
    List<Future> futures = order.getProducts()
      .stream()
      .map(item -> { // 变换成对应的异步结果
        Future<Void> resultFuture = Future.future();
        String url = String.format("/%s/decrease?n=%d", item.getProductId(), item.getAmount());
        client.put(url, response -> {
          if (response.statusCode() == 200) {
            resultFuture.complete();
          } else {
            resultFuture.fail(response.statusMessage());
          }
        })
          .exceptionHandler(resultFuture::fail)
          .end();
        return resultFuture;
      })
      .collect(Collectors.toList());
    // 每一个Future必须都success,生成的组合Future才会success
    CompositeFuture.all(futures).setHandler(ar -> {
      if (ar.succeeded()) {
        future.complete();
      } else {
        future.fail(ar.cause());
      }
    });
    return future;
  });
}

相信你必定不会对此方法的实现感到陌生,由于它和咱们以前在购物车服务中讲的getInventory方法很是相似。咱们首先获取库存组件对应的HTTP客户端,接着对订单中每一个商品,根据其数额来调用REST API减小对应的库存。调用REST API获取结果的过程是异步的,所以这里咱们又获得了一个List<Future>。可是这里咱们并不须要每一个Future的实际结果。咱们只须要每一个Future的状态,所以这里仅需调用CompositeFuture.all方法获取全部Future的组合Future

至于组件中的OrderVerticle,它只作了三件微小的事情:发布订单服务、部署用于订单分发处理的RawOrderDispatcher以及部署REST Verticle。

Micro Shop SPA整合

在咱们的Micro Shop项目中,咱们提供了一个用Angular.js写的简单的SPA前端页面。那么问题来了,如何将其整合至咱们的微服务中?

注意:当前版本中,为了方便起见,咱们将SPA部分整合进了api-gateway模块中。在生产环境下UI部分一般要单独部署。

有了Vert.x Web的魔力,咱们只须要作的是配置一下路由,让其能够处理静态资源便可!只须要一行:

router.route("/*").handler(StaticHandler.create());

默认状况下静态资源映射的目录是webroot目录,固然你也能够在建立StaticHandler的时候来配置映射目录。

监控仪表板与统计数据

监控仪表板(Monitor Dashboard)一样也是一个SPA前端应用。在本章节中咱们会涉及到如下内容:

  • 如何配置SockJS - EventBus bridge

  • 如何在浏览器中接受来自Event Bus的信息

  • 如何利用 Vert.x Dropwizard Metrics 来获取Vert.x组件的统计数据

SockJS - Event Bus Bridge

不少时候咱们想要在浏览器中接收来自Event Bus的消息并进行处理。听起来很神奇吧~并且你应该可以想象到,Vert.x支持这么作!Vert.x提供了 SockJS - Event Bus Bridge 来支持服务的和客户端(一般是浏览器端)经过Event Bus进行通讯。

为了开启SockJS - Event Bus Bridge支持,咱们须要配置SockJSHandler以及对应的路由器:

// event bus bridge
SockJSHandler sockJSHandler = SockJSHandler.create(vertx); // (1)
BridgeOptions options = new BridgeOptions()
  .addOutboundPermitted(new PermittedOptions().setAddress("microservice.monitor.metrics")) // (2)
  .addOutboundPermitted(new PermittedOptions().setAddress("events.log"));

sockJSHandler.bridge(options); // (3)
router.route("/eventbus/*").handler(sockJSHandler); // (4)

首先咱们建立一个SockJSHandler (1),它用于处理Event Bus信息。默认状况下,为了安全起见,Vert.x不容许任何消息经过Event Bus传输至浏览器端,所以咱们须要对其进行配置。咱们能够建立一个BridgeOptions而后设定容许单向传输消息的地址。这里有两种地址:Outbound 以及 Inbound。Outbound地址容许服务端向浏览器端经过Event Bus发送消息,而Inbound地址容许浏览器端向服务端经过Event Bus发送消息。这里咱们只须要两个Outbound Address:microservice.monitor.metrics用做传输统计数据,events.log用做传输日志消息(2)。接着咱们就能够将配置好的BridgeOptions设置给Bridge(3),最后配置对应的路由。浏览器端的SockJS客户端会使用/eventbus/*路由路径来进行通讯。

将统计数据发送至Event Bus

在微服务架构中,监控(Monitoring)也是重要的一环。有了Vert.x的各类Metrics组件,如 Vert.x Dropwizard MetricsVert.x Hawkular Metrics,咱们能够从对应的组件中获取到统计数据。

这里咱们使用 Vert.x Dropwizard Metrics。使用方法很简单,首先建立一个MetricsService实例:

MetricsService service = MetricsService.create(vertx);

接着咱们就能够调用getMetricsSnapshot方法获取各类组件的统计数据。此方法接受一个实现了Measured接口的类。Measured接口定义了获取Metrics Data的一种规范,Vert.x中主要的类,如VertxEventBus都实现了此接口。所以传入不一样的Measured实现就能够获取不一样的数据。这里咱们传入了Vertx实例来获取更多的统计数据。获取的统计数据的格式为JsonObject

// send metrics message to the event bus
vertx.setPeriodic(metricsInterval, t -> {
  JsonObject metrics = service.getMetricsSnapshot(vertx);
  vertx.eventBus().publish("microservice.monitor.metrics", metrics);
});

咱们设定了一个定时器,每隔一段时间就向microservice.monitor.metrics地址发送当前的统计数据。

若是想了解统计数据都包含什么,请参考 Vert.x Dropwizard metrics 官方文档

如今是时候在浏览器端接收并展现统计数据以及日志消息了~

在浏览器端接收Event Bus上的消息

为了在浏览器端接收Event Bus上的消息,咱们首先须要这两个库: vertx3-eventbus-client以及sockjs。你能够经过npm或bower来下载这两个库。而后咱们就能够在代码中建立一个EventBus实例,而后注册处理函数:

var eventbus = new EventBus('/eventbus');

eventbus.onopen = () => {
  eventbus.registerHandler('microservice.monitor.metrics', (err, message) => {
      $scope.metrics = message.body;
      $scope.$apply();
  });
}

咱们能够经过message.body来获取对应的消息数据。

以后咱们将会运行这个仪表板来监视整个微服务应用的状态。

展现时间!

哈哈,如今咱们已经看完整个Micro Shop微服务的源码了~看源码看的也有些累了,如今到了展现时间了!这里咱们使用Docker Compose来编排容器并运行咱们的微服务应用,很是方便。

注意:建议预留 4GB 内存来运行此微服务应用。

构建项目以及容器

在咱们构建整个项目以前,咱们须要先经过bower获取api-gatewaymonitor-dashboard这两个组件中前端代码对应的依赖。它们的bower.json文件都在对应的src/main/resources/webroot目录中。咱们分别进入这两个目录并执行:

bower install

而后咱们就能够构建整个项目了:

mvn clean install

构建完项目之后,咱们再来构建容器(须要root权限):

cd docker
sudo ./build.sh

构建完成后,咱们就能够来运行咱们的微服务应用了:

sudo ./run.sh

当整个微服务初始化完成的时候,咱们就能够在浏览器中浏览网店页面了,默认地址是 https://localhost:8787

第一次运行?

若是咱们是第一次运行此微服务应用(或以前删除了全部的容器),咱们必须手动配置Keycloak服务器。首先咱们须要在hosts文件中添加一条记录:

0.0.0.0    keycloak-server

而后咱们须要访问 http://keycloak-server:8080而后进入管理员登陆页面。默认状况下用户名和密码都是 admin。进入管理台以后,咱们须要建立一个 Realm,名字随意(实例中给的是Vert.x)。而后进入此Realm,而且为咱们的应用建立一个Client,相似于这样:

Keycloak configuration

建立完之后,咱们进入 Installation 选项卡中来复制对应的JSON配置文件。咱们须要将复制的内容覆盖掉api-gateway/src/config/docker.json中对应的配置。好比:

{
  "api.gateway.http.port": 8787,
  "api.gateway.http.address": "localhost",
  "circuit-breaker": {
    "name": "api-gateway-cb",
    "timeout": 10000,
    "max-failures": 5
  },
  // 下面的都是Keycloak相关的配置
  "realm": "Vert.x",
  "realm-public-key": "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAkto9ZZm69cmdA9e7X4NUSo8T4CyvrYzlRiJdhr+LMqELdfN3ghEY0EBpaROiOueva//iUc/KViYGiAHVXEQ3nr3kytF6uZs9iwqkshKvltpxkOm2Qpj/FSRsCyHlB8Ahbt5xBmzH2mI1VDIxmVTdEBze4u6tLoi4ieo72b2q/dz09yrEokRm/sSYqzNgfE0i1JY6DI8C7FaKszKTK5DRGMIAib8wURrTyf8au0iiisKEXOHKEjo/g0uHCFGSOKqPOprNNIWYwedV+qaQa9oSah2IpwNgFNRLtHpvbcanftMLQOQIR0iufIJ+bHrNhH0RISZhTzcGX3pSIBw/HaERwQIDAQAB",
  "auth-server-url": "http://127.0.0.1:8180/auth",
  "ssl-required": "external",
  "resource": "vertx-blueprint",
  "credentials": {
    "secret": "ea99a8e6-f503-4bdb-afbd-9ae322ee7089"
  },
  "use-resource-role-mappings": true
}

咱们还须要建立几个用户(User)以便后面经过这些用户来登陆。

更详细的Keycloak的配置过程及解释请参考Paulo的教程: Vertx 3 and Keycloak tutorial,很是详细。

修改完对应的配置文件以后,咱们必须从新构建api-gateway模块的容器,而后从新启动此容器。

欢乐的购物时间!

完成配置以后,咱们就来访问前端页面吧!

SPA Frontend

如今咱们能够访问 https://localhost:8787/login 进行登陆,它会跳转至Keycloak的用户登陆页面。若是登录成功,它会自动跳转回Micro Shop的主页。如今咱们能够尽情地享受购物时间了!这真是极好的!

咱们也能够来访问Monitor Dashboard,默认地址是 http://localhost:9100

Monitor Dashboard

一颗赛艇!

完结!

不错不错!咱们终于到达了微服务旅途的终点!恭喜!咱们很是但愿你可以喜欢此蓝图教程,而且掌握到关于Vert.x和微服务的知识 :-)

如下是关于微服务和分布式系统的一些推荐阅读材料:

享受微服务的狂欢吧!


My Blog: 「千载弦歌,芳华如梦」 - sczyh30's blog

若是您对Vert.x感兴趣,欢迎加入Vert.x中国用户组QQ群,一块儿探讨。群号:515203212

相关文章
相关标签/搜索