client-go中有不少比较有意思的实现,如定时器,同步机制等,能够做为移植使用。下面就遇到的一些技术讲解,首先看第一个:html
实现了对golang map的key的处理,如计算交集,并集等。实际中可能会遇到须要判断两个map的key是否重合的场景,此时可使用下述方式实现,sets.StringKeySet函数将入参的map的key抽取成一个String类型,这样就可使用String的方法操做keygolang
ps:更多功能参见源码算法
package main import ( "fmt" "k8s.io/apimachinery/pkg/util/sets" ) func main(){ map1 := map[string]int{"aaa":1,"bbb":2,"ccc":3} map2 := map[string]int{"ccc":1,"ddd":2,"eee":3} newmap1 := sets.StringKeySet(map1) newmap2 := sets.StringKeySet(map2) fmt.Println(newmap1.List(),newmap2.List()) fmt.Println(newmap1.HasAny(newmap2.List()...)) //3个点用于把数组打散为单个元素 }
结果:true
有2个方法:api
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
相似C语言线程的互斥锁,用于对数据进行加解锁操做。当数据被加锁后,未得到该锁的程序将没法读取被加锁的数据。从下面例子能够看出在数据被解锁前其余协程没法对该数据进行读写操做。数组
ps: read data的数据也可能为“data”缓存
package main import ( "fmt" "sync" ) type LockTest struct { l sync.Mutex data string } func main(){ lockTest := LockTest{sync.Mutex{},"data"} go func() { lockTest.l.Lock() fmt.Println("sleep begin") time.Sleep(time.Second*2) fmt.Println("sleep end") lockTest.l.Unlock() }() time.Sleep(time.Second*1) go func() { lockTest.l.Lock() fmt.Println("read data:",lockTest.data) lockTest.l.Unlock() }() go func() { lockTest.l.Lock() fmt.Println("write data begin") lockTest.data="new data" fmt.Println("write data end") lockTest.l.Unlock() }() time.Sleep(time.Second*5) } 结果: sleep begin sleep end write data begin write data end read data: new data
读写锁,含4个方法,前2个为读锁,后2个为写锁,使用时要一一对应。写锁会阻塞读写操做,读锁不会阻塞写操做,读锁能够有多个,读锁之间不会相互阻塞,适用于读多写少的场景。所以若是单纯使用RWMutex.Lock/RWMutex.UnLock与使用Mutex.Lock/Mutex.UnLock效果相同网络
func (rw *RWMutex) RLock() func (rw *RWMutex) RUnlock() func (rw *RWMutex) Lock() func (rw *RWMutex) Unlock()
读写锁通常是读锁和写锁结合使用的。在有写锁的时候,读锁会被阻塞,等待写锁释放后才能进行读操做。app
ps:写锁内部仅能对共享资源进行读操做,若是执行写操做会致使数据异常。sync.Mutex和sync.RWMutex通常都是内置在结构体中使用,用于保护本结构体的数据函数
package main import ( "fmt" "sync" ) type LockTest struct { l sync.RWMutex data string } func main(){ lockTest := LockTest{sync.RWMutex{},"data"} go func() { lockTest.l.Lock() fmt.Println("write data begin") lockTest.data="new data" time.Sleep(time.Second*3) fmt.Println("write data end") lockTest.l.Unlock() }() time.Sleep(time.Second*1) go func() { lockTest.l.RLock() //阻塞等待写锁释放 fmt.Println("read begin") fmt.Println("read data:",lockTest.data) fmt.Println("read begin") lockTest.l.RUnlock() }() time.Sleep(time.Second*5) }
结果:
write data begin write data end read begin read data: new data read begin
sync.Cond用于条件等待,在知足某些条件时程序才能继续执行。它包含以下3个方法:Wait()会挂起其所在的协程等待Signal()或Broadcast()的唤醒。测试
func (c *Cond) Wait() func (c *Cond) Signal() func (c *Cond) Broadcast()
官方推荐的典型用法以下。因为唤醒协程并不意味着条件已就绪,所以在唤醒后须要检测是否本协程的条件已经知足。
c.L.Lock() for !condition() { c.Wait() } ... make use of condition ... c.L.Unlock()
使用Signal()唤醒的方式以下,Signal()用于当次唤醒一个协程。若是注释掉下例中的Signal(),那么两个协程会一直Wait(),并不会继续执行。
package main import ( "fmt" "sync" ) func main(){ l := sync.Mutex{} c := sync.NewCond(&l) condition1 := false condition2 := false
go func() { c.L.Lock() for !condition1 { c.Wait() } fmt.Println("condition1=true,run1") c.L.Unlock() }() go func() { c.L.Lock() for !condition2 { c.Wait() } fmt.Println("condition2=true,run2") c.L.Unlock() }()
time.Sleep(time.Second*1) fmt.Println("signal-1") condition1=true c.Signal() time.Sleep(time.Second*1) fmt.Println("signal-2") condition2=true c.Signal() time.Sleep(time.Second*10) } 结果: signal-1 condition1=true,run1 signal-2 condition2=true,run2
使用Signal()唤醒协程时须要注意,在多个协程等待时,该函数并无指定须要唤醒哪个协程。下面程序的输出可能为“condition1=true,run1”也可能为“condition2=true,run2”。所以Signal通常适用于仅有一个协程等待的状况,不然可能形成混乱。
package main import ( "fmt" "sync" ) func main(){ l := sync.Mutex{} c := sync.NewCond(&l) condition1 := false condition2 := false go func() { c.L.Lock() for !condition1 { c.Wait() } fmt.Println("condition1=true,run1") c.L.Unlock() }() go func() { c.L.Lock() for !condition2 { c.Wait() } fmt.Println("condition2=true,run2") c.L.Unlock() }() time.Sleep(time.Second*1) condition1=true condition2=true c.Signal() time.Sleep(time.Second*10) }
Broadcast()比较简单,即唤醒全部等待的协程
package main import ( "fmt" "sync" ) func main(){ l := sync.Mutex{} c := sync.NewCond(&l) condition1 := false condition2 := false go func() { c.L.Lock() for !condition1 { c.Wait() } fmt.Println("condition1=true,run1") c.L.Unlock() }() go func() { c.L.Lock() for !condition2 { c.Wait() } fmt.Println("condition2=true,run2") c.L.Unlock() }() time.Sleep(time.Second*1) condition1=true condition2=true c.Broadcast() time.Sleep(time.Second*10) } 结果: condition1=true,run1 condition2=true,run2
sync.waitgroup有以下3个方法,Add(delta int)入参表示须要等待的协程的个数,如2表示须要等待2个协程完成;Done()表示该协程结束;Wait()用于阻塞主协程,等待全部协程结束后释放。
func (wg *WaitGroup) Add(delta int) func (wg *WaitGroup) Done() func (wg *WaitGroup) Wait()
举例以下,启动10个协程,Wait()会阻塞,直到全部的协程执行Done()。
ps: Add(delta int)函数的入参很重要,入参大于实际须要等待的协程会致使主协程一致阻塞,小于须要等待的协程会致使某些协程提早退出
import ( "fmt" "sync" ) func main(){ wg := sync.WaitGroup{} wg.Add(10) for i := 0; i < 10; i++ { go func(i int) { defer wg.Done() fmt.Print(i, " ") }(i) } wg.Wait() } 结果: 9 4 0 1 2 3 6 5 7 8
下例中使用chan实现主协程控制write,并使用write控制read。协程关闭使用close()函数
ps:使用chan进行协程同步通常将chan做为入参传入,或在函数内部实现协程间的同步。为方便验证,下面例子将全部chan做为全局变量
package main import ( "fmt" "sync" ) var speakCh = make(chan string) var stopReadChan = make(chan struct{}) var stopWriteChan = make(chan struct{}) func readChan(stopCh <-chan struct{}){ for { select { case words := <- speakCh: fmt.Println("received:",words) case <- stopCh: fmt.Println("stop read!") return } } } func writeChan(stopCh <-chan struct{}){ for { select { case <- stopCh: fmt.Println("stop write!") close(stopReadChan) return default: } speakCh <- "hi" time.Sleep(time.Second*2) } } func main(){ go readChan(stopReadChan) go writeChan(stopWriteChan) time.Sleep(time.Second*6) close(stopWriteChan) time.Sleep(time.Second*6) } 结果: received: hi received: hi received: hi stop write! stop read!
context用于对协程进行管理,如主动退出协程,超时退出协程等,能够看做是使用chan管理协程的扩展。在使用时首先建立一个context,使用cancel()能够取消context,并使用Done()返回的chan管理协程。
官方推荐的用法以下:
func Stream(ctx context.Context, out chan<- Value) error { for { v, err := DoSomething(ctx) if err != nil { return err } select { case <-ctx.Done(): return ctx.Err() case out <- v: } } }
下例中使用context.WithCancel建立一个context,使用cancel()给这一组context发送信号,在协程中使用Done()处理退出事件。
package main import ( "fmt" "context" ) func main(){ ctx,cancel := context.WithCancel(context.Background()) go testCtx(ctx,"ctx1") go testCtx(ctx,"ctx2") go testCtx(ctx,"ctx3") time.Sleep(time.Second*3) cancel() time.Sleep(time.Second*5) } func testCtx(ctx context.Context, name string) error{ for { select { case <-ctx.Done(): fmt.Println("ctx.Done:",name) return ctx.err() default: fmt.Println("default:",name) time.Sleep(time.Second*2) } } } 结果: default: ctx1 default: ctx3 default: ctx2 default: ctx3 default: ctx1 default: ctx2 ctx.Done: ctx1 ctx.Done: ctx3 ctx.Done: ctx2
建立context的方式以下,其他三个能够看做是WithCancel的扩展
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) //须要主动取消context func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) //在deadline时间点后取消context func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) //在超时后取消context func WithValue(parent Context, key, val interface{}) Context
再看一个WithTimeout的例子,下面设置context的超时时间为3s且没有主动cancel(),3s超时后能够看到该context对应的协程正常退出
func main(){ ctx,_ := context.WithTimeout(context.Background(),time.Second*3) go testCtx(ctx,"ctx1") go testCtx(ctx,"ctx2") go testCtx(ctx,"ctx3") time.Sleep(time.Second*5) } 结果: default: ctx3 default: ctx1 default: ctx2 default: ctx3 default: ctx1 default: ctx2 ctx.Done: ctx3 ctx.Done: ctx2 ctx.Done: ctx1
context能够看做是一个树,当cancel一个context时,会同时cancle它的子context。下面首先建立一个ctx,而后在此ctx下面建立一个subctx。当执行cancle() ctx时会同时cancel() 该的subctx。
context.Background()就是已经实现的首个context。
func main(){ ctx,cancel := context.WithCancel(context.Background()) subctx,_ := context.WithCancel(ctx) go testCtx(ctx,"ctx1") go testCtx(subctx,"subctx1") go testCtx(subctx,"subctx2") time.Sleep(time.Second*3) canclel() time.Sleep(time.Second*10) } 结果: default: subctx2 default: ctx1 default: subctx1 default: subctx2 default: ctx1 default: subctx1 timeout ctx.Done: ctx1 ctx.Done: subctx1 ctx.Done: subctx2
下例中仅cancel() subctx,能够看到并无影响subctx的parent。
func main(){ ctx, _:= context.WithCancel(context.Background()) subctx,subcancel := context.WithCancel(ctx) go testCtx(ctx,"ctx1") go testCtx(subctx,"subctx1") go testCtx(subctx,"subctx2") time.Sleep(time.Second*3) subcancel() time.Sleep(time.Second*10) } 结果: default: subctx1 default: subctx2 default: ctx1 default: ctx1 default: subctx1 default: subctx2 timeout ctx.Done: subctx2 default: ctx1 ctx.Done: subctx1 default: ctx1 default: ctx1 default: ctx1 default: ctx1
client-go中的wait.Group创造性地将sync.WaitGroup与chan和ctx结合,实现了协程间同步和等待所有Group中的协程结束的功能。因为StartWithChannel和StartWithContext的入参函数类型比较固定,所以使用上并不通用,但能够做为参考,在实际中扩展使用。下例中给出了简单用法。
func (g *Group) Wait() func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) func (g *Group) StartWithContext(ctx context.Context, f func(context.Context))
func main(){ f1:= func(ctx context.Context) { for { select { case <- ctx.Done(): return default: fmt.Println("hi11") time.Sleep(time.Second) } } } wg := wait.Group{} ctx, cancel := context.WithCancel(context.Background()) wg.StartWithContext(ctx,f1) time.Sleep(time.Second*3) cancel() wg.Wait() } 结果: hi hi hi
首先看一下通常使用的定时器,client-go中比较复杂的定时器也是在此基础上封装的。下面例子中给出的是ticker定时器,它会按照必定的时间频率往Ticker.C中发time.Time类型的数据,能够在协程中经过判断Ticker.C来执行定时任务。下例来自官方,实现每秒执行一次打印,
import ( "fmt" "time" ) func main(){ ticker := time.NewTicker(time.Second) defer ticker.Stop() done := make(chan bool) go func() { time.Sleep(10 * time.Second) done <- true }() for { select { case <-done: fmt.Println("Done!") return case t := <-ticker.C: fmt.Println("Current time: ", t) } } } 结果: Current time: 2019-07-04 14:30:37.9088968 +0800 CST m=+5.328291301 Current time: 2019-07-04 14:30:38.9089349 +0800 CST m=+6.328328801 Current time: 2019-07-04 14:30:39.9101415 +0800 CST m=+7.329534901 Current time: 2019-07-04 14:30:40.9095174 +0800 CST m=+8.328910201 Current time: 2019-07-04 14:30:41.9092961 +0800 CST m=+9.328688301 Current time: 2019-07-04 14:30:42.9087682 +0800 CST m=+10.328159801 Current time: 2019-07-04 14:30:43.9088604 +0800 CST m=+11.328251401 Current time: 2019-07-04 14:30:44.909609 +0800 CST m=+12.328999501 Current time: 2019-07-04 14:30:45.9094782 +0800 CST m=+13.328868101 Current time: 2019-07-04 14:30:46.909006 +0800 CST m=+14.328395401 Done!
须要注意的是使用ticker并不能保证程序被精确性调度,若是程序的执行时间大于ticker的调度周期,那么程序的触发周期会发生误差(可能因为系统cpu占用太高,网络延迟等缘由)。以下面例子中,ticker触发周期为1s,但程序执行大于2s,此时会出现程序执行频率不一致的状况。适用于周期性触发一个任务。
func main(){ ticker := time.NewTicker(time.Second) defer ticker.Stop() done := make(chan bool) go func() { time.Sleep(10 * time.Second) done <- true }() for { select { case <-done: fmt.Println("Done!") return case t := <-ticker.C: time.Sleep(time.Second*2) fmt.Println("Current time: ", t) } } } 结果: Current time: 2019-07-04 14:56:52.5446526 +0800 CST m=+5.281916601 Current time: 2019-07-04 14:56:53.5452488 +0800 CST m=+6.282512201 //和上一条相差1s,但和下一条相差2s Current time: 2019-07-04 14:56:55.5443528 +0800 CST m=+8.281615101 Current time: 2019-07-04 14:56:57.5449183 +0800 CST m=+10.282179401 Current time: 2019-07-04 14:56:59.5448671 +0800 CST m=+12.282127101 Done!
timer的机制和ticker相同,在定时器超时后往一个chan中发送time.Time数据。不一样的是ticker能够周期性调度,timer只会执行一次,若是须要重复调度,须要使用Reset函数重置timer。利用该机制,能够在同一个timer上以不一样间隔调度程序。
func main(){ timer := time.NewTimer(time.Second) defer timer.Stop() t := <-timer.C fmt.Println("Current time: ", t) timer.Reset(time.Second*2) t = <-timer.C fmt.Println("Current time: ", t) timer.Reset(time.Second*3) t = <-timer.C fmt.Println("Current time: ", t) } 结果: Current time: 2019-07-04 15:47:01.7518201 +0800 CST m=+5.312710501 Current time: 2019-07-04 15:47:03.7766692 +0800 CST m=+7.337558501 Current time: 2019-07-04 15:47:06.7770913 +0800 CST m=+10.337978901
使用timer须要注意Reset函数只能在timer超时后使用,不然将无效。由于Timer.C的长度只有1,若是前面一个定时器结束前执行了Reset,那么前面的定时器会被取消。具体能够参见这里
func NewTimer(d Duration) *Timer { c := make(chan Time, 1) ... }
下面例子中能够看出,屡次执行Reset并不会屡次触发定时任务,在前一个定时器超时前执行Reset,会取消前一个定时器并以Reset中的duration开始计时。
func main(){ fmt.Println("now time: "time.Now()) timer := time.NewTimer(time.Second*5) defer timer.Stop() timer.Reset(time.Second*2) timer.Reset(time.Second*2) timer.Reset(time.Second*2) go func() { for ; ; { select { case t:=<- timer.C: fmt.Println("Current time: ", t) } } }() time.Sleep(time.Second*10) } 结果: now time: 2019-07-04 16:16:31.7246084 +0800 CST m=+4.281414201 Current time: 2019-07-04 16:16:33.7505395 +0800 CST m=+6.307344201
官方推荐的用法以下,因为没有加锁,此方法不能在多个协程中同时使用。
if !t.Stop() { <-t.C }
t.Reset(d)
更多timer的用法能够参见官方文档
func Forever(f func(), period time.Duration) func Until(f func(), period time.Duration, stopCh <-chan struct{}) func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration)
Until函数每period会调度f函数,若是stopCh中有中止信号,则退出。当程序运行时间超过period时,也不会退出调度循环,该特性和Ticker相同。底层使用Timer实现。
Until和NonSlidingUntil为一对,UntilWithContext和NonSlidingUntilWithContext为一对,区别只是定时器启动时间点不一样,能够简单用下图表示。能够看到带“NonSliding”前缀的函数。
这两种(带“NonSliding”前缀的)函数在处理正常程序时没有什么区别,但在一些场景下会有不一样的地方。下面例子中使用wait.NonSlidingUntil处理的程序中sleep了2s,这能够表示程序由于某种缘由致使超出正常处理时间。此时能够看到结果中的“num 1”和“num 2”是同时调用的
func main(){ first := true num := 0 stopCh:=make(chan struct{} ) go func() { time.Sleep(time.Second*10) close(stopCh) fmt.Println("done") }() go wait.NonSlidingUntil(func(){ if true == first{ time.Sleep(time.Second*2) first=false }
num = num + 1 fmt.Println("num:",num,"time",time.Now()) },time.Second*1,stopCh) time.Sleep(time.Second*100) } 结果: num: 1 time 2019-07-04 21:05:59.5298524 +0800 CST m=+26.277103101 num: 2 time 2019-07-04 21:05:59.554999 +0800 CST m=+26.302249701 num: 3 time 2019-07-04 21:06:00.5559679 +0800 CST m=+27.303218601 num: 4 time 2019-07-04 21:06:01.5566608 +0800 CST m=+28.303911501
将上述程序的wait.NonSlidingUntil替换为wait.Until,获得以下结果,能够看到首次(异常)和第二次(正常)的间隔正好是wait.Until中设置的调度周期,即1s。
ps:大部分场景下二者使用上并无什么不一样,毕竟正常状况下程序运行时间必然小于程序调度周期。若是须要在程序处理延时的状况下尽快进行下一次调度,则选择带”NonSliding“前缀的函数
结果:
num: 1 time 2019-07-04 21:09:14.9643889 +0800 CST m=+2.010865201 num: 2 time 2019-07-04 21:09:15.9935285 +0800 CST m=+3.040004801 num: 3 time 2019-07-04 21:09:16.9956846 +0800 CST m=+4.042160901
该函数比较简单,就是取消了用于控制Until中止的stopCh。以永远不中止的方式周期性执行f函数
ExponentialBackoff能够实如今函数执行错误后实现以指数退避方式的延时重试。ExponentialBackoff内部使用的是time.Sleep
ExponentialBackoff的首个入参Backoff以下:
type Backoff struct { // The initial duration. Duration time.Duration // Duration is multiplied by factor each iteration. Must be greater // than or equal to zero. Factor float64 // The amount of jitter applied each iteration. Jitter is applied after // cap. Jitter float64 // The number of steps before duration stops changing. If zero, initial // duration is always used. Used for exponential backoff in combination // with Factor. Steps int // The returned duration will never be greater than cap *before* jitter // is applied. The actual maximum cap is `cap * (1.0 + jitter)`. Cap time.Duration }
第二个参数ConditionFunc表示运行的函数,返回的bool值表示该函数是否执行成功,若是执行成功则会退出指数退避
type ConditionFunc func() (done bool, err error)
下面作几组测试:
=> 当Factor和Jitter都为0时,能够看到调度周期是相同的,即Duration的值(1s)。
import ( "fmt" "k8s.io/apimachinery/pkg/util/wait" "time" ) func main(){ var DefaultRetry = wait.Backoff{ Steps: 5, Duration: 1 * time.Second, Factor: 0, Jitter: 0, } fmt.Println(wait.ExponentialBackoff(DefaultRetry,func() (bool, error){ fmt.Println(time.Now()) return false,nil })) } 结果: 2019-07-05 10:17:33.9610108 +0800 CST m=+0.079831101 2019-07-05 10:17:34.961132 +0800 CST m=+1.079952301 2019-07-05 10:17:35.961512 +0800 CST m=+2.080332301 2019-07-05 10:17:36.9625144 +0800 CST m=+3.081334701 2019-07-05 10:17:37.9636334 +0800 CST m=+4.082453701 timed out waiting for the condition
=> 先看Jitter对duration的影响,Jitter(duration, b.Jitter)的计算方式以下,若是入参的Factor为0,而Jitter非0,则将Factor调整为1。rand.Float64()为[0.0,1.0)的伪随机数。
将Jitter调整为0.5,根据下面计算方式预期duration为[1s,1.5s)。运行程序得出以下结果,观察能够发现,duration大概是1.4s
if maxFactor <= 0.0 {
maxFactor = 1.0 } wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
var DefaultRetry = wait.Backoff{ Steps: 5, Duration: 1 * time.Second, Factor: 0, Jitter: 0.5, } 结果: 2019-07-05 10:21:49.5993445 +0800 CST m=+2.382669101 2019-07-05 10:21:50.9026701 +0800 CST m=+3.685994701 2019-07-05 10:21:52.3759019 +0800 CST m=+5.159226401 2019-07-05 10:21:53.7086265 +0800 CST m=+6.491951001 2019-07-05 10:21:54.9283913 +0800 CST m=+7.711715901 timed out waiting for the condition
=> Factor非0且Jitter为0时,对duration的调整以下
if b.Factor != 0 { b.Duration = time.Duration(float64(b.Duration) * b.Factor) if b.Cap > 0 && b.Duration > b.Cap { b.Duration = b.Cap b.Steps = 0 } }
从公式中能够得出,Factor对程序执行的延的影响以下,能够看到Factor为1时并无什么做用
duration(1) = duration
duration(2) = Factor * duration(1) duration(3) = Factor * duration(2) ... duration(n) = Factor * duration(n-1)
Factor为1时,能够看到函数执行间隔均为1s
var DefaultRetry = wait.Backoff{ Steps: 5, Duration: 1 * time.Second, Factor: 1, Jitter: 0, } 结果: 2019-07-05 10:28:50.8481017 +0800 CST m=+2.363983901 2019-07-05 10:28:51.8482274 +0800 CST m=+3.364109601 2019-07-05 10:28:52.8482359 +0800 CST m=+4.364118201 2019-07-05 10:28:53.848687 +0800 CST m=+5.364569301 2019-07-05 10:28:54.849409 +0800 CST m=+6.365291201 timed out waiting for the condition
调整Factor为3,预期延时时间为1s,3s,9s,27s,从测试结果看与预期相符
var DefaultRetry = wait.Backoff{ Steps: 5, Duration: 1 * time.Second, Factor: 3, Jitter: 0, } 结果: 2019-07-05 10:35:06.9030165 +0800 CST m=+0.077746101 2019-07-05 10:35:07.9038392 +0800 CST m=+1.078568701 2019-07-05 10:35:10.9038733 +0800 CST m=+4.078602901 2019-07-05 10:35:19.9042141 +0800 CST m=+13.078943601 2019-07-05 10:35:46.904647 +0800 CST m=+40.079376501 timed out waiting for the condition
=> 当Factor和Jitter非0时的延迟计算方式以下:
save_duration(0) = duration duration(1) = Jitter(save_duration(0) , b.Jitter) save_duration(1) = Factor * save_duration(0) duration(2) = Jitter(save_duration(1), b.Jitter) save_duration(2) = Factor * save_duration(1) duration(3) = Jitter(save_duration(2), b.Jitter) save_duration = Factor * save_duration(2) ... duration(n) = Jitter(save_duration(n-1), b.Jitter)
设置Backoff参数以下,按照上述公式得出的指望延时为[1,1.1),[3,3.3), [9,9.9), [27,29.7)。实际运行以下,小数点一位后四舍五入得出实际延时为1.1, 3.3, 9.6, 28.2,与预期相符。
var DefaultRetry = wait.Backoff{ Steps: 5, Duration: 1 * time.Second, Factor: 3, Jitter: 0.1, } 结果: 2019-07-05 11:42:54.8779046 +0800 CST m=+0.135740401 2019-07-05 11:42:55.9399737 +0800 CST m=+1.197782901 2019-07-05 11:42:59.2240904 +0800 CST m=+4.481817401 2019-07-05 11:43:08.8232438 +0800 CST m=+14.080730501 2019-07-05 11:43:37.0058953 +0800 CST m=+42.262752301 timed out waiting for the condition
=> 最后看下Backoff.Cap的影响。设置Cap为10s,预期会比上面不带Cap的少执行2次(不带Cap限制的在Step为0时还会执行一次)。实际执行上也是如此
var DefaultRetry = wait.Backoff{ Steps: 5, Duration: 1 * time.Second, Factor: 3, Jitter: 0.1, Cap: time.Second*10, } 结果: 2019-07-05 12:02:43.8678742 +0800 CST m=+0.120673901 2019-07-05 12:02:44.9294079 +0800 CST m=+1.182202101 2019-07-05 12:02:48.2125558 +0800 CST m=+4.465333301
ExponentialBackoff借鉴了TCP协议的指数退避算法,适用于可能会产生资源竞争的场景。指数退避能够有效地在没有缓存处理场景下减少服务端的压力。
func Poll(interval, timeout time.Duration, condition ConditionFunc) error func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error func PollInfinite(interval time.Duration, condition ConditionFunc) error func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error
Poll表示以interval的周期执行condition函数,直到timeout超时或condition返回true/err非空。
wait.Poll和wait.Until使用上仍是有些相似的,区别在于一个使用timeout限制超时时间,一个使用chan提供主动中止调度。
import ( "fmt" "k8s.io/apimachinery/pkg/util/wait" "time" ) func main(){ wait.Poll(time.Second, time.Second*5, func() (done bool, err error) { fmt.Println(time.Now()) return false,nil }) 结果: 2019-07-05 13:43:31.2622405 +0800 CST m=+1.069324901 2019-07-05 13:43:32.2619663 +0800 CST m=+2.069050701 2019-07-05 13:43:33.2626114 +0800 CST m=+3.069695801 2019-07-05 13:43:34.2626876 +0800 CST m=+4.069772001 2019-07-05 13:43:35.2624168 +0800 CST m=+5.069501201 2019-07-05 13:43:35.2624168 +0800 CST m=+5.069501201
PollInfinite相比Poll取消了timeout的限制。
PollUntil相比Until来讲,PollUntil在condition函数返回true或error的时候会退出调度。
Poll和PollImmediate为一组,PollInfinite和PollImmediateInfinite为一组,PollUntil和PollImmediateUntil为一组,它们的细微差异在于前者在执行condition函数前会等待interval时间,后者则会首先运行condition函数,而后再检查是否须要等待(condition返回true或err非空时不会再等待)。若是不关注这点差别,用哪一个均可以。
实现heap须要实现下面Interface接口,heap使用队列实现了一个彻底二叉树
// heap.Interface type Interface interface { sort.Interface Push(x interface{}) // add x as element Len() Pop() interface{} // remove and return element Len() - 1. } // sort.Interface type Interface interface { // Len is the number of elements in the collection. Len() int // Less reports whether the element with // index i should sort before the element with index j. Less(i, j int) bool // Swap swaps the elements with indexes i and j. Swap(i, j int) }
heap对外提供的方法为以下:
func Init(h Interface) func Push(h Interface, x interface{}) func Pop(h Interface) interface{} func Remove(h Interface, i int) interface{} func Fix(h Interface, i int) // 当修改完队列中的index=i的元素后,从新排序
例子以下:
import ( "container/heap" "fmt" ) func GetAllHeapItems(t Heap_t,name string){ items := []interface{}{} for t.Len() != 0{ items = append(items, heap.Pop(&t)) } fmt.Println(name,":",items) } type Heap_t []int func (h Heap_t)Len() int{return len(h)} func (h Heap_t)Less(i,j int)bool {return h[i]<h[j]} func (h Heap_t)Swap(i,j int){h[i], h[j] = h[j], h[i]} func (h *Heap_t)Push(x interface{}){*h = append(*h,x.(int))} func (h *Heap_t)Pop() interface{}{ if h.Len() == 0{ return nil } x := (*h)[len(*h)-1] *h = (*h)[0:(len(*h) - 1)] return x } func main(){ h := &Heap_t{4,2,6,80,100,45} //[1 2 4 8 80 45 6 23 56 100] heap.Init(h) GetAllHeapItems(*h,"h") h1 := &Heap_t{4,2,6,80,100,45} heap.Init(h1) h1.Push(3) GetAllHeapItems(*h1,"h1") h2 := &Heap_t{4,2,6,80,100,45} heap.Init(h2) GetAllHeapItems(*h2,"h2") h3 := &Heap_t{4,2,6,80,100,45} heap.Init(h3) (*h3)[2] = 200 fmt.Println(1111,h3) heap.Fix(h3,2) fmt.Println(2222,h3) GetAllHeapItems(*h3,"h3") } 结果: h : [2 4 6 45 80 100] h1 : [2 3 4 6 45 80 100] h2 : [2 4 6 45 80 100] 1111 &[2 4 200 80 100 45] 2222 &[2 4 45 80 100 200] h3 : [2 4 45 80 100 200]
heap的实现比较巧妙,使用队列实现了彻底二叉树,比较适用于查询频繁的场景,原理解析能够参见这里
更多使用和例子参见官方文档
func main(){ ... select{} }
package main import ( "fmt" ) func main(){ type emptyCtx int background := new(emptyCtx) todo := new(emptyCtx) typeSwitch := func (i interface{}) { switch i { case background: fmt.Println("background") case todo: fmt.Println("todo") default: fmt.Println("default") } } typeSwitch(background) } 结果: true
参考:
https://www.flysnow.org/2017/05/12/go-in-action-go-context.html