dubbo对服务运行的监控,是经过从provider和consumer方收集调用信息存盘后,再由监控中心对数据分析绘表的方式完成的。
具体实现是provider和consumer向监控中心推数据。
今天以服务消费方为例,经过源码分析下消费方向监控中心上报数据的过程。
配置监控中心的两种方式:java
<!--1,表示从注册中心发现监控中心地址--> <dubbo:monitor protocol="registry"></dubbo:monitor> <!--2,直连监控中心服务器地址--> <dubbo:monitor address="10.47.17.170"></dubbo:monitor> <!--配置过滤器monitor,dubbo是经过过滤器实现调用信息上报的--> <dubbo:reference id="demoService" interface="demo.dubbo.api.DemoService" timeout="6000" filter="monitor"/>
以上spring配置里的<dubbo:monitor>标签的解析,在ReferenceBean的afterPropertiesSet方法中,逻辑以下spring
public void afterPropertiesSet() throws Exception { //....其余代码略 if (getMonitor() == null && (getConsumer() == null || getConsumer().getMonitor() == null) && (getApplication() == null || getApplication().getMonitor() == null)) { //解析MonitorConfig类,从容器中获取monitorConfig对象 Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false); if (monitorConfigMap != null && monitorConfigMap.size() > 0) { MonitorConfig monitorConfig = null; for (MonitorConfig config : monitorConfigMap.values()) { if (config.isDefault() == null || config.isDefault().booleanValue()) { if (monitorConfig != null) { throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config); } monitorConfig = config; } } //把解析后对象赋值给monitor属性,后面构造代理会用到 if (monitorConfig != null) { setMonitor(monitorConfig); } } } }
在构造代理逻辑在ReferenceConfig类的createProxy方法中,由于咱们这里走注册中心发现监控中心,因此看下面一段逻辑:api
//构造注册中心url List<URL> us = loadRegistries(false); if (us != null && us.size() > 0) { for (URL u : us) { //经过注册中心的url构造monitor Url(***跟踪下loadMonitor***) URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { //放置监控url到map key为“monitor”(***重点在这里***) map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls == null || urls.size() == 0) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); }
跟到AbstractInterfaceConfig类的loadMonitor方法:缓存
/*** * 构造监控中心URL * @param registryURL * @return */ protected URL loadMonitor(URL registryURL) { if (monitor == null) { //没有配置监控中心,从dubbo.monitor.address属性中获取 String monitorAddress = ConfigUtils.getProperty("dubbo.monitor.address"); //获取监控中心服务发现协议,好比经过注册中心 String monitorProtocol = ConfigUtils.getProperty("dubbo.monitor.protocol"); if (monitorAddress != null && monitorAddress.length() > 0 || monitorProtocol != null && monitorProtocol.length() > 0) { //都没有配置,new一个对象 monitor = new MonitorConfig(); } else { //没有注册中心 return null; } } //把属性文件中的的值,填充到monitor对象里 appendProperties(monitor); Map<String, String> map = new HashMap<String, String>(); // //这里接口固定是MonitorService.class.getName(),就是固定经过这个接口提供服务上报服务 //这里的MonitorService服务是由监控中心实现并注册的到注册中心。 map.put(Constants.INTERFACE_KEY, MonitorService.class.getName()); map.put("dubbo", Version.getVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } //把monitor对象里的属性,放到map里去,key是对象属性名 appendParameters(map, monitor); String address = monitor.getAddress(); String sysaddress = System.getProperty("dubbo.monitor.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; } //设置监控protocal if (ConfigUtils.isNotEmpty(address)) { if (!map.containsKey(Constants.PROTOCOL_KEY)) { if (ExtensionLoader.getExtensionLoader(MonitorFactory.class).hasExtension("logstat")) { map.put(Constants.PROTOCOL_KEY, "logstat"); } else {//没有logstat spi扩展,就用dubbo协议 map.put(Constants.PROTOCOL_KEY, "dubbo"); } } //构造经过address和map,构造url return UrlUtils.parseURL(address, map); } else if (Constants.REGISTRY_PROTOCOL.equals(monitor.getProtocol()) && registryURL != null) { //若是monitor配置是经过注册中心发现,监控服务,设置protocol是dubbo, 添加参数 protocol=registry,refer=StringUtils.toQueryString(map) return registryURL.setProtocol("dubbo").addParameter(Constants.PROTOCOL_KEY, "registry").addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)); } return null; }
以上逻辑构造了monitorUrl并经过 monitor key放入url的参数中。
因为dubbo是经过过滤器上报监控数据的,(关于dubbo使用过滤器机制,还要从dubbo aop实现入手),下面分析下具体过滤器如何使用monitorUrl的,能够看懂文章开始咱们配置的过滤器是“monitor”
因此这里,看下Filter的monitor spi实现,MonitorFilter类,具体在invoke方法里:服务器
//调用过程拦截 public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { RpcContext context = RpcContext.getContext(); // 提供方必须在invoke()以前获取context信息 String remoteHost = context.getRemoteHost(); long start = System.currentTimeMillis(); // 记录起始时间戮 getConcurrent(invoker, invocation).incrementAndGet(); // 并发计数++ try { Result result = invoker.invoke(invocation); // 让调用链往下执行 //上报调用统计信息(***看这里**) collect(invoker, invocation, result, remoteHost, start, false); return result; } catch (RpcException e) { collect(invoker, invocation, null, remoteHost, start, true); throw e; } finally { getConcurrent(invoker, invocation).decrementAndGet(); // 并发计数++ } } else { return invoker.invoke(invocation); } } //具体 private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) { try { // ---- 服务信息获取 ---- long elapsed = System.currentTimeMillis() - start; // 计算调用耗时 int concurrent = getConcurrent(invoker, invocation).get(); // 当前并发数 String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY); String service = invoker.getInterface().getName(); // 获取服务名称 String method = RpcUtils.getMethodName(invocation); // 获取方法名 URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY); //经过 monitor key 获取监控url (***看这里**),这里monitorFactory是spi机制生成的MonitorFactory$Adaptive //这里实际是走的DubboMonitorFactroy类的getMonitor方法 Monitor monitor = monitorFactory.getMonitor(url); int localPort; String remoteKey; String remoteValue; if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) { // ---- 服务消费方监控 ---- localPort = 0; remoteKey = MonitorService.PROVIDER; remoteValue = invoker.getUrl().getAddress(); } else { // ---- 服务提供方监控 ---- localPort = invoker.getUrl().getPort(); remoteKey = MonitorService.CONSUMER; remoteValue = remoteHost; } String input = "", output = ""; if (invocation.getAttachment(Constants.INPUT_KEY) != null) { input = invocation.getAttachment(Constants.INPUT_KEY); } if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) { output = result.getAttachment(Constants.OUTPUT_KEY); } //经过上面构造的监控上报工具,上报数据(***看这里**) monitor.collect(new URL(Constants.COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + "/" + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), Constants.INPUT_KEY, input, Constants.OUTPUT_KEY, output)); } catch (Throwable t) { logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t); } }
看下DubboMonitorFactroy类的getMonitor方法,实如今其父类AbstractMonitorFactory中:并发
public Monitor getMonitor(URL url) { //这里设置上报服务接口MonitorService url = url.setPath(MonitorService.class.getName()).addParameter(Constants.INTERFACE_KEY, MonitorService.class.getName()); String key = url.toServiceStringWithoutResolving(); LOCK.lock(); try { //从缓存中获取 Monitor monitor = MONITORS.get(key); if (monitor != null) { return monitor; } //经过url建立monitor,在子类DubboMonitorFactroy中实现 monitor = createMonitor(url); if (monitor == null) { throw new IllegalStateException("Can not create monitor " + url); } MONITORS.put(key, monitor); return monitor; } finally { // 释放锁 LOCK.unlock(); } }
DubboMonitorFactroy里实现的createMonitor方法:app
protected Monitor createMonitor(URL url) { //这里会经过url的protocol参数获取协议值,若是是经过注册中心发现监控中心服务的方式,这里 //protocol的值是registry,不然就是dubbo url = url.setProtocol(url.getParameter(Constants.PROTOCOL_KEY, "dubbo")); if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(MonitorService.class.getName()); } String filter = url.getParameter(Constants.REFERENCE_FILTER_KEY); if (filter == null || filter.length() == 0) { filter = ""; } else { filter = filter + ","; } //监控中心服务配置多个的场景,这里默认使用failsafe容错机制 url = url.addParameters(Constants.CLUSTER_KEY, "failsafe", Constants.CHECK_KEY, String.valueOf(false), Constants.REFERENCE_FILTER_KEY, filter + "-monitor"); //这里protocol也是Protocol$Adpative的,若是协议是registry 要走经过注册中心发现服务那一套逻辑。 Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, url); //建立服务代理代理 MonitorService monitorService = proxyFactory.getProxy(monitorInvoker); //最后构造BubboMonitor对象 return new DubboMonitor(monitorInvoker, monitorService); }
这里看下DubboMonitor类继承图,能够看到它实现了MonitorService接口异步
//构造函数 public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) { this.monitorInvoker = monitorInvoker; this.monitorService = monitorService; this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000); // 启动统计信息收集定时器,设置上报频率monitorInterval,因此说,上报数据是异步的 sendFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { public void run() { // send方法收集统计信息 try { (***看这里***) send(); } catch (Throwable t) { // 防护性容错 logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t); } } }, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS); } //从本地静态变量中获取统计信息,经过远程服务monitorService接口方法上报。 public void send() { if (logger.isInfoEnabled()) { logger.info("Send statistics to monitor " + getUrl()); } String timestamp = String.valueOf(System.currentTimeMillis()); for (Map.Entry<Statistics, AtomicReference<long[]>> entry : statisticsMap.entrySet()) { // 获取已统计数据 Statistics statistics = entry.getKey(); AtomicReference<long[]> reference = entry.getValue(); long[] numbers = reference.get(); long success = numbers[0]; long failure = numbers[1]; long input = numbers[2]; long output = numbers[3]; long elapsed = numbers[4]; long concurrent = numbers[5]; long maxInput = numbers[6]; long maxOutput = numbers[7]; long maxElapsed = numbers[8]; long maxConcurrent = numbers[9]; // 发送汇总信息 URL url = statistics.getUrl() .addParameters(MonitorService.TIMESTAMP, timestamp, MonitorService.SUCCESS, String.valueOf(success), MonitorService.FAILURE, String.valueOf(failure), MonitorService.INPUT, String.valueOf(input), MonitorService.OUTPUT, String.valueOf(output), MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), MonitorService.MAX_INPUT, String.valueOf(maxInput), MonitorService.MAX_OUTPUT, String.valueOf(maxOutput), MonitorService.MAX_ELAPSED, String.valueOf(maxElapsed), MonitorService.MAX_CONCURRENT, String.valueOf(maxConcurrent) ); //调用监控中心发布的MonitorService服务,上报调用统计信息 monitorService.collect(url); // 减掉已统计数据 long[] current; long[] update = new long[LENGTH]; do { current = reference.get(); if (current == null) { update[0] = 0; update[1] = 0; update[2] = 0; update[3] = 0; update[4] = 0; update[5] = 0; } else { update[0] = current[0] - success; update[1] = current[1] - failure; update[2] = current[2] - input; update[3] = current[3] - output; update[4] = current[4] - elapsed; update[5] = current[5] - concurrent; } } while (!reference.compareAndSet(current, update)); } } //而DubboMonitor自己的collect方法,供信息上报处,过滤器中调用 //每次的调用信息,放入本地静态变量statisticsMap中, public void collect(URL url) { // 读写统计变量 int success = url.getParameter(MonitorService.SUCCESS, 0); int failure = url.getParameter(MonitorService.FAILURE, 0); int input = url.getParameter(MonitorService.INPUT, 0); int output = url.getParameter(MonitorService.OUTPUT, 0); int elapsed = url.getParameter(MonitorService.ELAPSED, 0); int concurrent = url.getParameter(MonitorService.CONCURRENT, 0); // 初始化原子引用 Statistics statistics = new Statistics(url); AtomicReference<long[]> reference = statisticsMap.get(statistics); if (reference == null) { statisticsMap.putIfAbsent(statistics, new AtomicReference<long[]>()); reference = statisticsMap.get(statistics); } // CompareAndSet并发加入统计数据 long[] current; long[] update = new long[LENGTH]; do { current = reference.get(); if (current == null) { update[0] = success; update[1] = failure; update[2] = input; update[3] = output; update[4] = elapsed; update[5] = concurrent; update[6] = input; update[7] = output; update[8] = elapsed; update[9] = concurrent; } else { update[0] = current[0] + success; update[1] = current[1] + failure; update[2] = current[2] + input; update[3] = current[3] + output; update[4] = current[4] + elapsed; update[5] = (current[5] + concurrent) / 2; update[6] = current[6] > input ? current[6] : input; update[7] = current[7] > output ? current[7] : output; update[8] = current[8] > elapsed ? current[8] : elapsed; update[9] = current[9] > concurrent ? current[9] : concurrent; } } while (!reference.compareAndSet(current, update)); }
以上梳理了下,服务消费方配置监控中心并上报调用数据的流程,
服务提供方上报监控中心的流程是同样的。一样使用这个过滤器完成。
下次再梳理下,监控中心自己的处理逻辑。ide