Go 每日一库之 ants

简介

处理大量并发是 Go 语言的一大优点。语言内置了方便的并发语法,能够很是方便的建立不少个轻量级的 goroutine 并发处理任务。相比于建立多个线程,goroutine 更轻量、资源占用更少、切换速度更快、无线程上下文切换开销更少。可是受限于资源总量,系统中可以建立的 goroutine 数量也是受限的。默认每一个 goroutine 占用 8KB 内存,一台 8GB 内存的机器满打满算也只能建立 8GB/8KB = 1000000 个 goroutine,更况且系统还须要保留一部份内存运行平常管理任务,go 运行时须要内存运行 gc、处理 goroutine 切换等。使用的内存超过机器内存容量,系统会使用交换区(swap),致使性能急速降低。咱们能够简单验证一下建立过多 goroutine 会发生什么:git

func main() {
  var wg sync.WaitGroup
  wg.Add(10000000)
  for i := 0; i < 10000000; i++ {
    go func() {
      time.Sleep(1 * time.Minute)
    }()
  }
  wg.Wait()
}

在个人机器上(8G内存)运行上面的程序会报errno 1455,即Out of Memory错误,这很好理解。谨慎运行github

另外一方面,goroutine 的管理也是一个问题。goroutine 只能本身运行结束,外部没有任何手段能够强制j结束一个 goroutine。若是一个 goroutine 由于某种缘由没有自行结束,就会出现 goroutine 泄露。此外,频繁建立 goroutine 也是一个开销。golang

鉴于上述缘由,天然出现了与线程池同样的需求,即 goroutine 池。通常的 goroutine 池自动管理 goroutine 的生命周期,能够按需建立,动态缩容。向 goroutine 池提交一个任务,goroutine 池会自动安排某个 goroutine 来处理。微信

ants就是其中一个实现 goroutine 池的库。数据结构

快速使用

本文代码使用 Go Modules。闭包

建立目录并初始化:并发

$ mkdir ants && cd ants
$ go mod init github.com/darjun/go-daily-lib/ants

安装ants库,使用v2版本:app

$ go get -u github.com/panjf2000/ants/v2

咱们接下来要实现一个计算大量整数和的程序。首先建立基础的任务结构,并实现其执行任务方法:函数

type Task struct {
  index int
  nums  []int
  sum   int
  wg    *sync.WaitGroup
}

func (t *Task) Do() {
  for _, num := range t.nums {
    t.sum += num
  }

  t.wg.Done()
}

很简单,就是将一个切片中的全部整数相加。oop

而后咱们建立 goroutine 池,注意池使用完后须要手动关闭,这里使用defer关闭:

p, _ := ants.NewPoolWithFunc(10, taskFunc)
defer p.Release()

func taskFunc(data interface{}) {
  task := data.(*Task)
  task.Do()
  fmt.Printf("task:%d sum:%d\n", task.index, task.sum)
}

上面调用了ants.NewPoolWithFunc()建立了一个 goroutine 池。第一个参数是池容量,即池中最多有 10 个 goroutine。第二个参数为每次执行任务的函数。当咱们调用p.Invoke(data)的时候,ants池会在其管理的 goroutine 中找出一个空闲的,让它执行函数taskFunc,并将data做为参数。

接着,咱们模拟数据,作数据切分,生成任务,交给 ants 处理:

const (
  DataSize    = 10000
  DataPerTask = 100
)

nums := make([]int, DataSize, DataSize)
for i := range nums {
  nums[i] = rand.Intn(1000)
}

var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
tasks := make([]*Task, 0, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
  task := &Task{
    index: i + 1,
    nums:  nums[i*DataPerTask : (i+1)*DataPerTask],
    wg:    &wg,
  }

  tasks = append(tasks, task)
  p.Invoke(task)
}

wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())

随机生成 10000 个整数,将这些整数分为 100 份,每份 100 个,生成Task结构,调用p.Invoke(task)处理。wg.Wait()等待处理完成,而后输出ants正在运行的 goroutine 数量,这时应该是 0。

最后咱们将结果汇总,并验证一下结果,与直接相加获得的结果作一个比较:

var sum int
for _, task := range tasks {
  sum += task.sum
}

var expect int
for _, num := range nums {
  expect += num
}

fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)

运行:

