我的知识库java
上一篇,梳理http 数据同步策略的变动通知机制,本篇开始探究配置变动通知到达后, soul-web
端的处理响应。web
不一样数据变动的通知机制应当是一致的,故本篇以 selector 配置变动通知为切入点进行深刻。json
上回咱们说到 HttpSyncDataService 的 doLongPolling,在其内部发起通知订阅并接收响应通知:缓存
private void doLongPolling(final String server) { ... String listenerUrl = server + "/configs/listener"; ... try { // 发起监听请求 String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody(); log.debug("listener result: [{}]", json); groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data"); } catch (RestClientException e) { ... } // 处理变动通知 if (groupJson != null) { // fetch group configuration async. ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class); if (ArrayUtils.isNotEmpty(changedGroups)) { log.info("Group config changed: {}", Arrays.toString(changedGroups)); // 获取组配置 this.doFetchGroupConfig(server, changedGroups); } } }
在收到变动通知时,若存在配置组变动,则按变动组获取相应配置。async
获取组配置处理以下:ide
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) { ... String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&"); ... try { json = this.httpClient.getForObject(url, String.class); } catch (RestClientException e) { ... } // update local cache boolean updated = this.updateCacheWithJson(json); ... }
内部发起配置获取请求并更新本地缓存。post
由 HttpSyncDataService 实现本地缓存更新:fetch
private boolean updateCacheWithJson(final String json) { JsonObject jsonObject = GSON.fromJson(json, JsonObject.class); JsonObject data = jsonObject.getAsJsonObject("data"); // if the config cache will be updated? return factory.executor(data); }
转成 Json 对象后交由 DataRefreshFactory 进行处理。this
DataRefreshFactory 处理以下:url
public boolean executor(final JsonObject data) { final boolean[] success = {false}; ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data)); return success[0]; }
调用相应数据刷新类刷新数据。
统一由 AbstractDataRefresh 的 refresh 进行处理:
public Boolean refresh(final JsonObject data) { boolean updated = false; JsonObject jsonObject = convert(data); if (null != jsonObject) { ConfigData<T> result = fromJson(jsonObject); if (this.updateCacheIfNeed(result)) { updated = true; refresh(result.getData()); } } return updated; }
先更新本地缓存,再调用子类实现的 refresh。
此处的更新本地缓存处理,由子类 SelectorDataRefresh 的 updateCacheIfNeed 实现:
protected boolean updateCacheIfNeed(final ConfigData<SelectorData> result) { return updateCacheIfNeed(result, ConfigGroupEnum.SELECTOR); }
向父类 AbstractDataRefresh 的 updateCacheIfNeed 指定更新 selector 配置组。
父类 AbstractDataRefresh 的 updateCacheIfNeed 处理:
protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) { // 首次初始化缓存 if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) { return true; } ResultHolder holder = new ResultHolder(false); GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> { // 必须比较最后更新时间 if (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5()) && oldVal.getLastModifyTime() < newVal.getLastModifyTime()) { ... holder.result = true; return newVal; } ... return oldVal; }); return holder.result; }
经过比较新老缓存的 MD5 值来断定是否发生变动,存在变动则更新本地缓存(注意还有最后更新时间断定)。
SelectorDataRefresh 的 refresh 实现:
protected void refresh(final List<SelectorData> data) { if (CollectionUtils.isEmpty(data)) { log.info("clear all selector cache, old cache"); data.forEach(pluginDataSubscriber::unSelectorSubscribe); pluginDataSubscriber.refreshSelectorDataAll(); } else { // update cache for UpstreamCacheManager pluginDataSubscriber.refreshSelectorDataAll(); data.forEach(pluginDataSubscriber::onSelectorSubscribe); } }
CommonPluginDataSubscriber 实现订阅取消:
public void unSelectorSubscribe(final SelectorData selectorData) { subscribeDataHandler(selectorData, DataEventTypeEnum.DELETE); }
subscribeDataHandler 对 selectorData 的 delete 处理:
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) { Optional.ofNullable(classData).ifPresent(data -> { if (data instanceof PluginData) { ... } else if (data instanceof SelectorData) { SelectorData selectorData = (SelectorData) data; if (dataType == DataEventTypeEnum.UPDATE) { ... } else if (dataType == DataEventTypeEnum.DELETE) { BaseDataCache.getInstance().removeSelectData(selectorData); Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData)); } } else if (data instanceof RuleData) { ... } }); }
从 BaseDataCache 删除目标选择器数据,并移除选择器。
此处由 DividePluginDataHandler 提供 removeSelector 实现:
public void removeSelector(final SelectorData selectorData) { UpstreamCacheManager.getInstance().removeByKey(selectorData.getId()); }
根据 selector id 移除缓存的上游服务,注意只是从 UPSTREAM_MAP_TEMP 移除
public void removeByKey(final String key) { UPSTREAM_MAP_TEMP.remove(key); }
CommonPluginDataSubscriber 实现数据刷新:
public void refreshSelectorDataAll() { BaseDataCache.getInstance().cleanSelectorData(); }
注意这里的 refresh all 实际是作的 clean 操做。
BaseDataCache 的 cleanSelectorData 处理:
public void cleanSelectorData() { SELECTOR_MAP.clear(); }
直接清除 SELECTOR_MAP 全部数据。
CommonPluginDataSubscriber 实现订阅响应:
public void onSelectorSubscribe(final SelectorData selectorData) { subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE); }
subscribeDataHandler 对 selectorData 的 update 处理:
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) { Optional.ofNullable(classData).ifPresent(data -> { if (data instanceof PluginData) { ... } else if (data instanceof SelectorData) { SelectorData selectorData = (SelectorData) data; if (dataType == DataEventTypeEnum.UPDATE) { BaseDataCache.getInstance().cacheSelectData(selectorData); Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData)); } else if (dataType == DataEventTypeEnum.DELETE) { ... } } else if (data instanceof RuleData) { ... } }); }
缓存选择器数据到 BaseDataCache,并处理选择器。
此处由 DividePluginDataHandler 提供 handlerSelector 实现:
public void handlerSelector(final SelectorData selectorData) { UpstreamCacheManager.getInstance().submit(selectorData); }
提交选择器数据到 UpstreamCacheManager。
UpstreamCacheManager 的 submit 处理:
public void submit(final SelectorData selectorData) { final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class); if (null != upstreamList && upstreamList.size() > 0) { UPSTREAM_MAP.put(selectorData.getId(), upstreamList); UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList); } else { UPSTREAM_MAP.remove(selectorData.getId()); UPSTREAM_MAP_TEMP.remove(selectorData.getId()); } }
根据 selector id 更新 UPSTREAM_MAP 和 UPSTREAM_MAP_TEMP。
本篇梳理和分析了配置变动通知到达后 soul-web
端的处理流程,最终处理主要是更新本地配置缓存以及维护上游服务散列表。
soul-web
收到变动通知后处理流程以下:
soul-web 端收到响应
- 若配置组数据存在变动,则发起获取配置请求获取最新配置信息
- 更新配置组缓存
- 循环处理配置数据刷新事件
- 若最新配置数据为空,则删除本地配置数据并移除上游服务
- 若最新配置数据不为空,则缓存配置组数据并更新上游服务
- 若配置组数据无变动,不做处理