如何防止 goroutine 泄露

本文首发于个人博客,若是以为有用,欢迎点赞收藏,让更多的朋友看到。golang

今天来简单谈谈,Go 如何防止 goroutine 泄露。后端

概述

Go 的并发模型与其余语言不一样,虽然说它简化了并发程序的开发难度,但若是不了解使用方法,经常会遇到 goroutine 泄露的问题。虽然 goroutine 是轻量级的线程,占用资源不多,但若是一直得不到释放而且还在不断建立新协程,毫无疑问是有问题的,而且是要在程序运行几天,甚至更长的时间才能发现的问题。bash

对于上面描述的问题,我以为能够从两方面入手解决,以下:微信

一是预防,要作到预防,咱们就须要了解什么样的代码会产生泄露,以及了解如何写出正确的代码;并发

二是监控,虽然说预防减小了泄露产生的几率,但没有人敢说本身不犯错,于是,一般咱们还须要一些监控手段进一步保证程序的健壮性;函数

接下来,我将会分两篇文章分别从这两个角度进行介绍,今天先谈第一点。post

如何监控泄露

本文主要集中在第一点上,但为了更好的演示效果,能够先介绍一个最简单的监控方式。经过 runtime.NumGoroutine() 获取当前运行中的 goroutine 数量,经过它确认是否发生泄漏。它的使用很是简单,就不为它专门写个例子了。学习

一个简单的例子

语言级别的并发支持是 Go 的一大优点,但这个优点也很容易被滥用。一般咱们在开始 Go 并发学习时,经常听别人说,Go 的并发很是简单,在调用函数前加上 go 关键词即可启动 goroutine,即一个并发单元,但不少人可能只听到了这句话,而后就出现了相似下面的代码:ui

package main

import (
    "fmt"
    "runtime"
    "time"
)

func sayHello() {
    for {
        fmt.Println("Hello gorotine")
        time.Sleep(time.Second)
    }
}

func main() {
    defer func() {
        fmt.Println("the number of goroutines: ", runtime.NumGoroutine())
    }()

    go sayHello()
    fmt.Println("Hello main")
}
复制代码

对 Go 比较熟悉的话,很容易发现这段代码的问题,sayHello 是个死循环,没有如何退出机制,所以也就没有任何办法释放建立的 goroutine。咱们经过在 main 函数最前面的 defer 实如今函数退出时打印当前运行中的 goroutine 数量,毫无心外,它的输出以下:atom

the number of goroutines: 2
复制代码

不过,由于上面的程序并不是常驻,有泄露问题也不大,程序退出后系统会自动回收运行时资源。但若是这段代码在常驻服务中执行,好比 http server,每接收到一个请求,便会启动一次 sayHello,时间流逝,每次启动的 goroutine 都得不到释放,你的服务将会离奔溃愈来愈近。

这个例子比较简单,我相信,对 Go 的并发稍微有点了解的朋友都不会犯这个错。

泄露状况分类

前面介绍的例子因为在 goroutine 运行死循环致使的泄露。接下来,我会按照并发的数据同步方式对泄露的各类状况进行分析。简单可归于两类,即:

  • channel 致使的泄露
  • 传统同步机制致使的泄露

传统同步机制主要指面向共享内存的同步机制,好比排它锁、共享锁等。这两种状况致使的泄露仍是比较常见的。go 因为 defer 的存在,第二类状况,通常状况下仍是比较容易避免的。

chanel 引发的泄露

先说 channel,若是以前读过官方的那篇并发的文章翻译版,你会发现 channel 的使用,一个不当心就泄露了。咱们来具体总结下那些状况下可能致使。

发送不接收

咱们知道,发送者通常都会配有相应的接收者。理想状况下,咱们但愿接收者总能接收完全部发送的数据,这样就不会有任何问题。但现实是,一旦接收者发生异常退出,中止继续接收上游数据,发送者就会被阻塞。这个状况在 前面说的文章 中有很是细致的介绍。

示例代码:

package main

