响应式架构与 RxJava 在有赞零售的实践

随着有赞零售业务的快速发展,系统和业务复杂度也在不断提高。如何解决系统服务化后,多个系统之间的耦合,提高业务的响应时间与吞吐量,有效保证系统的健壮性和稳定性,是咱们面临的主要问题。结合目前技术体系和业务特色的思考,咱们在业务中实践了响应式架构以及RxJava框架,来解决系统与业务复杂所带来的问题。java

##实践响应式架构 响应式架构是指业务组件和功能由事件驱动,每一个组件异步驱动,能够并行和分布式部署及运行。编程

响应式架构能够带来如下优点:设计模式

  • 大幅度下降应用程序内部的耦合性
  • 事件传递形式简化了并行程序的开发工做,使开发人员无须与并发编程基础元素打交道,同时能够解决许多并发编程难题,如死锁等。
  • 响应式架构可以大幅度提升调用方法的安全性和速度。
  • 对复杂业务系统的领域建模,响应式架构能够自然支持。每一个系统组件就能够对应到一个业务实体,业务实体之间经过接收事件来完成一次业务操做。

咱们使用响应式架构主要是为解决多个系统间的屡次远程调用带来的分布式问题,尤为在长任务场景中,响应式架构显得尤为必要。安全

有赞连锁出现后,随着连锁商家经营规模的扩张,会在系统中建立新的门店。建立新门店会引起一系列业务初始化工做,例如店铺、员工、仓库、商品、库存等业务域,而且各业务域之间存在必定的依赖关系(如图1所示),例如商品依赖仓库初始化完成。架构

图1连锁新建分店系统依赖关系

图1 连锁新建分店系统依赖关系

商家新增门店时,在店铺初始化完成后,连锁系统发送店铺初始化成功消息,相应系统对事件进行响应,处理完成(成功/失败)后将回执给连锁系统,连锁系统根据相关业务的反馈,决定是继续通知下游业务,仍是结束整个过程。新建门店部分流程如图2所示。并发

在建立门店业务中,每一个系统响应连锁系统发出的消息,处理完成后进行回执。经过这种模式,业务系统自己不关心其余系统是否成功或失败,只需对通知的事件进行处理,总体初始化进度与异常处理由连锁系统来控制。这种设计使得各业务系统之间没有直接耦合并保持相互独立。 框架

图2 连锁体系新增分店消息驱动图

图2 连锁体系新增分店消息驱动图

上面的案例介绍了在复杂业务场景下系统间对响应式架构的实践,系统内部一样会遇到复杂业务场景。下面介绍下在系统内部应对复杂业务的实践。异步

##RxJava在有赞零售实践分布式

Rxjava是用来编写异步和基于消息的程序的类库。RxJava在Android有着普遍的使用,主要应用在用户界面绘制与服务端通信等场景。RxJava的核心思想是响应式编程以及事件、异步这两个特色。响应式编程是一种经过异步和事件流来构建程序的编程模型。在复杂的业务开发中,最棘手的问题就是如何清晰直观的展示复杂的业务逻辑,而且方便后续的业务维护与扩展。微服务

###响应式编程使得复杂业务逻辑更清晰

有赞零售的业务场景中有着复杂的业务逻辑,有赞目前提供多种产品供商家选择,商家在不一样产品进行切换时,为了商家更好的体验,不一样业务的切换会进行数据初始化与处理。例若有赞微商城转换到有赞零售。

这里拿着微商城升级零售的业务场景给你们举例。微商城升级为零售时须要对商品进行转换。首先初始化店铺基础信息。而后读取商品流,将微商城的商品类型转换成零售支持的商品类型。最后读取规格,为规格建立供应链商品库,建立门店商品与添加网店商品的供应链商品关联关系。总体转换流程如图3所示。图中也画出了能够并发处理的场景。

图3 微商城升级有赞零售流程

图3 微商城升级有赞零售流程

若是单纯使用设计模式来解决上面这种场景单1、但业务逻辑特别复杂的场景,是很难作到的。也能够看到除了初始化信息那一步,后面的商品模型转化自始至终在业务中流转的事件都是商品,这里就可使用RxJava来优化业务代码使得处理流程能够并发,加快升级速度。

