如上为Skywalking的总体领域概念设计,基于领域模型设计,咱们能够获取不少信息:java
存储插件化git
存储模块化es6
存储能力多样性面试
总体源码结构以下:算法
存储能力主要包括:express
elasticsearchapache
influxdb微信
jaeger架构
jdbc-hikaricp并发
zipkin
这里只是简单分析elasticsearch7存储的源码,也是很是概要的分析,为何呢主要是想带着你们分析,让你们也具有源码分析的能力,并热爱分析各类框架的源码。
首先看storage-elasticsearch7-plugin目录下的resources/META-INF.services目录下的org.apache.skywalking.oap.server.library.module.ModuleProvider文件,这个就是模块化设计
## Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.##
org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Provider
其次看StorageModuleElasticsearch7Provider
prepare()方法
public void prepare() throws ServiceNotProvidedException { if (!StringUtil.isEmpty(config.getNameSpace())) { //获取配置中心关于Elasticsearch7的配置-es的命名空间 config.setNameSpace(config.getNameSpace().toLowerCase()); } if (!StringUtil.isEmpty(config.getSecretsManagementFile())) { MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor( 10, readableContents -> { final byte[] secretsFileContent = readableContents.get(0); if (secretsFileContent == null) { return; } Properties secrets = new Properties(); secrets.load(new ByteArrayInputStream(secretsFileContent)); config.setUser(secrets.getProperty("user", null)); config.setPassword(secrets.getProperty("password", null)); config.setTrustStorePass(secrets.getProperty("trustStorePass", null));
if (elasticSearch7Client == null) { //In the startup process, we just need to change the username/password } else { // The client has connected, updates the config and connects again. elasticSearch7Client.setUser(config.getUser()); elasticSearch7Client.setPassword(config.getPassword()); elasticSearch7Client.setTrustStorePass(config.getTrustStorePass()); elasticSearch7Client.connect(); } }, config.getSecretsManagementFile(), config.getTrustStorePass()); /** * By leveraging the sync update check feature when startup. */ monitor.start(); }
//初始化客户端,包括es集群节点、es协议以及信任的存储路径 elasticSearch7Client = new ElasticSearch7Client( config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config .getTrustStorePass(), config.getUser(), config.getPassword(), indexNameConverters(config.getNameSpace()) );
//注册各类DAO客户端,完成基于DAO插件模块的设计的初始化 this.registerServiceImplementation( IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests() )); this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client)); this.registerServiceImplementation( IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearch7Client)); this.registerServiceImplementation( INetworkAddressAliasDAO.class, new NetworkAddressAliasEsDAO( elasticSearch7Client, config.getResultWindowMaxSize() )); this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearch7Client)); this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEs7DAO(elasticSearch7Client)); this.registerServiceImplementation( ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize())); this.registerServiceImplementation( IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize())); this.registerServiceImplementation( IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client)); this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEs7DAO(elasticSearch7Client)); this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearch7Client)); this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation( IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO( elasticSearch7Client, config.getProfileTaskQueryMaxSize() )); this.registerServiceImplementation( IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO( elasticSearch7Client, config.getProfileTaskQueryMaxSize() )); this.registerServiceImplementation( IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEs7DAO( elasticSearch7Client, config.getProfileTaskQueryMaxSize() )); this.registerServiceImplementation( UITemplateManagementDAO.class, new UITemplateManagementEsDAO(elasticSearch7Client));}
start()方法
public void start() throws ModuleStartException { MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); //开启健康检查 elasticSearch7Client.registerChecker(healthChecker); try { //开启链接 elasticSearch7Client.connect(); //完成es在OAP端的安装(由于要区分es6和es7),因此就作了这么一个模块 StorageEs7Installer installer = new StorageEs7Installer(elasticSearch7Client, getManager(), config); getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer); } catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) { throw new ModuleStartException(e.getMessage(), e); }}
源码分析-分布式链路追踪:Skywalking 往期文章:
Skywalking、SpringCloudGateway以及SpringWebFlux如何融合
源码分析Nacos往期文章:
欢迎加入做者的知识星球,里面会有不少精彩的内容:
欢迎加我的微信号,交流技术:
做者近期文章运营计划:
已经开通gitchat帐户,并持续输出了不少高质量的文章
我的微信公众号-架构师玄学之路已经和开源中国原创计划绑定,会实时推送到开源中国
已经开通infoq平台帐号,而且会持续输出关于架构师的经典文章
开通知识星球,开始试运行,欢迎你们来关注
gitchat精彩文章列表:
从高性能、高可用及高并发角度剖析 RocketMQ
一篇文章就能搞定基础面试:Java 并发包(JUC)及应用场景,助你能够反撸面试官
调侃面试官,分布式选举算法 Raft 在 Nacos 中的应用
你所不知道的 RocketMQ 的集群管理:副本机制
分布式链路追踪:Skywalking 底层存储模型设计
分布式链路追踪:Skywalking 的链路模型设计
分布式链路追踪:Skywalking 探针模型设计
分布式链路追踪:集群管理设计
分布式链路追踪 SkyWalking:配置管理设计
分布式链路追踪 Skywalking:底层通讯设计
分布式链路追踪 Skywalking:告警和度量架构设计
分布式链路追踪 Skywalking:插件化和模块化架构设计
SkyWalking 分布式链路追踪:最新 Kafka 通讯模型设计
分布式链路追踪:Spring-Cloud-Sleuth 探针模型设计
Spring Cloud Sleuth:分布式链路追踪之通讯模型设计
做者-游侠,一名对技术、架构、业务和管理如何融合,并孜孜不倦的高级码农。
本文分享自微信公众号 - 架构师玄学之路(andy_aty)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。