学习对象:https://github.com/tmrts/go-p...。这个repo使用go语言实现了一些设计模式,包括经常使用的Builder模式,Singleton模式等,也有列举出还未用go实现的模式,如Bridge模式等。java
本文并不是完整地介绍和解析这个repo里的每一行代码,只对我的认为值得学习和记录的地方进行说明,阅读过repo代码后再阅读本文比较合适。git
这个模式是一种优雅地设置对象初始化参数的方式。考虑的点是:github
比较如下几种初始化对象参数的方法:设计模式
//name是必填参数, timeout和maxConn是选填参数,若是不填则设置为默认值 // pattern #1 func NewServer(name string, timeout time.Duration, maxConn uint) (*Server, error) {...} // 这种方法最直观, 但也是最不合适的, 由于对于扩展参数须要修改函数签名, 且默认值须要经过文档获知 // pattern #2 type ServerConf struct { Timeout time.Duration MaxConn uint } func NewServer(name string, conf ServerConf) (*Server, error) {...} // 1) func NewServer(name string, conf *ServerConf) (*Server, error) {...} // 2) func NewServer(name string, conf ...ServerConf) (*Server, error) {...} // 3) // 改进: 使用了参数结构体, 增长参数不须要修改函数签名 // 1) conf如今是必传, 实际上里面的是选填参数 // 2) 避免nil; conf可能在外部被改变. // 3) 都使用默认值的时候能够不传, 但多个conf可能在配置上有冲突 // conf的默认空值对于Server多是有意义的. // pattern #3: Functional Options type ConfSetter func(srv *Server) error func ServerTimeoutSetter(t time.Duration) ConfSetter { return func(srv *Server) error { srv.timeout = t return nil } } func ServerMaxConnSetter(m uint) ConfSetter { return func(srv *Server) error { srv.maxConn = m return nil } } func NewServer(name string, setter ...ConfSetter) (*Server, error) { srv := new(Server) ... for _, s := range setter { err := s(srv) } ... } // srv, err := NewServer("name", ServerTimeoutSetter(time.Second)) // 使用闭包做为配置参数. 若是不须要配置选填参数, 只须要填参数name.
上面的pattern#2尝试了三种方法来优化初始化参数的问题,但每种方法都有本身的不足之处。pattern#3,也就是Functional Options
,经过使用闭包来作优化,从使用者的角度来看,已是足够简洁和明确了。固然,代价是初次理解这种写法有点绕,不如前两种写法来得直白。trade offapi
欲言又止稍加思考,容易提出这个问题:这跟Builder模式有什么区别呢?我的认为,Functional Options模式本质上就是Builder模式:经过函数来设置参数。闭包
参考文章:Functional options for friendly APIs并发
熔断模式:若是服务在一段时间内不可用,这时候服务要考虑主动拒绝请求(减轻服务方压力和请求方的资源占用)。等待一段时间后(尝试等待服务变为可用),服务尝试接收部分请求(一会儿涌入过多请求可能致使服务再次不可用),若是请求都成功了,再正常接收全部请求。函数
// 极其精简的版本, repo中版本详尽一些 type Circuit func() error // Counter 的实现应该是一个状态机 type Counter interface { OverFailureThreshold() UpdateFailure() UpdateSuccess() } var cnt Counter func Breaker(c Circuit) Circuit { return func() { if cnt.OverFailureThreshold() { return fmt.Errorf("主动拒绝") } if err := c(); err != nil { cnt.UpdateFailure() return err } cnt.UpdateSuccess() return nil } }
熔断模式更像是中间件而不是设计模式:熔断器是一个抽象的概念而不是具体的代码实现;另外,若是要实现一个实际可用的熔断器,要考虑的方面仍是比较多的。举些例子:须要提供手动配置熔断器的接口,避免出现不可控的请求状况;什么类型的错误熔断器才生效(恶意发送大量无效的请求可能致使熔断器生效),等等。性能
参考文章:Circuit Breaker pattern
参考实现:gobreaker学习
go的标准库中没有实现信号量,repo实现了一个:)
repo实现的实质是使用chan。chan自己已经具有互斥访问的功能,并且能够设定缓冲大小,只要稍加修改就能够看成信号量使用。另外,利用select语法,能够很方便地实现超时的功能。
type Semaphore struct { resource chan struct{} // 编译器会优化struct{}类型, 使得全部struct{}变量都指向同一个内存地址 timeout time.Duration // 用于避免长时间的死锁 } type TimeoutError error func (s *Semaphore) Aquire() TimeoutError { select { // 会从上到下检查是否阻塞 // 若是timeout为0, 且暂时不能得到/解锁资源, 会当即返回超时错误 case: <-s.resource: return nil case: <- time.After(s.timeout): return fmt.Errorf("timeout") } } func (s *Semaphore) Release() TimeoutError { select { // 同Aquire() case: s.resource <- struct{}{}: return nil case: <- time.After(s.timeout): return fmt.Errorf("timeout") } } func NewSemaphore(num uint, timeout time.Duration) (*Semaphore, error) { if num == 0 { return fmt.Errorf("invalid num") //若是是0, 须要先Release才能Aquire. } return &Semaphore{ resource: make(chan strcut{}, num), timeout: timeout, }, nil //其实返回值类型也不影响Semaphore正常工做, 由于chan是引用类型 }
标准库的sync包已经有实现了一个对象池,可是这个对象池接收的类型是 interface{}
(万恶的范型),并且池里的对象若是不被其它内存引用,会被gc回收(同java中弱引用的collection类型相似)。
repo实现的对象池是明确类型的(万恶的范型+1),并且闲置不会被gc回收。但仅仅做为展现说明,repo的实现没有作超时处理。下面的代码尝试加上超时处理。也许对使用者来讲,额外增长处理超时错误的代码比较繁琐,但这是有必要的,除非使用者通读并理解了你的代码。trade off
type Pool struct { pool chan *Object timeout time.Duration } type TimeoutError error func NewPool(total int, timeout time.Duration) *Pool { p := &Pool { pool: make(Pool, total), timeout: timeout, } //pool是引用类型, 因此返回类型能够不是指针 for i := 0; i < total; i++ { p.pool <- new(Object) } return p } func (p *Pool) Aquire() (*Object, TimeoutError) { select { case obj <- p.pool: return obj, nil case <- time.After(timeout): return nil, fmt.Errorf("timeout") } } func (p *Pool) Release(obj *Object) TimeoutError { select { case p.pool <- obj: return nil case <- time.After(timeout): return nil, fmt.Errorf("timeout") } }
解析一下repo里goroutine和chan的使用方式,也不算是设计模式。
Fan-in pattern 主要体现如何使用sync.WaitGroup
同步多个goroutine。思考:这里的实现是若是cs的长度为n, 那个要开n个goroutine, 有没有办法优化为开常数个goroutine?
// 将若干个chan的内容合并到一个chan当中 func Merge(cs ...<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup wg.Add(len(cs)) // 将send函数在for循环中写成一个不带参数的匿名函数, 看起来会使代码更简洁, // 但实际上全部for循环里的全部goroutine会公用一个c, 代码不能正确实现功能. send := func(c <-chan int) { for n := range c { out <- n } wg.Done() } for _, c := range cs { go send(c) } // 开一个goroutine等待wg, 而后关闭merge的chan, 不阻塞Merge函数 go func() { wg.Wait() close(out) } return out }
Fan-out pattern 将一个主chan的元素循环分发给若干个子chan(分流)。思路比较简单就不贴代码了。思考:reop实现的代码,若是其中一个子chan没有消费元素,那么整个分发流程都会卡住。是否能够优化?
Bounded Parallelism Pattern 比较完整的例子来讲明如何使用goroutine. 面的例子是并发计算目录下文件的md5.
func MD5All(root string) (map[string][md5.Size]byte, error) { //由于byte是定长的, 使用数据更合适, 可读且性能也好一点 done := make(chan struct{}) //用于控制整个流程是否暂停. 其实这里是用context可能会更好. defer close(done) paths, errc := walkFiles(done, root) c := make(chan result) var wg sync.WaitGroup const numDigesters = 20 wg.Add(numDigesters) for i := 0; i < numDigesters; i++ { go func() { digester(done, paths, c) wg.Done() }() } // 同上, 开goroutine等待全部digester结束 go func() { wg.Wait() close(c) }() m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } // 必须放在m处理结束后才检查errc. 不然, 要等待walkFiles结束了才能开始处理m // 相反, 若是errc有信号, c确定已经close了 if err := <-errc; err != nil { return nil, err } return m, nil } func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { paths := make(chan string) // 这里能够适当增长缓冲, 取决于walkFiles快仍是md5.Sum快 errc := make(chan error, 1) //必须有缓冲, 不然死锁. 上面的代码paths close了才检查errc go func() { defer close(paths) // 这里的defer没必要要. defer是运行时的, 有成本. errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } select { case paths <- path: case <-done: return errors.New("walk canceled") } return nil }) }() return paths, errc } type result struct { path string sum [md5.Size]byte err error } func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { for path := range paths { data, err := ioutil.ReadFile(path) select { // 看md5.Sum先结束仍是done信号先到来 case c <- result{path, md5.Sum(data), err}: case <-done: return } } }