TiKV 源码解析系列文章(四)Prometheus(下)

做者: Breezewishhtml

本文为 TiKV 源码解析系列的第四篇,接上篇继续为你们介绍 rust-prometheus上篇 主要介绍了基础知识以及最基本的几个指标的内部工做机制,本篇会进一步介绍更多高级功能的实现原理。git

与上篇同样,如下内部实现都基于本文发布时最新的 rust-prometheus 0.5 版本代码,目前咱们正在开发 1.0 版本,API 设计上会进行一些简化,实现上出于效率考虑也会和这里讲解的略微有一些出入,所以请读者注意甄别。github

指标向量(Metric Vector)

Metric Vector 用于支持带 Label 的指标。因为各类指标均可以带上 Label,所以 Metric Vector 自己实现为了一种泛型结构体,CounterGaugeHistogram 在这之上实现了 CounterVecGaugeVecHistogramVec。Metric Vector 主要实现位于 src/vec.rs缓存

HistogramVec 为例,调用 HistogramVec::with_label_values 可得到一个 Histogram 实例,而 HistogramVec 定义为:安全

pub type HistogramVec = MetricVec<HistogramVecBuilder>;

pub struct MetricVec<T: MetricVecBuilder> {
   pub(crate) v: Arc<MetricVecCore<T>>,
}

impl<T: MetricVecBuilder> MetricVec<T> {
   pub fn with_label_values(&self, vals: &[&str]) -> T::M {
       self.get_metric_with_label_values(vals).unwrap()
   }
}
复制代码

所以 HistogramVec::with_label_values 的核心逻辑其实在 MetricVecCore::get_metric_with_label_values。这么作的缘由是为了让 MetricVec 是一个线程安全、能够被全局共享但又不会在共享的时候具备很大开销的结构,所以将内部逻辑实如今 MetricVecCore,外层(即在 MetricVec)套一个 Arc<T> 后再提供给用户。进一步能够观察 MetricVecCore 的实现,其核心逻辑以下:多线程

pub trait MetricVecBuilder: Send + Sync + Clone {
   type M: Metric;
   type P: Describer + Sync + Send + Clone;

   fn build(&self, &Self::P, &[&str]) -> Result<Self::M>;
}

pub(crate) struct MetricVecCore<T: MetricVecBuilder> {
   pub children: RwLock<HashMap<u64, T::M>>,
   // Some fields are omitted.
}

impl<T: MetricVecBuilder> MetricVecCore<T> {
   // Some functions are omitted.

   pub fn get_metric_with_label_values(&self, vals: &[&str]) -> Result<T::M> {
       let h = self.hash_label_values(vals)?;

       if let Some(metric) = self.children.read().get(&h).cloned() {
           return Ok(metric);
       }

       self.get_or_create_metric(h, vals)
   }

   pub(crate) fn hash_label_values(&self, vals: &[&str]) -> Result<u64> {
       if vals.len() != self.desc.variable_labels.len() {
           return Err(Error::InconsistentCardinality(
               self.desc.variable_labels.len(),
               vals.len(),
           ));
       }

       let mut h = FnvHasher::default();
       for val in vals {
           h.write(val.as_bytes());
       }

       Ok(h.finish())
   }

   fn get_or_create_metric(&self, hash: u64, label_values: &[&str]) -> Result<T::M> {
       let mut children = self.children.write();
       // Check exist first.
       if let Some(metric) = children.get(&hash).cloned() {
           return Ok(metric);
       }

       let metric = self.new_metric.build(&self.opts, label_values)?;
       children.insert(hash, metric.clone());
       Ok(metric)
   }
}
复制代码

如今看代码就很简单了,它首先会依据全部 Label Values 构造一个 Hash,接下来用这个 Hash 在 RwLock<HashMap<u64, T::M>> 中查找,若是找到了,说明给定的这个 Label Values 以前已经出现过、相应的 Metric 指标结构体已经初始化过,所以直接返回对应的实例;若是不存在,则要利用给定的 MetricVecBuilder 构造新的指标加入哈希表,并返回这个新的指标。并发

由上述代码可见,为了在线程安全的条件下实现 Metric Vector 各个 Label Values 具备独立的时间序列,Metric Vector 内部采用了 RwLock 进行同步,也就是说 with_label_values() 及相似函数内部是具备锁的。这在多线程环境下会有必定的效率影响,不过由于大部分状况下都是读锁,所以影响不大。固然,还能够发现其实给定 Label Values 以后调用 with_label_values() 获得的指标实例是能够被缓存起来的,只访问缓存起来的这个指标实例是不会有任何同步开销的,也绕开了计算哈希值等比较占 CPU 的操做。基于这个思想,就有了 Static Metrics,读者能够在本文的后半部分了解 Static Metrics 的详细状况。async

