本文创意来自一次业务需求,此次须要接入一个第三方外部服务。因为这个服务只提供异步 API,为了避免影响现有系统同步处理的方式,接入该外部服务时,应用对外屏蔽这种差别,内部实现异步请求同步。html
全文摘要:git
现有一个系统,总体架构以下所示:github
这是一个很常见的同步设计方案,上游系统须要等待下游系统接口返回调用结果。apache
如今须要接入另一个第三方服务 B,该服务与服务 A 最大区别在于,这是一个异步 API
。调用以后,仅仅返回受理成功,处理结果后续经过异步通知返回。编程
接入以后,总体架构以下所示:网络
因为网络隔离策略,通知接收程序与通讯服务须要单独分开部署。若没此要求,能够将通讯服务 B 与通知接收程序合并成一个应用。架构
另外图中全部应用采用双节点部署。并发
为了避免影响 OpenAPI
上游系统同步处理逻辑,通讯服务 B 调用第三方服务以后,不能马上返回,须要等待结果通知,拿到具体返回结果。这就须要通讯服务 B 内部将异步转为同步。异步
这就是一个典型的异步转同步问题,整个过程涉及两个问题。socket
问题 1 的解决方案参考了 Dubbo 设计思路。
咱们在使用 Dubbo 调用远程服务时,默认状况下,这是一种阻塞式调用方式,即 Consumer 端代码一直阻塞等待,直到 Provider 端返回为止。
因为 Dubbo 底层基于 Netty
发送网络请求,这其是一个异步的过程。为了让业务线程能同步等待,这个过程就须要将异步转为同步。
Dubbo 发起远程调用代码位于 DubboInvoker#doInvoke
:
Dubbo 版本为:2.6.X 版本。2,7.X 重构
DefaultFuture
,可是本质原理仍是同样。
默认状况下,Dubbo 支持同步调用方式,这里将会建立 DefaultFuture
对象。
这里有个很是重要逻辑,每一个请求生成一个惟一 ID,而后将 ID
与 DefaultFuture
映射关系,存入 Map
中。
这个请求 ID 在之因此这么重要,是由于消费者并发调用服务发送请求,同时将会有多个业务线程进入阻塞。当收到响应以后,咱们须要唤醒正确的等待线程,并将处理结果返回。
经过 ID 这个惟一映射的关系,很天然能够找到其对应 DefaultFuture
,唤醒其对应的业务线程。
业务线程调用 DefaultFuture#get
方法进入阻塞。这段代码比较简单,经过调用 Condition#await
阻塞线层。
当消费者接收到服务提供者的返回结果,将会调用 DefaultFuture#received
方法。
经过响应对象中的惟一 ID,找到其对应 DefaultFuture
对象,从而将结果设置 DefaultFuture
对象中,而后唤醒的相应的业务线程。
这里实际有个优化点,使用 done#signalAll 代替 done#signal。使用 condition 等待通知机制的时候须要注意这一点。
正常状况下,当消费者接收到响应以后,将会从 FUTURES
这个 Map
移除 DefaultFuture
。
可是在异常状况下,服务提供者若处理缓慢,不能及时返回响应结果,消费者业务线程将会由于超时苏醒。这种状况下 FUTURES
积压了无效 DefaultFuture
对象。若是不及时清理,极端状况下,将会发生 OOM 。
DefaultFuture
内部将会开启一个异步线程,定时轮询 FUTURES
判断 DefaultFuture
超时时间,及时清理已经无效(超时)的 DefaultFuture
。
根据 Dubbo 解决思路,问题 1 解决办法就比较简单了。具体流程以下:
Map
存储对应关系,并使业务线程阻塞等待这个设计过程须要注意设置合理的超时时间,这个超时时间须要考虑远程服务调用耗时,能够参考以下公式:
业务线程等待时间=通讯服务 B 接口的超时时间 - 调用第三方服务 B 接口消耗时间
这里就不贴出具体的代码,详细代码参考 Dubbo DefaultFuture
。
接下来重点看下通知服务如何将结果转发给正确的通讯服务 B 的节点。这里想到两种方案:
通讯服务 B 使用 SocketServer 构建一个服务接收程序,当通知接收程序收到第三方服务 B 通知时,经过 Socket
将结果转发给通讯服务 B。
整个系统架构以下所示:
因为生产服务双节点部署,通知接收程序就不能写死转发地址。这里咱们将请求 ID 与通讯服务 B socket
服务地址关系存入 Redis
中,而后通知接收程序经过 ID 找到正确的地址。
这个方案说实话有点复杂。
第一 SocketServer 编码难度较大,编写一个高效 SocketServer 就比较难,一不当心可能产生各类 Bug。
第二通讯服务 B 服务地址配置在配置文件中,因为两个节点地址不一样,这就致使同一应用存在不一样配置。这对于后面维护就很不友好。
第三额外引入 Redis
依赖,系统复杂度变高。
相对 SocketServer
方案,MQ
方案相对简单,这里采用 MQ
广播消费的方式,架构如图所示:
通知接收程序收到异步通知以后,直接将结果发送到 MQ
。
通讯服务 B 开启广播消费模式,拉取 MQ
消息。
通讯服务 B_1 拉取消息,经过请求 ID 映射关系,没找到内部等待的线程,知道这不是本身的等待消息,因而 B_1 直接丢弃便可。
通讯服务 B_2 拉取消息,经过请求 ID 映射关系,顺利找到正在等待的线程,而后能够唤醒等待线程,返回最后的结果。
对比 SocketServer
方案,MQ
方案总体流程比较简单,编程难度低,也没用存在特殊的配置。
不过这个方案十分依赖 MQ
消息实时性,若 MQ
消息投递延迟很高,这就会致使通讯服务 B 业务线程超时苏醒,业务异常返回。
这里咱们选择使用 RocketMQ
,长轮询 Pull
方式,可保证消息很是实时,
综上,这里采用 MQ
的方案。
异步转同步咱们须要解决同步阻塞,以及如何唤醒的问题。
阻塞/唤醒能够分别使用 Condition#await/signalAll
。不过这个过程咱们须要生成一个惟一请求 ID,而且保存这个 ID 与业务线程映射关系。后续等到结果返回咱们才能经过惟一 ID 唤醒正确等待线程。
只要了解上面几点,异步转同步的问题就就能够迎刃而解。
另外,若是你也有碰到异步转同步问题,本文的方案但愿对你有帮助。若是你有其余设计方案,欢迎留言,一块儿讨论~
这篇文章其实写了挺久的,写的挺可贵。以前很早想到写这篇文章,可是没想好到底咋写,艰难产出。
看到这里,点个关注呀,点个赞呗。别下次必定啊,大哥。写文章很辛苦的,须要来点正反馈。
才疏学浅,不免会有纰漏,若是你发现了错误的地方,还请你留言给我指出来,我对其加以修改。
感谢您的阅读,我坚持原创,十分欢迎并感谢您的关注
欢迎关注个人公众号:程序通事,得到平常干货推送。若是您对个人专题内容感兴趣,也能够关注个人博客:studyidea.cn