$ go run main.go
...
task:96 sum:53275
task:88 sum:50090
task:62 sum:57114
task:45 sum:48041
task:82 sum:45269
running goroutines: 0
finish all tasks, result is 5010172 expect:5010172

确实,任务完成以后,正在运行的 goroutine 数量变为 0。并且咱们验证了,结果没有误差。另外须要注意,goroutine 池中任务的执行顺序是随机的,与提交任务的前后没有关系。由上面运行打印的任务标识咱们也能发现这一点。

函数做为任务

ants支持将一个不接受任何参数的函数做为任务提交给 goroutine 运行。因为不接受参数,咱们提交的函数要么不须要外部数据,只须要处理自身逻辑,不然就必须用某种方式将须要的数据传递进去,例如闭包。

提交函数做为任务的 goroutine 池使用ants.NewPool()建立,它只接受一个参数表示池子的容量。调用池子对象的Submit()方法来提交任务,将一个不接受任何参数的函数传入。

最开始的例子能够改写一下。增长一个任务包装函数,将任务须要的参数做为包装函数的参数。包装函数返回实际的任务函数,该任务函数就能够经过闭包访问它须要的数据了:

type taskFunc func()

func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {
  return func() {
    for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {
      *sum += num
    }

    fmt.Printf("task:%d sum:%d\n", i+1, *sum)
    wg.Done()
  }
}

调用ants.NewPool(10)建立 goroutine 池,一样池子用完须要释放,这里使用defer

p, _ := ants.NewPool(10)
defer p.Release()

生成模拟数据,切分任务。提交任务给ants池执行,这里使用taskFuncWrapper()包装函数生成具体的任务,而后调用p.Submit()提交:

nums := make([]int, DataSize, DataSize)
for i := range nums {
  nums[i] = rand.Intn(1000)
}

var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
  p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))
}
wg.Wait()

汇总结果,验证:

var sum int
for _, partSum := range partSums {
  sum += partSum
}

var expect int
for _, num := range nums {
  expect += num
}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)

这个程序的功能与最开始的彻底相同。

执行流程

GitHub 仓库中有个执行流程图,我从新绘制了一下:

执行流程以下:

  • 初始化 goroutine 池;
  • 提交任务给 goroutine 池,检查是否有空闲的 goroutine:

    • 有,获取空闲 goroutine
    • 无,检查池中的 goroutine 数量是否已到池容量上限:

      • 已到上限,检查 goroutine 池是不是非阻塞的:

        • 非阻塞,直接返回nil表示执行失败
        • 阻塞,等待 goroutine 空闲
      • 未到上限,建立一个新的 goroutine 处理任务
  • 任务处理完成,将 goroutine 交还给池,以待处理下一个任务

选项

ants提供了一些选项能够定制 goroutine 池的行为。选项使用Options结构定义:

// src/github.com/panjf2000/ants/options.go
type Options struct {
  ExpiryDuration time.Duration
  PreAlloc bool
  MaxBlockingTasks int
  Nonblocking bool
  PanicHandler func(interface{})
  Logger Logger
}

各个选项含义以下:

  • ExpiryDuration:过时时间。表示 goroutine 空闲多长时间以后会被ants池回收
  • PreAlloc:预分配。调用NewPool()/NewPoolWithFunc()以后预分配worker(管理一个工做 goroutine 的结构体)切片。并且使用预分配与否会直接影响池中管理worker的结构。见下面源码
  • MaxBlockingTasks:最大阻塞任务数量。即池中 goroutine 数量已到池容量,且全部 goroutine 都处理繁忙状态,这时到来的任务会在阻塞列表等待。这个选项设置的是列表的最大长度。阻塞的任务数量达到这个值后,后续任务提交直接返回失败
  • Nonblocking:池是否阻塞,默认阻塞。提交任务时,若是ants池中 goroutine 已到上限且所有繁忙,阻塞的池会将任务添加的阻塞列表等待(固然受限于阻塞列表长度,见上一个选项)。非阻塞的池直接返回失败
  • PanicHandler:panic 处理。遇到 panic 会调用这里设置的处理函数
  • Logger:指定日志记录器

NewPool()部分源码:

if p.options.PreAlloc {
  if size == -1 {
    return nil, ErrInvalidPreAllocSize
  }
  p.workers = newWorkerArray(loopQueueType, size)
} else {
  p.workers = newWorkerArray(stackType, 0)
}