另外读者也能够发现,Label Values 的取值应当是一个有限的、封闭的小集合,不该该是一个开放的或取值空间很大的集合,由于每个值都会对应一个内存中指标实例,而且不会被释放。例如 HTTP Method 是一个很好的 Label,由于它只多是 GET / POST / PUT / DELETE 等;而 Client Address 则不少状况下并不适合做为 Label,由于它是一个开放的集合,或者有很是巨大的取值空间,若是将它做为 Label 极可能会有容易 OOM 的风险。这个风险在 Prometheus 官方文档中也明确指出了。ide

整型指标(Integer Metric)

在讲解 Counter / Gauge 的实现时咱们提到,rust-prometheus 使用 CAS 操做实现 AtomicF64 中的原子递增和递减,若是改用 atomic fetch-and-add 操做则通常能够取得更高效率。考虑到大部分状况下指标均可以是整数而不须要是小数,例如对于简单的次数计数器来讲它只多是整数,所以 rust-prometheus 额外地提供了整型指标,容许用户自由地选择,针对整数指标状况提供更高的效率。函数

为了加强代码的复用,rust-prometheus 实际上采用了泛型来实现 CounterGauge。经过对不一样的 Atomic(如 AtomicF64AtomicI64)进行泛化,就能够采用同一份代码实现整数的指标和(传统的)浮点数指标。

Atomic trait 定义以下(src/atomic64/mod.rs):

pub trait Atomic: Send + Sync {
   /// The numeric type associated with this atomic.
   type T: Number;
   /// Create a new atomic value.
   fn new(val: Self::T) -> Self;
   /// Set the value to the provided value.
   fn set(&self, val: Self::T);
   /// Get the value.
   fn get(&self) -> Self::T;
   /// Increment the value by a given amount.
   fn inc_by(&self, delta: Self::T);
   /// Decrement the value by a given amount.
   fn dec_by(&self, delta: Self::T);
}
复制代码

原生的 AtomicU64AtomicI64 及咱们自行实现的 AtomicF64 都实现了 Atomic trait。进而,CounterGauge 均可以利用上 Atomic trait:

pub struct Value<P: Atomic> {
   pub val: P,
   // Some fields are omitted.
}

pub struct GenericCounter<P: Atomic> {
   v: Arc<Value<P>>,
}

pub type Counter = GenericCounter<AtomicF64>;
pub type IntCounter = GenericCounter<AtomicI64>;
复制代码

本地指标(Local Metrics)

由前面这些源码解析能够知道,指标内部的实现是原子变量,用于支持线程安全的并发更新,但这在须要频繁更新指标的场景下相比简单地更新本地变量仍然具备显著的开销(大约有 10 倍的差距)。为了进一步优化、支持高效率的指标更新操做,rust-prometheus 提供了 Local Metrics 功能。

rust-prometheus 中 Counter 和 Histogram 指标支持 local() 函数,该函数会返回一个该指标的本地实例。本地实例是一个非线程安全的实例,不能多个线程共享。例如,Histogram::local() 会返回 LocalHistogram。因为 Local Metrics 使用是本地变量,开销极小,所以能够放心地频繁更新 Local Metrics。用户只需按期调用 Local Metrics 的 flush() 函数将其数据按期同步到全局指标便可。通常来讲 Prometheus 收集数据的间隔是 15s 到 1 分钟左右(由用户自行配置),所以即便是以 1s 为间隔进行 flush() 精度也足够了。

普通的全局指标使用流程以下图所示,多个线程直接利用原子操做更新全局指标:

normal_metrics

本地指标使用流程以下图所示,每一个要用到该指标的线程都保存一份本地指标。更新本地指标操做开销很小,能够在频繁的操做中使用。随后,只需再按期将这个本地指标 flush 到全局指标,就能使得指标的更新操做真正生效。

local_metrics

TiKV 中大量运用了本地指标提高性能。例如,TiKV 的线程池通常都提供 Context 变量,Context 中存储了本地指标。线程池上运行的任务都能访问到一个和当前 worker thread 绑定的 Context,所以它们均可以安全地更新 Context 中的这些本地指标。最后,线程池通常提供 tick() 函数,容许以必定间隔触发任务,tick() 中 TiKV 会对这些 Context 中的本地指标进行 flush()

Local Counter

Counter 的本地指标 LocalCounter 实现很简单,它是一个包含了计数器的结构体,该结构体提供了与 Counter 一致的接口方便用户使用。该结构体额外提供了 flush(),将保存的计数器的值做为增量值更新到全局指标:

pub struct GenericLocalCounter<P: Atomic> {
   counter: GenericCounter<P>,
   val: P::T,
}

pub type LocalCounter = GenericLocalCounter<AtomicF64>;
pub type LocalIntCounter = GenericLocalCounter<AtomicI64>;

impl<P: Atomic> GenericLocalCounter<P> {
   // Some functions are omitted.

   pub fn flush(&mut self) {
       if self.val == P::T::from_i64(0) {
           return;
       }
       self.counter.inc_by(self.val);
       self.val = P::T::from_i64(0);
   }
}
复制代码

Local Histogram

因为 Histogram 本质也是对各类计数器进行累加操做,所以 LocalHistogram 的实现也很相似,例如 observe(x) 的实现与 Histogram 一模一样,除了它不是原子操做;flush() 也是将全部值累加到全局指标上去:

