Golang 在异步处理上有着上佳的表现。由于 goroutines 和 channels 是很是容易使用且有效的异步处理手段。下面咱们一块儿来看一看 Golang 的简易任务队列golang
有些时候,咱们须要作异步处理可是并不须要一个任务对列,这类问题咱们使用 Golang 能够很是简单的实现。以下:安全
go process(job)
这的确是不少场景下的绝佳选择,好比操做一个HTTP请求等待结果。然而,在一些相对复杂高并发的场景下,你就不能简单的使用该方法来实现异步处理。这时候,你须要一个队列来管理须要处理的任务,而且按照必定的顺序来处理这些任务。服务器
接下来看一个最简单的任务队列和工做者模型。并发
func worker(jobChan <-chan Job) { for job := range jobChan { process(job) } } // make a channel with a capacity of 100. jobChan := make(chan Job, 100) // start the worker go worker(jobChan) // enqueue a job jobChan <- job
代码中建立了一个 Job 对象的 channel , 容量为100。而后开启一个工做者协程从 channel 中去除任务并执行。任务的入队操做就是将一个 Job 对象放入任务 channel 中。异步
虽然上面只有短短的几行代码,却完成了不少的工做。咱们实现了一个简易的线程安全的、支持并发的、可靠的任务队列。高并发
上面的例子中,咱们初始化了一个容量为 100 的任务 channel。工具
// make a channel with a capacity of 100. jobChan := make(chan Job, 100)
这意味着任务的入队操做十分简单,以下:ui
// enqueue a job jobChan <- job
这样一来,当 job channel 中已经放入 100 个任务的时候,入队操做将会阻塞,直至有任务被工做者处理完成。这一般不是一个好的现象,由于咱们一般不但愿程序出现阻塞等待。这时候,咱们一般但愿有一个超时机制来告诉服务调用方,当前服务忙,稍后重试。我以前的博文--我读《经过Go来处理每分钟达百万的数据请求》介绍过相似的限流策略。这里方法相似,就是当队列满的时候,返回503,告诉调用方服务忙。代码以下:线程
// TryEnqueue tries to enqueue a job to the given job channel. Returns true if // the operation was successful, and false if enqueuing would not have been // possible without blocking. Job is not enqueued in the latter case. func TryEnqueue(job Job, jobChan <-chan Job) bool { select { case jobChan <- job: return true default: return false } }
这样一来,咱们尝试入队的时候,若是入队失败,放回一个 false ,这样咱们再对这个返回值处理以下:设计
if !TryEnqueue(job, chan) { http.Error(w, "max capacity reached", 503) return }
这样就简单的实现了限流操做。当 jobChan 满的时候,程序会走到 default 返回 false ,从而告知调用方当前的服务器状况。
到上面的步骤,限流已经能够解决,那么咱们接下来考虑,怎么才能优雅的关闭工做者?假设咱们决定再也不向任务队列插入任务,咱们但愿让全部的已入队任务执行完成,咱们能够很是简单的实现:
close(jobChan)
没错,就是这一行代码,咱们就可让任务队列再也不接收新任务(仍然能够从 channel 读取 job ),若是咱们想执行队列里的已经存在的任务,只须要:
for job := range jobChan {...}
全部已经入队的 job 会正常被 woker 取走执行。可是,这样实际上还存在一个问题,就是主协成不会等待工做者执行完工做就会退出。它不知道工做者协成何时可以处理完以上的任务。能够运行的例子以下:
package main import ( "fmt" ) var jobChan chan int func worker(jobChan <- chan int) { for job := range jobChan{ fmt.Printf("执行任务 %d \n", job) } } func main() { jobChan = make(chan int, 100) //入队 for i := 1; i <= 10; i++{ jobChan <- i } close(jobChan) go worker(jobChan) }
运行发现,woker 没法保证执行完 channel 中的 job 就退出了。那咱们怎么解决这个问题?
使用 sysc.WaitGroup:
package main import ( "fmt" "sync" ) var jobChan chan int var wg sync.WaitGroup func worker(jobChan <- chan int) { defer wg.Done() for job := range jobChan{ fmt.Printf("执行任务 %d \n", job) } } func main() { jobChan = make(chan int, 100) //入队 for i := 1; i <= 10; i++{ jobChan <- i } wg.Add(1) close(jobChan) go worker(jobChan) wg.Wait() }
使用这种协程间同步的方法,协成会等待 worker 执行完 job 才会退出。运行结果:
执行任务 1 执行任务 2 执行任务 3 执行任务 4 执行任务 5 执行任务 6 执行任务 7 执行任务 8 执行任务 9 执行任务 10 Process finished with exit code 0
这样是完美的么?在设计功能的时候,为了防止协程假死,咱们应该给协程设置一个超时。
上面的例子中 wg.Wait() 会一直等待,直到 wg.Done() 被调用。可是若是这个操做假死,没法调用,将永远等待。这是咱们不但愿看到的,所以,咱们能够给他设置一个超时时间。方法以下:
package main import ( "fmt" "sync" "time" ) var jobChan chan int var wg sync.WaitGroup func worker(jobChan <-chan int) { defer wg.Done() for job := range jobChan { fmt.Printf("执行任务 %d \n", job) time.Sleep(1 * time.Second) } } func main() { jobChan = make(chan int, 100) //入队 for i := 1; i <= 10; i++ { jobChan <- i } wg.Add(1) close(jobChan) go worker(jobChan) res := WaitTimeout(&wg, 5*time.Second) if res { fmt.Println("执行完成退出") } else { fmt.Println("执行超时退出") } } //超时机制 func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { ch := make(chan struct{}) go func() { wg.Wait() close(ch) }() select { case <-ch: return true case <-time.After(timeout): return false } }
执行结果以下:
执行任务 1 执行任务 2 执行任务 3 执行任务 4 执行任务 5 执行超时退出 Process finished with exit code 0
这样,5s 超时生效,虽然不是全部的任务被执行,因为超时,也会退出。
有时候咱们但愿 woker 丢弃在执行的工做,也就是 cancel 操做,怎么处理?
咱们能够借助 context.Context 实现。以下:
package main import ( "context" "fmt" "sync" "time" ) var jobChan chan int var ctx context.Context var cancel context.CancelFunc func worker(jobChan <-chan int, ctx context.Context) { for { select { case <-ctx.Done(): return case job := <-jobChan: fmt.Printf("执行任务 %d \n", job) time.Sleep(1 * time.Second) } } } func main() { jobChan = make(chan int, 100) //带有取消功能的 contex ctx, cancel = context.WithCancel(context.Background()) //入队 for i := 1; i <= 10; i++ { jobChan <- i } close(jobChan) go worker(jobChan, ctx) time.Sleep(2 * time.Second) //調用cancel cancel() }
結果以下:
执行任务 1 执行任务 2 Process finished with exit code 0
能够看出,咱们等待2s后,咱们主动调用了取消操做,woker 协程主动退出。
这是借助 context 包实现了取消操做,实质上也是监听一个 channel 的操做,那咱们有没有可能不借助 context 实现取消操做呢?
不使用 context 的超时机制实现取消:
package main import ( "fmt" "time" ) var jobChan chan int func worker(jobChan <-chan int, cancelChan <-chan struct{}) { for { select { case <-cancelChan: return case job := <-jobChan: fmt.Printf("执行任务 %d \n", job) time.Sleep(1 * time.Second) } } } func main() { jobChan = make(chan int, 100) //经过chan 取消操做 cancelChan := make(chan struct{}) //入队 for i := 1; i <= 10; i++ { jobChan <- i } close(jobChan) go worker(jobChan, cancelChan) time.Sleep(2 * time.Second) //关闭chan close(cancelChan) }
这样,咱们使用一个关闭 chan 的信号实现了取消操做。缘由是无缓冲 chan 读取会阻塞,当关闭后,能够读取到空,所以会执行 select 里的 return.
照例总结一波,本文介绍了 golang 协程间的同步和通讯的一些方法,任务队列的最简单实现。关于工做者池的实现,我在其余博文也写到了,这里很少写。本文更可能是工具性的代码,写功能时候能够借用,好比超时、取消、chan的操做等。