使用预分配时,建立loopQueueType类型的结构,反之建立stackType类型。这是ants定义的两种管理worker的数据结构。

ants定义了一些With*函数来设置这些选项:

func WithOptions(options Options) Option {
  return func(opts *Options) {
    *opts = options
  }
}

func WithExpiryDuration(expiryDuration time.Duration) Option {
  return func(opts *Options) {
    opts.ExpiryDuration = expiryDuration
  }
}

func WithPreAlloc(preAlloc bool) Option {
  return func(opts *Options) {
    opts.PreAlloc = preAlloc
  }
}

func WithMaxBlockingTasks(maxBlockingTasks int) Option {
  return func(opts *Options) {
    opts.MaxBlockingTasks = maxBlockingTasks
  }
}

func WithNonblocking(nonblocking bool) Option {
  return func(opts *Options) {
    opts.Nonblocking = nonblocking
  }
}

func WithPanicHandler(panicHandler func(interface{})) Option {
  return func(opts *Options) {
    opts.PanicHandler = panicHandler
  }
}

func WithLogger(logger Logger) Option {
  return func(opts *Options) {
    opts.Logger = logger
  }
}

这里使用了 Go 语言中很是常见的一种模式,我称之为选项模式,很是方便地构造有大量参数,且大部分有默认值或通常不须要显式设置的对象。

咱们来验证几个选项。

最大等待队列长度

ants池设置容量以后,若是全部的 goroutine 都在处理任务。这时提交的任务默认会进入等待队列,WithMaxBlockingTasks(maxBlockingTasks int)能够设置等待队列的最大长度。超过这个长度,提交任务直接返回错误:

func wrapper(i int, wg *sync.WaitGroup) func() {
  return func() {
    fmt.Printf("hello from task:%d\n", i)
    time.Sleep(1 * time.Second)
    wg.Done()
  }
}

func main() {
  p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))
  defer p.Release()

  var wg sync.WaitGroup
  wg.Add(8)
  for i := 1; i <= 8; i++ {
    go func(i int) {
      err := p.Submit(wrapper(i, &wg))
      if err != nil {
        fmt.Printf("task:%d err:%v\n", i, err)
        wg.Done()
      }
    }(i)
  }

  wg.Wait()
}

上面代码中,咱们设置 goroutine 池的容量为 4,最大阻塞队列长度为 2。而后一个 for 提交 8 个任务,指望结果是:4 个任务在执行,2 个任务在等待,2 个任务提交失败。运行结果:

hello from task:8
hello from task:5
hello from task:4
hello from task:6
task:7 err:too many goroutines blocked on submit or Nonblocking is set
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
hello from task:2

咱们看到提交任务失败,打印too many goroutines blocked ...

代码中有 4 点须要注意:

  • 提交任务必须并行进行。若是是串行提交,第 5 个任务提交时因为池中没有空闲的 goroutine 处理该任务,Submit()方法会被阻塞,后续任务就都不能提交了。也就达不到验证的目的了
  • 因为任务可能提交失败,失败的任务不会实际执行,因此实际上wg.Done()次数会小于 8。于是在err != nil分支中咱们须要调用一次wg.Done()。不然wg.Wait()会永远阻塞
  • 为了不任务执行过快,空出了 goroutine,观察不到现象,每一个任务中我使用time.Sleep(1 * time.Second)休眠 1s
  • 因为 goroutine 之间的执行顺序未显式同步,故每次执行的顺序不肯定

因为简单起见,前面的例子中Submit()方法的返回值都被咱们忽略了。实际开发中必定不要忽略。

非阻塞

ants池默认是阻塞的,咱们可使用WithNonblocking(nonblocking bool)设置其为非阻塞。非阻塞的ants池中,在全部 goroutine 都在处理任务时,提交新任务会直接返回错误:

func main() {
  p, _ := ants.NewPool(2, ants.WithNonblocking(true))
  defer p.Release()

  var wg sync.WaitGroup
  wg.Add(3)
  for i := 1; i <= 3; i++ {
    err := p.Submit(wrapper(i, &wg))
    if err != nil {
      fmt.Printf("task:%d err:%v\n", i, err)
      wg.Done()
    }
  }

  wg.Wait()
}

使用上个例子中的wrapper()函数,ants池容量设置为 2。连续提交 3 个任务,指望结果前两个任务正常执行,第 3 个任务提交时返回错误:

