k8s 中定时任务的实现

k8s 中有许多优秀的包均可以在平时的开发中借鉴与使用,好比,任务的定时轮询、高可用的实现、日志处理、缓存使用等都是独立的包,能够直接引用。本篇文章会介绍 k8s 中定时任务的实现,k8s 中定时任务都是经过 wait 包实现的,wait 包在 k8s 的多个组件中都有用到,如下是 wait 包在 kubelet 中的几处使用:golang

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
        ...
        // kubelet 每5分钟一次从 apiserver 获取证书
        closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
        if err != nil {
            return err
        }

        closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
        if err != nil {
            return err
        }
        ...
}
...
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration,   kubeDeps *kubelet.Dependencies, enableServer bool) {
    // 持续监听 pod 的变化
    go wait.Until(func() {
        k.Run(podCfg.Updates())
    }, 0, wait.NeverStop)
    ...
}

golang 中能够经过 time.Ticker 实现定时任务的执行,但在 k8s 中用了更原生的方式,使用 time.Timer 实现的。time.Ticker 和 time.Timer 的使用区别以下:编程

  • ticker 只要定义完成,今后刻开始计时,不须要任何其余的操做,每隔固定时间都会自动触发。api

  • timer 定时器是到了固定时间后会执行一次,仅执行一次缓存

  • 若是 timer 定时器要每隔间隔的时间执行,实现 ticker 的效果,使用 func (t *Timer) Reset(d Duration) bool微信

一个示例:函数

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    timer1 := time.NewTimer(2 * time.Second)
    ticker1 := time.NewTicker(2 * time.Second)

    wg.Add(1)
    go func(t *time.Ticker) {
        defer wg.Done()
        for {
            <-t.C
            fmt.Println("exec ticker", time.Now().Format("2006-01-02 15:04:05"))
        }
    }(ticker1)

    wg.Add(1)
    go func(t *time.Timer) {
        defer wg.Done()
        for {
            <-t.C
            fmt.Println("exec timer", time.Now().Format("2006-01-02 15:04:05"))
            t.Reset(2 * time.Second)
        }
    }(timer1)

    wg.Wait()
}

1、wait 包中的核心代码

核心代码(k8s.io/apimachinery/pkg/util/
wait/wait.go):工具

func JitterUntil(f func()period time.DurationjitterFactor float64sliding boolstopCh <-chan struct{}) {
    var t *time.Timer
    var sawTimeout bool

    for {
        select {
        case <-stopCh:
            return
        default:
        }

        jitteredPeriod := period
        if jitterFactor > 0.0 {
            jitteredPeriod = Jitter(period, jitterFactor)
        }

        if !sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }

        func() {
            defer runtime.HandleCrash()
            f()
        }()

        if sliding {
            t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
        }

        select {
        case <-stopCh:
            return
        case <-t.C:
            sawTimeout = true
        }
    }
}

...

func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer {
    if t == nil {
        return time.NewTimer(d)
    }
    if !t.Stop() && !sawTimeout {
        <-t.C
    }
    t.Reset(d)
    return t
}

几个关键点的说明:性能

  • 一、若是 sliding 为 true,则在 f() 运行以后计算周期。若是为 false,那么 period 包含 f() 的执行时间。spa

  • 二、在 golang 中 select 没有优先级选择,为了不额外执行 f(),在每次循环开始后会先判断 stopCh chan。.net

k8s 中 wait 包实际上是对 time.Timer 作了一层封装实现。

2、wait 包经常使用的方法

一、按期执行一个函数,永不中止,可使用 Forever 方法:

func Forever(f func(), period time.Duration)

二、在须要的时候中止循环,那么可使用下面的方法,增长一个用于中止的 chan 便可,方法定义以下:

func Until(f func(), period time.Duration, stopCh <-chan struct{})

上面的第三个参数 stopCh 就是用于退出无限循环的标志,中止的时候咱们 close 掉这个 chan 就能够了。

三、有时候,咱们还会须要在运行前去检查先决条件,在条件知足的时候才去运行某一任务,这时候可使用 Poll 方法:

func Poll(interval, timeout time.Duration, condition ConditionFunc)

这个函数会以 interval 为间隔,不断去检查 condition 条件是否为真,若是为真则能够继续后续处理;若是指定了 timeout 参数,则该函数也能够只常识指定的时间。

四、PollUntil 方法和上面的相似,可是没有 timeout 参数,多了一个 stopCh 参数,以下所示:

PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error

此外还有 PollImmediate 、 PollInfinite 和 PollImmediateInfinite 方法。

3、总结

本篇文章主要讲了 k8s 中定时任务的实现与对应包(wait)中方法的使用。经过阅读 k8s 的源代码,能够发现 k8s 中许多功能的实现也都是咱们须要在平时工做中用的,其大部分包的性能都是通过大规模考验的,经过使用其相关的工具包不只能学到大量的编程技巧也能避免本身造轮子。



本文分享自微信公众号 - 田飞雨(kubeConChina)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索