在微服务中开发中,api网关扮演对外提供restful api的角色,而api的数据每每会依赖其余服务,复杂的api更是会依赖多个甚至数十个服务。虽然单个被依赖服务的耗时通常都比较低,但若是多个服务串行依赖的话那么整个api的耗时将会大大增长。git
那么经过什么手段来优化呢?咱们首先想到的是经过并发来的方式来处理依赖,这样就能下降整个依赖的耗时,Go基础库中为咱们提供了 WaitGroup 工具用来进行并发控制,但实际业务场景中多个依赖若是有一个出错咱们指望能当即返回而不是等全部依赖都执行完再返回结果,并且WaitGroup中对变量的赋值每每须要加锁,每一个依赖函数都须要添加Add和Done对于新手来讲比较容易出错github
基于以上的背景,go-zero框架中为咱们提供了并发处理工具MapReduce,该工具开箱即用,不须要作什么初始化,咱们经过下图看下使用MapReduce和没使用的耗时对比:golang
相同的依赖,串行处理的话须要200ms,使用MapReduce后的耗时等于全部依赖中最大的耗时为100ms,可见MapReduce能够大大下降服务耗时,并且随着依赖的增长效果就会越明显,减小处理耗时的同时并不会增长服务器压力api
MapReduce是Google提出的一个软件架构,用于大规模数据集的并行运算,go-zero中的MapReduce工具正是借鉴了这种架构思想服务器
go-zero框架中的MapReduce工具主要用来对批量数据进行并发的处理,以此来提高服务的性能restful
咱们经过几个示例来演示MapReduce的用法架构
MapReduce主要有三个参数,第一个参数为generate用以生产数据,第二个参数为mapper用以对数据进行处理,第三个参数为reducer用以对mapper后的数据作聚合返回,还能够经过opts选项设置并发处理的线程数量并发
场景一: 某些功能的结果每每须要依赖多个服务,好比商品详情的结果每每会依赖用户服务、库存服务、订单服务等等,通常被依赖的服务都是以rpc的形式对外提供,为了下降依赖的耗时咱们每每须要对依赖作并行处理app
func productDetail(uid, pid int64) (*ProductDetail, error) { var pd ProductDetail err := mr.Finish(func() (err error) { pd.User, err = userRpc.User(uid) return }, func() (err error) { pd.Store, err = storeRpc.Store(pid) return }, func() (err error) { pd.Order, err = orderRpc.Order(pid) return }) if err != nil { log.Printf("product detail error: %v", err) return nil, err } return &pd, nil }
该示例中返回商品详情依赖了多个服务获取数据,所以作并发的依赖处理,对接口的性能有很大的提高框架
场景二: 不少时候咱们须要对一批数据进行处理,好比对一批用户id,效验每一个用户的合法性而且效验过程当中有一个出错就认为效验失败,返回的结果为效验合法的用户id
func checkLegal(uids []int64) ([]int64, error) { r, err := mr.MapReduce(func(source chan<- interface{}) { for _, uid := range uids { source <- uid } }, func(item interface{}, writer mr.Writer, cancel func(error)) { uid := item.(int64) ok, err := check(uid) if err != nil { cancel(err) } if ok { writer.Write(uid) } }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) { var uids []int64 for p := range pipe { uids = append(uids, p.(int64)) } writer.Write(uids) }) if err != nil { log.Printf("check error: %v", err) return nil, err } return r.([]int64), nil } func check(uid int64) (bool, error) { // do something check user legal return true, nil }
该示例中,若是check过程出现错误则经过cancel方法结束效验过程,并返回error整个效验过程结束,若是某个uid效验结果为false则最终结果不返回该uid
MapReduce使用注意事项
实现原理分析:
MapReduce中首先经过buildSource方法经过执行generate(参数为无缓冲channel)产生数据,并返回无缓冲的channel,mapper会从该channel中读取数据
func buildSource(generate GenerateFunc) chan interface{} { source := make(chan interface{}) go func() { defer close(source) generate(source) }() return source }
在MapReduceWithSource方法中定义了cancel方法,mapper和reducer中均可以调用该方法,调用后主线程收到close信号会立马返回
cancel := once(func(err error) { if err != nil { retErr.Set(err) } else { // 默认的error retErr.Set(ErrCancelWithNil) } drain(source) // 调用close(ouput)主线程收到Done信号,立马返回 finish() })
在mapperDispatcher方法中调用了executeMappers,executeMappers消费buildSource产生的数据,每个item都会起一个goroutine单独处理,默认最大并发数为16,能够经过WithWorkers进行设置
var wg sync.WaitGroup defer func() { wg.Wait() // 保证全部的item都处理完成 close(collector) }() pool := make(chan lang.PlaceholderType, workers) writer := newGuardedWriter(collector, done) // 将mapper处理完的数据写入collector for { select { case <-done: // 当调用了cancel会触发当即返回 return case pool <- lang.Placeholder: // 控制最大并发数 item, ok := <-input if !ok { <-pool return } wg.Add(1) go func() { defer func() { wg.Done() <-pool }() mapper(item, writer) // 对item进行处理,处理完调用writer.Write把结果写入collector对应的channel中 }() } }
reducer单goroutine对数mapper写入collector的数据进行处理,若是reducer中没有手动调用writer.Write则最终会执行finish方法对output进行close避免死锁
go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) }()
在该工具包中还提供了许多针对不一样业务场景的方法,实现原理与MapReduce大同小异,感兴趣的同窗能够查看源码学习
本文主要介绍了go-zero框架中的MapReduce工具,在实际的项目中很是实用。用好工具对于提高服务性能和开发效率都有很大的帮助,但愿本篇文章能给你们带来一些收获。
https://github.com/tal-tech/go-zero
好将来技术