hello from task:2
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1

panic 处理器

一个鲁棒性强的库必定不会忽视错误的处理,特别是宕机相关的错误。在 Go 语言中就是 panic,也被称为运行时恐慌,在程序运行的过程当中产生的严重性错误,例如索引越界,空指针解引用等,都会触发 panic。若是不处理 panic,程序会直接意外退出,可能形成数据丢失的严重后果。

ants中若是 goroutine 在执行任务时发生panic,会终止当前任务的执行,将发生错误的堆栈输出到os.Stderr注意,该 goroutine 仍是会被放回池中,下次能够取出执行新的任务

func wrapper(i int, wg *sync.WaitGroup) func() {
  return func() {
    fmt.Printf("hello from task:%d\n", i)
    if i%2 == 0 {
      panic(fmt.Sprintf("panic from task:%d", i))
    }
    wg.Done()
  }
}

func main() {
  p, _ := ants.NewPool(2)
  defer p.Release()

  var wg sync.WaitGroup
  wg.Add(3)
  for i := 1; i <= 2; i++ {
    p.Submit(wrapper(i, &wg))
  }

  time.Sleep(1 * time.Second)
  p.Submit(wrapper(3, &wg))
  p.Submit(wrapper(5, &wg))
  wg.Wait()
}

咱们让偶数个任务触发panic。提交两个任务,第二个任务必定会触发panic。触发panic以后,咱们还能够继续提交任务 三、5。注意这里没有 4,提交任务 4 仍是会触发panic

上面的程序须要注意 2 点:

  • 任务函数中wg.Done()是在panic方法以后,若是触发了panic,函数中的其余正常逻辑就不会再继续执行了。因此咱们虽然wg.Add(3),可是一共提交了 4 个任务,其中一个任务触发了panicwg.Done()没有正确执行。实际开发中,咱们通常使用defer语句来确保wg.Done()必定会执行
  • 在 for 循环以后,我添加了一行代码time.Sleep(1 * time.Second)。若是没有这一行,后续的两条Submit()方法能够直接执行,可能会致使任务很快就完成了,wg.Wait()直接返回了,这时panic的堆栈尚未输出。你能够尝试注释掉这行代码运行看看结果

除了ants提供的默认 panic 处理器,咱们还可使用WithPanicHandler(paincHandler func(interface{}))指定咱们本身编写的 panic 处理器。处理器的参数就是传给panic的值:

func panicHandler(err interface{}) {
  fmt.Fprintln(os.Stderr, err)
}

p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler))
defer p.Release()

其他代码与上面的彻底相同,指定了panicHandler后触发panic就会执行它。运行:

hello from task:2
panic from task:2
hello from task:1
hello from task:5
hello from task:3

看到输出了传给panic函数的字符串(第二行输出)。

默认池

为了方便使用,不少 Go 库都喜欢提供其核心功能类型的一个默认实现。能够直接经过库提供的接口调用。例如net/http,例如antsants库中定义了一个默认的池,默认容量为MaxInt32。goroutine 池的各个方法均可以直接经过ants包直接访问:

// src/github.com/panjf2000/ants/ants.go
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)

func Submit(task func()) error {
  return defaultAntsPool.Submit(task)
}

func Running() int {
  return defaultAntsPool.Running()
}

func Cap() int {
  return defaultAntsPool.Cap()
}

func Free() int {
  return defaultAntsPool.Free()
}

func Release() {
  defaultAntsPool.Release()
}

func Reboot() {
  defaultAntsPool.Reboot()
}

直接使用:

func main() {
  defer ants.Release()

  var wg sync.WaitGroup
  wg.Add(2)
  for i := 1; i <= 2; i++ {
    ants.Submit(wrapper(i, &wg))
  }
  wg.Wait()
}

默认池也须要Release()

总结

本文介绍了 goroutine 池的由来,并借由ants库介绍了基本的使用方法,和一些细节。ants源码很少,去掉测试的核心代码只有 1k 行左右,建议有时间、感兴趣的童鞋深刻阅读。

你们若是发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue😄

参考

  1. ants GitHub:github.com/panjf2000/ants
  2. Go 每日一库 GitHub:https://github.com/darjun/go-daily-lib

个人博客:https://darjun.github.io

欢迎关注个人微信公众号【GoUpUp】,共同窗习,一块儿进步~

相关文章
相关标签/搜索