pub struct LocalHistogramCore {
   histogram: Histogram,
   counts: Vec<u64>,
   count: u64,
   sum: f64,
}

impl LocalHistogramCore {
   // Some functions are omitted.

   pub fn observe(&mut self, v: f64) {
       // Try find the bucket.
       let mut iter = self
           .histogram
           .core
           .upper_bounds
           .iter()
           .enumerate()
           .filter(|&(_, f)| v <= *f);
       if let Some((i, _)) = iter.next() {
           self.counts[i] += 1;
       }

       self.count += 1;
       self.sum += v;
   }

   pub fn flush(&mut self) {
       // No cached metric, return.
       if self.count == 0 {
           return;
       }
       {
           let h = &self.histogram;
           for (i, v) in self.counts.iter().enumerate() {
               if *v > 0 {
                   h.core.counts[i].inc_by(*v);
               }
           }
           h.core.count.inc_by(self.count);
           h.core.sum.inc_by(self.sum);
       }
       self.clear();
   }
}
复制代码

静态指标(Static Metrics)

以前解释过,对于 Metric Vector 来讲,因为每个 Label Values 取值都是独立的指标实例,所以为了线程安全实现上采用了 HashMap + RwLock。为了提高效率,能够将 with_label_values 访问得到的指标保存下来,之后直接访问。另外使用姿式正确的话,Label Values 取值是一个有限的、肯定的、小的集合,甚至大多数状况下在编译期就知道取值内容(例如 HTTP Method)。综上,咱们能够直接写代码将各类已知的 Label Values 提早保存下来,以后能够以静态的方式访问,这就是静态指标。

以 TiKV 为例,有 Contributor 为 TiKV 提过这个 PR:#2765 server: precreate some labal metrics。这个 PR 改进了 TiKV 中统计各类 gRPC 接口消息次数的指标,因为 gRPC 接口是固定的、已知的,所以能够提早将它们缓存起来:

struct Metrics {
   kv_get: Histogram,
   kv_scan: Histogram,
   kv_prewrite: Histogram,
   kv_commit: Histogram,
   // ...
}

impl Metrics {
   fn new() -> Metrics {
       Metrics {
           kv_get: GRPC_MSG_HISTOGRAM_VEC.with_label_values(&["kv_get"]),
           kv_scan: GRPC_MSG_HISTOGRAM_VEC.with_label_values(&["kv_scan"]),
           kv_prewrite: GRPC_MSG_HISTOGRAM_VEC.with_label_values(&["kv_prewrite"]),
           kv_commit: GRPC_MSG_HISTOGRAM_VEC.with_label_values(&["kv_commit"]),
           // ...
       }
   }
}
复制代码

使用的时候也很简单,直接访问便可:

@@ -102,10 +155,8 @@ fn make_callback<T: Debug + Send + 'static>() -> (Box<FnBox(T) + Send>, oneshot:

impl<T: RaftStoreRouter + 'static> tikvpb_grpc::Tikv for Service<T> {
    fn kv_get(&self, ctx: RpcContext, mut req: GetRequest, sink: UnarySink<GetResponse>) {
- let label = "kv_get";
- let timer = GRPC_MSG_HISTOGRAM_VEC
- .with_label_values(&[label])
- .start_coarse_timer();
+ const LABEL: &str = "kv_get";
+ let timer = self.metrics.kv_get.start_coarse_timer();

        let (cb, future) = make_callback();
        let res = self.storage.async_get(
复制代码

这样一个简单的优化能够为 TiKV 提高 7% 的 Raw Get 效率,能够说是很超值了(主要缘由是 Raw Get 自己开销极小,所以在指标上花费的时间就显得有一些显著了)。但这个优化方案其实还有一些问题:

  1. 代码繁琐,有大量重复的、或知足某些 pattern 的代码;

  2. 若是还有另外一个 Label 维度,那么须要维护的字段数量就会急剧膨胀(由于每一种值的组合都须要分配一个字段)。

为了解决以上两个问题,rust-prometheus 提供了 Static Metric 宏。例如对于刚才的 TiKV 改进 PR #2765 来讲,使用 Static Metric 宏能够简化为:

make_static_metric! {
   pub struct GrpcMsgHistogram: Histogram {
       "type" => {
           kv_get,
           kv_scan,
           kv_prewrite,
           kv_commit,
           // ...
       },
   }
}

let metrics = GrpcMsgHistogram::from(GRPC_MSG_HISTOGRAM_VEC);

// Usage:
metrics.kv_get.start_coarse_timer();
复制代码

能够看到,使用宏以后,须要维护的繁琐的代码量大大减小了。这个宏也能正常地支持多个 Label 同时存在的状况。

限于篇幅,这里就不具体讲解这个宏是如何写的了,感兴趣的同窗能够观看我司同窗最近在 FOSDEM 2019 上的技术分享 视频(进度条 19:54 开始介绍 Static Metrics)和 Slide,里面详细地介绍了如何从零开始写出一个这样的宏(的简化版本)。

相关文章
相关标签/搜索