源码分析-分布式链路追踪:Skywalking存储插件能力-elasticsearch

如上为Skywalking的总体领域概念设计,基于领域模型设计,咱们能够获取不少信息:java

  • 存储插件化git

  • 存储模块化es6

  • 存储能力多样性面试

总体源码结构以下:算法

存储能力主要包括:express

  • elasticsearchapache

  • influxdb微信

  • jaeger架构

  • jdbc-hikaricp并发

  • zipkin

这里只是简单分析elasticsearch7存储的源码,也是很是概要的分析,为何呢主要是想带着你们分析,让你们也具有源码分析的能力,并热爱分析各类框架的源码。

首先看storage-elasticsearch7-plugin目录下的resources/META-INF.services目录下的org.apache.skywalking.oap.server.library.module.ModuleProvider文件,这个就是模块化设计

## Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.##
org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Provider


其次看StorageModuleElasticsearch7Provider

  • prepare()方法

@Overridepublic void prepare() throws ServiceNotProvidedException { if (!StringUtil.isEmpty(config.getNameSpace())) { //获取配置中心关于Elasticsearch7的配置-es的命名空间 config.setNameSpace(config.getNameSpace().toLowerCase()); } if (!StringUtil.isEmpty(config.getSecretsManagementFile())) { MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor( 10, readableContents -> { final byte[] secretsFileContent = readableContents.get(0); if (secretsFileContent == null) { return; } Properties secrets = new Properties(); secrets.load(new ByteArrayInputStream(secretsFileContent)); config.setUser(secrets.getProperty("user", null)); config.setPassword(secrets.getProperty("password", null)); config.setTrustStorePass(secrets.getProperty("trustStorePass", null));
if (elasticSearch7Client == null) { //In the startup process, we just need to change the username/password } else { // The client has connected, updates the config and connects again. elasticSearch7Client.setUser(config.getUser()); elasticSearch7Client.setPassword(config.getPassword()); elasticSearch7Client.setTrustStorePass(config.getTrustStorePass()); elasticSearch7Client.connect(); } }, config.getSecretsManagementFile(), config.getTrustStorePass()); /** * By leveraging the sync update check feature when startup. */ monitor.start(); }
//初始化客户端,包括es集群节点、es协议以及信任的存储路径 elasticSearch7Client = new ElasticSearch7Client( config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config .getTrustStorePass(), config.getUser(), config.getPassword(), indexNameConverters(config.getNameSpace()) );
//注册各类DAO客户端,完成基于DAO插件模块的设计的初始化 this.registerServiceImplementation( IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests() )); this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client)); this.registerServiceImplementation( IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearch7Client)); this.registerServiceImplementation( INetworkAddressAliasDAO.class, new NetworkAddressAliasEsDAO( elasticSearch7Client, config.getResultWindowMaxSize() )); this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearch7Client)); this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEs7DAO(elasticSearch7Client)); this.registerServiceImplementation( ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize())); this.registerServiceImplementation( IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize())); this.registerServiceImplementation( IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client)); this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEs7DAO(elasticSearch7Client)); this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearch7Client)); this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation( IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO( elasticSearch7Client, config.getProfileTaskQueryMaxSize() )); this.registerServiceImplementation( IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO( elasticSearch7Client, config.getProfileTaskQueryMaxSize() )); this.registerServiceImplementation( IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEs7DAO( elasticSearch7Client, config.getProfileTaskQueryMaxSize() )); this.registerServiceImplementation( UITemplateManagementDAO.class, new UITemplateManagementEsDAO(elasticSearch7Client));}
  • start()方法

@Overridepublic void start() throws ModuleStartException { MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); //开启健康检查 elasticSearch7Client.registerChecker(healthChecker); try { //开启链接 elasticSearch7Client.connect();  //完成es在OAP端的安装(由于要区分es6和es7),因此就作了这么一个模块 StorageEs7Installer installer = new StorageEs7Installer(elasticSearch7Client, getManager(), config); getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer); } catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) { throw new ModuleStartException(e.getMessage(), e); }}


源码分析-分布式链路追踪:Skywalking 往期文章:

Skywalking、SpringCloudGateway以及SpringWebFlux如何融合


源码分析Nacos往期文章:

Nacos源码分析系列之Naming模块-集群篇-初级版

Nacos源码分析系列之Naming模块-集群篇-理论概念

Nacos源码分析系列之Naming模块-如何运行篇

Nacos源码分析系列之总体分层架构


欢迎加入做者的知识星球,里面会有不少精彩的内容:


欢迎加我的微信号,交流技术:


做者近期文章运营计划:

  • 已经开通gitchat帐户,并持续输出了不少高质量的文章

  • 我的微信公众号-架构师玄学之路已经和开源中国原创计划绑定,会实时推送到开源中国

  • 已经开通infoq平台帐号,而且会持续输出关于架构师的经典文章

  • 开通知识星球,开始试运行,欢迎你们来关注


gitchat精彩文章列表:

  • 从高性能、高可用及高并发角度剖析 RocketMQ

  • 一篇文章就能搞定基础面试:Java 并发包(JUC)及应用场景,助你能够反撸面试官

  • 调侃面试官,分布式选举算法 Raft 在 Nacos 中的应用

  • 你所不知道的 RocketMQ 的集群管理:副本机制

  • 分布式链路追踪:Skywalking 底层存储模型设计

  • 分布式链路追踪:Skywalking 的链路模型设计

  • 分布式链路追踪:Skywalking 探针模型设计

  • 分布式链路追踪:集群管理设计

  • 分布式链路追踪 SkyWalking:配置管理设计

  • 分布式链路追踪 Skywalking:底层通讯设计

  • 分布式链路追踪 Skywalking:告警和度量架构设计

  • 分布式链路追踪 Skywalking:插件化和模块化架构设计

  • SkyWalking 分布式链路追踪:最新 Kafka 通讯模型设计

  • 分布式链路追踪:Spring-Cloud-Sleuth 探针模型设计

  • Spring Cloud Sleuth:分布式链路追踪之通讯模型设计


做者-游侠,一名对技术、架构、业务和管理如何融合,并孜孜不倦的高级码农。

本文分享自微信公众号 - 架构师玄学之路(andy_aty)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索