以前有聊过 golang 的协程,我发觉彷佛还很理论,特别是在并发安全上,因此特结合网上的一些例子,来试验下go routine中 的 channel, select, context 的妙用。git
咱们用 gin(一个web框架) 做为处理请求的工具,需求是这样的: 一个请求 X 会去并行调用 A, B, C 三个方法,并把三个方法返回的结果加起来做为 X 请求的 Response。 可是咱们这个 Response 是有时间要求的(不能超过5秒的响应时间),github
可能 A, B, C 中任意一个或两个,处理逻辑十分复杂,或者数据量超大,致使处理时间超出预期, 那么咱们就立刻切断,并返回已经拿到的任意个返回结果之和。golang
咱们先来定义主函数:web
func main() { r := gin.New() r.GET("/calculate", calHandler) http.ListenAndServe(":8008", r) } 复制代码
很是简单,普通的请求接受和 handler 定义。其中 calHandler 是咱们用来处理请求的函数。安全
分别定义三个假的微服务,其中第三个将会是咱们超时的哪位~markdown
func microService1() int { time.Sleep(1*time.Second) return 1 } func microService2() int { time.Sleep(2*time.Second) return 2 } func microService3() int { time.Sleep(10*time.Second) return 3 } 复制代码
接下来,咱们看看 calHandler 里究竟是什么并发
func calHandler(c *gin.Context) { ... c.JSON(http.StatusOK, gin.H{"code":200, "result": sum}) return } 复制代码
一个典型的 gin Response,咱们先不用在乎 sum 是什么。框架
直接用 go 就行了嘛~ 因此一开始咱们可能就这么写:函数
go microService1() go microService2() go microService3() 复制代码
很简单有没有,可是等等,说好的返回值我怎么接呢? 为了可以并行地接受处理结果,咱们很容易想到用 channel 去接。 因此咱们把调用服务改为这样:微服务
var resChan = make(chan int, 3) // 由于有3个结果,因此咱们建立一个能够容纳3个值的 int channel。 go func() { resChan <- microService1() }() go func() { resChan <- microService2() }() go func() { resChan <- microService3() }() 复制代码
有东西接,那也要有方法去算,因此咱们加一个一直循环拿 resChan 中结果并计算的方法:
var resContainer, sum int for { resContainer = <-resChan sum += resContainer } 复制代码
这样一来咱们就有一个 sum 来计算每次从 resChan 中拿出的结果了。
还没结束,说好的超时处理呢? 为了实现超时处理,咱们须要引入一个东西,就是 context,什么是 context ? 咱们这里只使用 context 的一个特性,超时通知(其实这个特性彻底能够用 channel 来替代)。
能够看在定义 calHandler 的时候咱们已经将 c *gin.Context 做为参数传了进来,那咱们就不用本身在声明了。 gin.Context 简单理解为贯穿整个 gin 声明周期的上下文容器,有点像是分身,亦或是量子纠缠的感受。
有了这个 gin.Context, 咱们就能在一个地方对 context 作出操做,而其余正在使用 context 的函数或方法,也会感觉到 context 作出的变化。
ctx, _ := context.WithTimeout(c, 3*time.Second) //定义一个超时的 context 复制代码
只要时间到了,咱们就能用 ctx.Done() 获取到一个超时的 channel(通知),而后其余用到这个 ctx 的地方也会停掉,并释放 ctx。 通常来讲,ctx.Done() 是结合 select 使用的。 因此咱们又须要一个循环来监听 ctx.Done()
for { select { case <- ctx.Done(): // 返回结果 } 复制代码
如今咱们有两个 for 了,是否是可以合并下?
for { select { case resContainer = <-resChan: sum += resContainer fmt.Println("add", resContainer) case <- ctx.Done(): fmt.Println("result:", sum) return } } 复制代码
诶嘿,看上去不错。 不过咱们怎么在正常完成微服务调用的时候输出结果呢? 看来咱们还须要一个 flag
var count int for { select { case resContainer = <-resChan: sum += resContainer count ++ fmt.Println("add", resContainer) if count > 2 { fmt.Println("result:", sum) return } case <- ctx.Done(): fmt.Println("timeout result:", sum) return } } 复制代码
咱们加入一个计数器,由于咱们只是调用3次微服务,因此当 count 大于2的时候,咱们就应该结束并输出结果了。
上面的计时器是一种偷懒的方法,由于咱们知道了调用微服务的次数,若是咱们并不知道,或者以后还要添加呢? 手动每次改 count 的判断阈值会不会太不优雅了?这时候咱们就能够加入 sync 包。 咱们将会使用的 sync 的一个特性是 WaitGroup。它的做用是等待一组协程运行完毕后,执行接下去的步骤。
咱们来改下以前微服务调用的代码块:
var success = make(chan int, 1) // 成功的通道标识 wg := sync.WaitGroup{} // 建立一个 waitGroup 组 wg.Add(3) // 咱们往组里加3个标识,由于咱们要运行3个任务 go func() { resChan <- microService1() wg.Done() // 完成一个,Done()一个 }() go func() { resChan <- microService2() wg.Done() }() go func() { resChan <- microService3() wg.Done() }() wg.Wait() // 直到咱们前面三个标识都被 Done 了,不然程序一直会阻塞在这里 success <- 1 // 咱们发送一个成功信号到通道中 复制代码
注意
:若是咱们直接把上面的代码放到 calHandler 里,会出现一个问题,WaitGroup不论怎么样都会堵塞咱们的正常状况输出(死活都要让你超时)。 因此,咱们把上面这段和业务逻辑相关的代码单独抽离出来,并包装一下。
// rc 是结果 channel, success 是成功与否的 flag channel func MyLogic(rc chan<- int, success chan<- int) { wg := sync.WaitGroup{} // 建立一个 waitGroup 组 wg.Add(3) // 咱们往组里加3个标识,由于咱们要运行3个任务 go func() { rc <- microService1() wg.Done() // 完成一个,Done()一个 }() go func() { rc <- microService2() wg.Done() }() go func() { rc <- microService3() wg.Done() }() wg.Wait() // 直到咱们前面三个标识都被 Done 了,不然程序一直会阻塞在这里 success <- 1 // 咱们发送一个成功信号到通道中 } 复制代码
最终,这个 MyLogic 仍是要做为一个协程运行的。 (多谢@TomorrowWu和@chenqinghe提醒)
既然咱们有了 success 这个信号,那么再把它加入到监控 for 循环中,并作些修改,删除原来 count 判断的部分。
for { select { case resContainer = <-resChan: sum += resContainer fmt.Println("add", resContainer) case <- success: fmt.Println("result:", sum) return case <- ctx.Done(): fmt.Println("result:", sum) return } } 复制代码
三个 case,分工明确,
case resContainer = <-resChan:
用来拿逻辑的输出的结果并计算
case <- success:
是理想状况下的正常输出
case <- ctx.Done():
是超时状况下的输出
咱们再润色一下,把后两个 case 的 fmt.Println("result:", sum)
改成 gin 的标准 http Response
c.JSON(http.StatusOK, gin.H{"code":200, "result": sum}) return 复制代码
至此,全部的主要代码都完成了。下面是彻底版
package main import ( "context" "fmt" "net/http" "sync" "time" "github.com/gin-gonic/gin" ) // 一个请求会触发调用三个服务,每一个服务输出一个 int, // 请求要求结果为三个服务输出 int 之和 // 请求返回时间不超过3秒,大于3秒只输出已经得到的 int 之和 func calHandler(c *gin.Context) { var resContainer, sum int var success, resChan = make(chan int), make(chan int, 3) ctx, cancel := context.WithTimeout(c, 5*time.Second) defer cancel() // 真正的业务逻辑 go MyLogic(resChan, success) for { select { case resContainer = <-resChan: sum += resContainer fmt.Println("add", resContainer) case <- success: c.JSON(http.StatusOK, gin.H{"code":200, "result": sum}) return case <- ctx.Done(): c.JSON(http.StatusOK, gin.H{"code":200, "result": sum}) return } } } func main() { r := gin.New() r.GET("/calculate", calHandler) http.ListenAndServe(":8008", r) } func MyLogic(rc chan<- int, success chan<- int) { wg := sync.WaitGroup{} // 建立一个 waitGroup 组 wg.Add(3) // 咱们往组里加3个标识,由于咱们要运行3个任务 go func() { rc <- microService1() wg.Done() // 完成一个,Done()一个 }() go func() { rc <- microService2() wg.Done() }() go func() { rc <- microService3() wg.Done() }() wg.Wait() // 直到咱们前面三个标识都被 Done 了,不然程序一直会阻塞在这里 success <- 1 // 咱们发送一个成功信号到通道中 } func microService1() int { time.Sleep(1*time.Second) return 1 } func microService2() int { time.Sleep(2*time.Second) return 2 } func microService3() int { time.Sleep(6*time.Second) return 3 } 复制代码
上面的程序只是简单描述了一个调用其余微服务超时的处理场景。 实际过程当中还须要加不少不少调料,才能保证接口的对外完整性。