在作任务开发的时候,大家必定会碰到如下场景:前端
场景1:调用第三方接口的时候, 一个需求你须要调用不一样的接口,作数据组装。
场景2:一个应用首页可能依托于不少服务。那就涉及到在加载页面时须要同时请求多个服务的接口。这一步每每是由后端统一调用组装数据再返回给前端,也就是所谓的 BFF(Backend For Frontend) 层。git
针对以上两种场景,假设在没有强依赖关系下,选择串行调用,那么总耗时即:github
time=s1+s2+....sn
按照当代秒入百万的有为青年,这么长时间早就把你祖宗十八代问候了一遍。后端
为了伟大的KPI,咱们每每会选择并发地调用这些依赖接口。那么总耗时就是:闭包
time=max(s1,s2,s3.....,sn)
固然开始堆业务的时候能够先串行化,等到上面的人着急的时候,亮出绝招。并发
这样,年末 PPT 就能够加上浓重的一笔流水帐:为业务某个接口提升百分之XXX性能,间接产生XXX价值。app
固然这一切的前提是,作老板不懂技术,作技术”懂”你。函数
言归正传,若是修改为并发调用,你可能会这么写,工具
package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup wg.Add(2) var userInfo *User var productList []Product go func() { defer wg.Done() userInfo, _ = getUser() }() go func() { defer wg.Done() productList, _ = getProductList() }() wg.Wait() fmt.Printf("用户信息:%+v\n", userInfo) fmt.Printf("商品信息:%+v\n", productList) } /********用户服务**********/ type User struct { Name string Age uint8 } func getUser() (*User, error) { time.Sleep(500 * time.Millisecond) var u User u.Name = "wuqinqiang" u.Age = 18 return &u, nil } /********商品服务**********/ type Product struct { Title string Price uint32 } func getProductList() ([]Product, error) { time.Sleep(400 * time.Millisecond) var list []Product list = append(list, Product{ Title: "SHib", Price: 10, }) return list, nil }
先无论其余问题。从实现上来讲,须要多少服务,你会开多少个 G
,利用 sync.WaitGroup
的特性,
实现并发编排任务的效果。性能
好像,问题不大。
可是随着代号 996
业务场景的增长,你会发现,好多模块都有类似的功能,只是对应的业务场景不一样而已。
那么咱们能不能抽像出一套针对此业务场景的工具,而把具体业务实现交给业务方。
安排。
本着不重复造轮子的原则,去搜了下开源项目,最终看上了 go-zero
里面的一个工具 mapreduce
。
从文件名咱们能看出来是什么了,能够自行 Google
这个名词。
使用很简单。咱们经过它改造一下上面的代码:
package main import ( "fmt" "github.com/tal-tech/go-zero/core/mr" "time" ) func main() { var userInfo *User var productList []Product _ = mr.Finish(func() (err error) { userInfo, err = getUser() return err }, func() (err error) { productList, err = getProductList() return err }) fmt.Printf("用户信息:%+v\n", userInfo) fmt.Printf("商品信息:%+v\n", productList) }
用户信息:&{Name:wuqinqiang Age:18} 商品信息:[{Title:SHib Price:10}]
是否是舒服多了。
可是这里还须要注意一点,假设你调用的其中一个服务错误,而且你 return err
对应的错误,那么其余调用的服务会被取消。
好比咱们修改 getProductList 直接响应错误。
func getProductList() ([]Product, error) { return nil, errors.New("test error") } //打印 用户信息:<nil> 商品信息:[]
那么最终打印的时候连用户信息都会为空,由于出现一个服务错误,用户服务请求被取消了。
通常状况下,在请求服务错误的时候咱们会有保底操做,一个服务错误不能影响其余请求的结果。
因此在使用的时候具体处理取决于业务场景。
既然用了,那么就追下源码吧。
func Finish(fns ...func() error) error { if len(fns) == 0 { return nil } return MapReduceVoid(func(source chan<- interface{}) { for _, fn := range fns { source <- fn } }, func(item interface{}, writer Writer, cancel func(error)) { fn := item.(func() error) if err := fn(); err != nil { cancel(err) } }, func(pipe <-chan interface{}, cancel func(error)) { drain(pipe) }, WithWorkers(len(fns))) }
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { reducer(input, cancel) drain(input) // We need to write a placeholder to let MapReduce to continue on reducer done, // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce. writer.Write(lang.Placeholder) }, opts...) return err }
对于 MapReduceVoid
函数,主要查看三个闭包参数。
GenerateFunc
用于生产数据。MapperFunc
读取生产出的数据,进行处理。VoidReducerFunc
这里表示不对 mapper
后的数据作聚合返回。因此这个闭包在此操做几乎0做用。func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) { source := buildSource(generate) return MapReduceWithSource(source, mapper, reducer, opts...) } func buildSource(generate GenerateFunc) chan interface{} { source := make(chan interface{})// 建立无缓冲通道 threading.GoSafe(func() { defer close(source) generate(source) //开始生产数据 }) return source //返回无缓冲通道 }
buildSource
函数中,返回一个无缓冲的通道。并开启一个 G
运行 generate(source)
,往无缓冲通道塞数据。 这个generate(source)
不就是一开始 Finish
传递的第一个闭包参数。
return MapReduceVoid(func(source chan<- interface{}) { // 就这个 for _, fn := range fns { source <- fn } })
而后查看 MapReduceWithSource
函数,
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) { options := buildOptions(opts...) //任务执行结束通知信号 output := make(chan interface{}) //将mapper处理完的数据写入collector collector := make(chan interface{}, options.workers) // 取消操做信号 done := syncx.NewDoneChan() writer := newGuardedWriter(output, done.Done()) var closeOnce sync.Once var retErr errorx.AtomicError finish := func() { closeOnce.Do(func() { done.Close() close(output) }) } cancel := once(func(err error) { if err != nil { retErr.Set(err) } else { retErr.Set(ErrCancelWithNil) } drain(source) finish() }) go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) drain(collector) }() // 真正从生成器通道取数据执行Mapper go executeMappers(func(item interface{}, w Writer) { mapper(item, w, cancel) }, source, collector, done.Done(), options.workers) value, ok := <-output if err := retErr.Load(); err != nil { return nil, err } else if ok { return value, nil } else { return nil, ErrReduceNoOutput } }
这段代码挺长的,咱们说下核心的点。咱们看到使用一个G
调用 executeMappers
方法。
go executeMappers(func(item interface{}, w Writer) { mapper(item, w, cancel) }, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) { var wg sync.WaitGroup defer func() { // 等待全部任务所有执行完毕 wg.Wait() // 关闭通道 close(collector) }() //根据指定数量建立 worker池 pool := make(chan lang.PlaceholderType, workers) writer := newGuardedWriter(collector, done) for { select { case <-done: return case pool <- lang.Placeholder: // 从buildSource() 返回的无缓冲通道取数据 item, ok := <-input // 当通道关闭,结束 if !ok { <-pool return } wg.Add(1) // better to safely run caller defined method threading.GoSafe(func() { defer func() { wg.Done() <-pool }() //真正运行闭包函数的地方 // func(item interface{}, w Writer) { // mapper(item, w, cancel) // } mapper(item, writer) }) } } }
具体的逻辑已备注,代码很容易懂。
一旦 executeMappers
函数返回,关闭 collector
通道,那么执行 reducer
再也不阻塞。
go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) //这里 drain(collector) }()
这里的 reducer(collector, writer, cancel)
其实就是从 MapReduceVoid
传递的第三个闭包函数。
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { reducer(input, cancel) //这里 drain(input) // We need to write a placeholder to let MapReduce to continue on reducer done, // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce. writer.Write(lang.Placeholder) }, opts...) return err }
而后这个闭包函数又执行了 reducer(input, cancel)
,这里的 reducer
就是咱们一开始解释过的 VoidReducerFunc
,从 Finish() 而来
。
等等,看到上面三个地方的 drain(input)
了吗?
// drain drains the channel. func drain(channel <-chan interface{}) { // drain the channel for range channel { } }
其实就是一个排空 channel
的操做,可是三个地方都对同一个 channel
,也是让我费解。
还有更重要的一点。
go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) drain(collector) }()
上面的代码,假如执行 reducer
,writer
写入引起 panic
,那么drain(collector)
会直接卡住。
不过做者已经修复了这个问题,直接把 drain(collector)
放入到 defer
。
具体 issues[1]。
到这里,关于 Finish
的源码也就结束了。感兴趣的能够看看其余源码。
很喜欢 go-zero
里的一些工具,可是每每用的一些工具并不独立,
依赖于其余文件包,致使明明只想使用其中一个工具却须要安装整个包。
因此最终的结果就是扒源码,建立无依赖库工具集,遵循 MIT
便可。