最终咱们按照图3的流程处理升级逻辑,其中的并发场景,好比保存完零售商品后,并发处理库存、和销售渠道,使用rxjava封装的方法帮助咱们进行并发操做。以下所示代码结构清晰,对外屏蔽了复杂的并发处理逻辑。

Observable.zip(
    callAsync(()->处理库存相关操做),
    callAsync(()->更新商品库门店销售渠道),
    callAsync(()->建立商品库与网店商品关联关系),
    (sku1,sku2,sku3)-> sku
).blockingFirst();
复制代码

最终咱们的总体的代码

UpgradeItem.listItems(manager, shop)
    .flatMap(item-> fromCallable(()->更新为零售商品类型))
    .flatMap(item-> fromCallable(()->并发处理商品操做), true)
    .flatMap(item-> 商品流转化为sku流, true)
    .flatMap(sku-> fromCallable(()->保存零售商品))
    .flatMap(sku-> fromCallable(()->并发处理保存商品后续操做, true)
    .subscribeOn(Schedulers.io());
复制代码

整个商品处理流程就是上面这段代码,一目了然,后面扩展能够本身在中间加入处理流程,也能够在对应业务方法中修改逻辑。

###多服务、数据源组合 随着微服务架构兴起,咱们将不一样的业务域拆分红不一样的系统。这样方便了系统的维护,提高了系统的扩展性,可是给上层业务系统也带来了不少麻烦。每每咱们为了展现一个页面会涉及到2-3个或更多的应用,而屡次的分布式调用不但使得系统的rt增长,也使得核心页面的出错风险更高。

下降rt:在假设第三方接口已经达到性能顶点的状况下,并发是解决屡次分布式调用下降rt的经常使用方法。

自动降级:传统编程方法中,自动降级处理,意味着咱们代码中会出现一大堆try/catch,而使用rxjava,咱们能够直接定义当流处理异常时,程序须要怎么作,这样的代码看起来很是简洁。

商品搜索做为商品管理的核心入口,根据不一样场景聚合商品、优惠、库存等信息。因为商品列表页展现的信息涉及到多服务数据的整合,一方面须要保证整个接口的rt,另外一方面不但愿因为一个商品数据或外部服务的异常影响到整个商品列表的加载。所以该场景很是适用于RxJava。

图4 商品后台搜索业务流程

最终咱们的代码

1.根据入参获取商品加载器

//只有包含的merger才会加载
List<SkuAttrMerger> validMergers = 
    Observable.fromIterable(skuAttrMergers).filter(loader -> request.getAttributes().contains(loader.supportAttribute().getValue())).toList().blockingGet();
复制代码

2.根据es结果获取商品各个属性详情并加载到SkuAttrContext中(某类属性加载失败则忽略)

//调用load并发加载数据到商品属性上下文中
Observable.fromIterable(商品信息加载器列表)
.flatMap(商品信息加载器-> Observable.fromCallable(() ->异步加载商品信息))
.onErrorResumeNext(Observable.empty())//若是失败则忽略
.subscribeOn(Schedulers.io()),false,线程数(为加载器数 
量)).blockingSubscribe();
复制代码

3.组装搜索结果(若是某个sku组装失败则直接忽略)

//调用merge将数据合并到目标对象
商品搜索返回结果列表 = Observable.fromIterable(商品id列表)
    .map(商品id->初始化商品搜索结果返回对象)
    .flatMap(商品搜索结果返回对象-> {
        val observables=Observable.fromIterable(商品加载器列表)
            .map(loader -> Observable.fromCallable(() ->合并每一个sku的不一样属性)).toList().blockingGet();
        return Observable.zipIterable(observables, (a) -> sku, false, 线程数)
        .onErrorResumeNext(Observable.empty()); //若是失败则忽略
        }, false, 1)
    .toList()
    .blockingGet();
复制代码

##后记

本文主要介绍了响应式架构与RxJava在有赞零售的使用场景。目前咱们对响应式架构的实践方式是:在系统间使用消息中间件来进行实现,在系统内则使用RxJava实现异步化和响应式编程。对于响应式架构的思想,咱们也在探索阶段,并在部分业务场景进行实践。将来面对愈来愈复杂的零售业务场景,会用响应式架构全面实现系统业务的异步化。总的来讲响应式架构思想为提高复杂业务系统健壮性、灵活性提供了强有力的支撑。后面你们若是想更多的讨论响应式架构与编程的实践,欢迎联系咱们。

相关文章
相关标签/搜索