hello,你们好,我是小黑,又和你们见面啦~~java
在配置中心中,有一个经典的 pub/sub 场景:某个配置项发生变动以后,须要实时的同步到各个服务端节点,同时推送给客户端集群。redis
在以前实现的简易版配置中心中是经过 redis 的 pub/sub 来实现的。这种实现虽然简单,但却强依赖了 redis。数据库
配置中心做为一个基础组件,若是能尽量的减小外部依赖,那对使用方来讲必定是更友好的。那么,有没有可能不使用 MQ 来实现 pub/sub 的场景呢?答案是确定的。服务器
Apollo 在实现上述场景时,并无选用基于 MQ 来进行实现,而是经过数据库实现了一个简单的消息队列。示意图以下:app
大体实现方式以下:异步
下面,就让咱们带着这几个问题来学习一下源码吧。(画外音:思路比源码更重要)学习
Admin Service 在配置发布后会调用 DatabaseMessageSender#sendMessage
方法,该方法主要作了两件事情:spa
DatabaseMessageSender#toClean
队列中。为何要记录当前保存的 ReleaseMessage Id 呢?线程
在 DatabaseMessageSender
中有个定时任务,会去清除比当前 ID 小的 ReleaseMessage。设计
Config Service 中经过 ReleaseMessageScanner
组件会每秒(默认配置下)扫描一次 ReleaseMessage 表,来获取最新的消息。
有了这个基于 DB 的 pub/sub,Admin Service 在配置发布以后,每一个 Config Service 都会经过 DB 来感知到这个消息,而后再通知给客户端。
那 Config Service 又是如何通知客户端的呢?
在 Apollo 的设计中,配置发生更新以后,并非服务端主动推给客户端的,并且客户端经过长轮询的方式向服务端询问是否有配置发生了变动。大体思路为:若是在 60 秒内没有该客户端关心的配置发布,那么会返回 Http 状态码 304 给客户端;若是有该客户端关心的配置发布,请求就会当即返回,客户端从返回的结果中获取到配置变化的 namespace 后,会当即请求 Config Service 获取该 namespace 的最新配置。
客户端的相关代码在 RemoteConfigLongPollService#doLongPollingRefresh
,代码比较简单,感兴趣的同窗能够自行查阅。
这里咱们重点看一下服务端是如何实现的。
在传统的 servlet
模型中,每一个请求都是由某个线程处理的,若是一个请求处理的时间较长,那么这种基于线程池的同步模型很快就会把全部线程耗尽,致使服务器没法响应新的请求。
在 servlet 3.0
中引入了异步支持,容许对一个请求进行异步处理,工做线程在此期间不会被阻塞,能够继续处理传入的客户端请求。
从 Spring 3.2 开始,可使用 DeferredResult
来实现异步处理。使用 DeferredResult
时,能够设置超时,超时以后自动返回超时错误响应。同时,能够在另外一个线程中,能够调用其 setResult()
写入结果返回。
在 Apollo 客户端长轮询的地址为 /notifications/v2
,对应的服务端代码为 NotificationControllerV2
。
在 NotificationControllerV2
中就使用了 Spring 的 DeferredResult
来实现的。本文重在解决问题的思路,就不展现源码了,感兴趣的同窗能够本身阅读一下源码。不过,小黑同窗写了一个简单的 demo 来帮助咱们理解一下 DeferredResult
的使用。
@Slf4j @RestController public class DeferredResultDemoController { private final Multimap<String, DeferredResult<String>> deferredResults = ArrayListMultimap.create(); @GetMapping("/info") public DeferredResult<String> info(String key) { // 设置 1 秒超时时间,设置超时是返回的结果 DeferredResult<String> result = new DeferredResult<>(1000L, "key not change"); // 将 result 放到 deferredResults 中, key 即为当前请求所关心的配置项 deferredResults.put(key, result); // 若是超时,移除当前 DeferredResult,并打印日志,同时返回 DeferredResult 构造器中传入的结果 result.onTimeout(() -> { deferredResults.remove(key, result); log.info("time out key not change"); }); // 若是完成了,则从 deferredResults 中移除当前 DeferredResult result.onCompletion(() -> deferredResults.remove(key, result)); return result; } @PostConstruct public void init() { new Thread(() -> { while (true) { try { TimeUnit.MILLISECONDS.sleep(700); } catch (InterruptedException e) { log.info(e.getMessage(), e); } // 定时任务,模拟配置更新 // 当 hello key 发生变动以后,从 deferredResults 获取到相关的 DeferredResult,经过 setResult 方法设置返回结果,同时移除 deferredResults if (deferredResults.containsKey("hello")) { Collection<DeferredResult<String>> results = deferredResults.removeAll("hello"); results.forEach(stringDeferredResult -> stringDeferredResult.setResult("hello key change :" + System.currentTimeMillis())); } } }).start(); } }