import "time"

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func main() {
    defer func() {
        fmt.Println("the number of goroutines: ", runtime.NumGoroutine())
    }()

    // Set up the pipeline.
    out := gen(2, 3)

    for n := range out {
        fmt.Println(n)               // 2
        time.Sleep(5 * time.Second) // done thing, 可能异常中断接收
        if true { // if err != nil 
            break
        }
    }
}
复制代码

例子中,发送者经过 out chan 向下游发送数据,main 函数接收数据,接收者一般会依据接收到的数据作一些具体的处理,这里用 Sleep 代替。若是这期间发生异常,致使处理中断,退出循环。gen 函数中启动的 goroutine 并不会退出。

如何解决?

此处的主要问题在于,当接收者中止工做,发送者并不知道,还在傻傻地向下游发送数据。故而,咱们须要一种机制去通知发送者。我直接说答案吧,就不循渐进了。Go 能够经过 channel 的关闭向全部的接收者发送广播信息。

修改后的代码:

package main

import "time"

func gen(done chan struct{}, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }()
    return out
}

func main() {
    defer func() {
        time.Sleep(time.Second)
        fmt.Println("the number of goroutines: ", runtime.NumGoroutine())
    }()

    // Set up the pipeline.
    done := make(chan struct{})
    defer close(done)

    out := gen(done, 2, 3)

    for n := range out {
        fmt.Println(n) // 2
        time.Sleep(5 * time.Second) // done thing, 可能异常中断接收
        if true { // if err != nil 
            break
        }
    }
}
复制代码

函数 gen 中经过 select 实现 2 个 channel 的同时处理。当异常发生时,将进入 <-done 分支,实现 goroutine 退出。这里为了演示效果,保证资源顺利释放,退出时等待了几秒保证释放完成。

执行后的输出以下:

the number of goroutines:  1
复制代码

如今只有主 goroutine 存在。

接收不发送

发送不接收会致使发送者阻塞,反之,接收不发送也会致使接收者阻塞。直接看示例代码,以下:

package main

func main() {
    defer func() {
        time.Sleep(time.Second)
        fmt.Println("the number of goroutines: ", runtime.NumGoroutine())
    }()

    var ch chan struct{}
    go func() {
        ch <- struct{}{}
    }()
}
复制代码

运行结果显示:

the number of goroutines:  2
复制代码

固然,咱们正常不会遇到这么傻的状况发生,现实工做中的案例更多多是发送已完成,可是发送者并无关闭 channel,接收者天然也没法知道发送完毕,阻塞所以就发生了。

解决方案是什么?那固然就是,发送完成后必定要记得关闭 channel。

nil channel

向 nil channel 发送和接收数据都将会致使阻塞。这种状况可能在咱们定义 channel 时忘记初始化的时候发生。

示例代码:

func main() {
    defer func() {
        time.Sleep(time.Second)
        fmt.Println("the number of goroutines: ", runtime.NumGoroutine())
    }()

    var ch chan int
    go func() {
        <-ch
        // ch<-
    }()
}
复制代码

两种写法:<-ch 和 ch<- 1,分别表示接收与发送,都将会致使阻塞。若是想实现阻塞,经过 nil channel 和 done channel 结合实现阻止 main 函数的退出,这或许是能够一试的方法。

func main() {
	defer func() {
		time.Sleep(time.Second)
		fmt.Println("the number of goroutines: ", runtime.NumGoroutine())
	}()

	done := make(chan struct{})

	var ch chan int
	go func() {
		defer close(done)
	}()

	select {
	case <-ch:
	case <-done:
		return
	}
}
复制代码

在 goroutine 执行完成,检测到 done 关闭,main 函数退出。

真实的场景

真实的场景确定不会像案例中的简单,可能涉及多阶段 goroutine 之间的协做,某个 goroutine 可能即便接收者又是发送者。但归根接底,不管什么使用模式。都是把基础知识组织在一块儿的合理运用。

传统同步机制

虽然,通常推荐 Go 并发数据的传递,但有些场景下,显然仍是使用传统同步机制更合适。Go 中提供传统同步机制主要在 sync 和 atomic 两个包。接下来,我主要介绍的是锁和 WaitGroup 可能致使 goroutine 的泄露。

Mutex

和其余语言相似,Go 中存在两种锁,排它锁和共享锁,关于它们的使用就不做介绍了。咱们以排它锁为例进行分析。

示例以下:

func main() {
    total := 0

    defer func() {
        time.Sleep(time.Second)
        fmt.Println("total: ", total)
        fmt.Println("the number of goroutines: ", runtime.NumGoroutine())
    }()

    var mutex sync.Mutex
    for i := 0; i < 2; i++ {
        go func() {
            mutex.Lock()
            total += 1
        }()
    }
}
复制代码

执行结果以下:

total: 1
the number of goroutines: 2
复制代码

这段代码经过启动两个 goroutine 对 total 进行加法操做,为防止出现数据竞争,对计算部分作了加锁保护,但并无及时的解锁,致使 i = 1 的 goroutine 一直阻塞等待 i = 0 的 goroutine 释放锁。能够看到,退出时有 2 个 goroutine 存在,出现了泄露,total 的值为 1。

怎么解决?由于 Go 有 defer 的存在,这个问题仍是很是容易解决的,只要记得在 Lock 的时候,记住 defer Unlock 便可。

示例以下:

mutex.Lock()
defer mutext.Unlock()
复制代码

其余的锁与这里其实都是相似的。

WaitGroup

WaitGroup 和锁有所差异,它相似 Linux 中的信号量,能够实现一组 goroutine 操做的等待。使用的时候,若是设置了错误的任务数,也可能会致使阻塞,致使泄露发生。

一个例子,咱们在开发一个后端接口时须要访问多个数据表,因为数据间没有依赖关系,咱们能够并发访问,示例以下:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func handle() {
    var wg sync.WaitGroup

    wg.Add(4)

    go func() {
        fmt.Println("访问表1")
        wg.Done()
    }()

    go func() {
        fmt.Println("访问表2")
        wg.Done()
    }()

    go func() {
        fmt.Println("访问表3")
        wg.Done()
    }()

    wg.Wait()
}

func main() {
    defer func() {
        time.Sleep(time.Second)
        fmt.Println("the number of goroutines: ", runtime.NumGoroutine())
    }()

    go handle()
    time.Sleep(time.Second)
}
复制代码

执行结果以下:

the number of goroutines: 2
复制代码

出现了泄露。再看代码,它的开始部分定义了类型为 sync.WaitGroup 的变量 wg,设置并发任务数为 4,可是从例子中能够看出只有 3 个并发任务。故最后的 wg.Wait() 等待退出条件将永远没法知足,handle 将会一直阻塞。

怎么防止这类状况发生?

我我的的建议是,尽可能不要一次设置所有任务数,即便数量很是明确的状况。由于在开始多个并发任务之间或许也可能出现被阻断的状况发生。最好是尽可能在任务启动时经过 wg.Add(1) 的方式增长。

示例以下:

...
    wg.Add(1)
    go func() {
        fmt.Println("访问表1")
        wg.Done()
    }()

    wg.Add(1)
    go func() {
        fmt.Println("访问表2")
        wg.Done()
    }()

    wg.Add(1)
    go func() {
        fmt.Println("访问表3")
        wg.Done()
    }()
    ...
复制代码

总结

大概介绍完了我认为的全部可能致使 goroutine 泄露的状况。总结下来,其实不管是死循环、channel 阻塞、锁等待,只要是会形成阻塞的写法均可能产生泄露。于是,如何防止 goroutine 泄露就变成了如何防止发生阻塞。为进一步防止泄露,有些实现中会加入超时处理,主动释放处理时间太长的 goroutine。

本篇主要从如何写出正确代码的角度来介绍如何防止 goroutine 的泄露。下篇,将会介绍如何实现更好的监控检测,以帮助咱们发现当前代码中已经存在的泄露。

参考资料

Concurrency In Go
Goroutine leak
Leaking-Goroutines
Go Concurrency Patterns: Context
Go Concurrency Patterns: Pipelines and cancellation
make goroutine stay running after returning from function
Never start a goroutine without knowing how it will stop


波罗学的微信公众号