Go语言基础之并发

更新、更全的《Go从入门到放弃》的更新网站,更有python、go、人工智能教学等着你:http://www.javashuo.com/article/p-mxrjjcnn-hn.htmljava

并发是编程里面一个很是重要的概念,Go语言在语言层面天生支持并发,这也是Go语言流行的一个很重要的缘由。python

1、Go语言中的并发编程

2、并发与并行

并发:同一时间段内执行多个任务(你在用微信和两个女友聊天)。c++

并行:同一时刻执行多个任务(你和你朋友都在用微信和女友聊天)。git

Go语言的并发经过goroutine实现。goroutine相似于线程,属于用户态的线程,咱们能够根据须要建立成千上万个goroutine并发工做。goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操做系统调度完成。程序员

Go语言还提供channel在多个goroutine间进行通讯。goroutinechannel是 Go 语言秉承的 CSP(Communicating Sequential Process)并发模式的重要实现基础。github

3、goroutine

在java/c++中咱们要实现并发编程的时候,咱们一般须要本身维护一个线程池,而且须要本身去包装一个又一个的任务,同时须要本身去调度线程执行任务并维护上下文切换,这一切一般会耗费程序员大量的心智。那么能不能有一种机制,程序员只须要定义不少个任务,让系统去帮助咱们把这些任务分配到CPU上实现并发执行呢?算法

Go语言中的goroutine就是这样一种机制,goroutine的概念相似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每一个CPU。Go语言之因此被称为现代化的编程语言,就是由于它在语言层面已经内置了调度和上下文切换的机制。编程

在Go语言编程中你不须要去本身写进程、线程、协程,你的技能包里只有一个技能–goroutine,当你须要让某个任务并发执行的时候,你只须要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就能够了,就是这么简单粗暴。安全

3.1 使用goroutine

Go语言中使用goroutine很是简单,只须要在调用函数的时候在前面加上go关键字,就能够为一个函数建立一个goroutine

一个goroutine一定对应一个函数,能够建立多个goroutine去执行相同的函数。

3.2 启动单个goroutine

启动goroutine的方式很是简单,只须要在调用的函数(普通函数和匿名函数)前面加上一个go关键字。

举个例子以下:

func hello() {
    fmt.Println("Hello Goroutine!")
}
func main() {
    hello()
    fmt.Println("main goroutine done!")
}

这个示例中hello函数和下面的语句是串行的,执行的结果是打印完Hello Goroutine!后打印main goroutine done!

接下来咱们在调用hello函数前面加上关键字go,也就是启动一个goroutine去执行hello这个函数。

func main() {
    go hello() // 启动另一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
}

这一次的执行结果只打印了main goroutine done!,并无打印Hello Goroutine!。为何呢?

在程序启动时,Go程序就会为main()函数建立一个默认的goroutine

当main()函数返回的时候该goroutine就结束了,全部在main()函数中启动的goroutine会一同结束,main函数所在的goroutine就像是权利的游戏中的夜王,其余的goroutine都是异鬼,夜王一死它转化的那些异鬼也就所有GG了。

因此咱们要想办法让main函数等一等hello函数,最简单粗暴的方式就是time.Sleep了。

func main() {
    go hello() // 启动另一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
    time.Sleep(time.Second)
}

执行上面的代码你会发现,这一次先打印main goroutine done!,而后紧接着打印Hello Goroutine!

首先为何会先打印main goroutine done!是由于咱们在建立新的goroutine的时候须要花费一些时间,而此时main函数所在的goroutine是继续执行的。

3.3 启动多个goroutine

在Go语言中实现并发就是这样简单,咱们还能够启动多个goroutine。让咱们再来一个例子: (这里使用了sync.WaitGroup来实现goroutine的同步)

var wg sync.WaitGroup

func hello(i int) {
    defer wg.Done() // goroutine结束就登记-1
    fmt.Println("Hello Goroutine!", i)
}
func main() {

    for i := 0; i < 10; i++ {
        wg.Add(1) // 启动一个goroutine就登记+1
        go hello(i)
    }
    wg.Wait() // 等待全部登记的goroutine都结束
}

屡次执行上面的代码,会发现每次打印的数字的顺序都不一致。这是由于10个goroutine是并发执行的,而goroutine的调度是随机的。

4、goroutine与线程

4.1 可增加的栈

OS线程(操做系统线程)通常都有固定的栈内存(一般为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型状况下2KB),goroutine的栈不是固定的,他能够按需增大和缩小,goroutine的栈大小限制能够达到1GB,虽然极少会用到这个大。因此在Go语言中一次建立十万左右的goroutine也是能够的。

4.2 goroutine调度

GPM是Go语言运行时(runtime)层面的实现,是go语言本身实现的一套调度系统。区别于操做系统调度OS线程。

  • G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
  • P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对本身管理的goroutine队列作一些调度(好比把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当本身的队列消费完了就去全局队列里取,若是全局队列里也消费完了会去其余P的队列里抢任务。
  • M(machine)是Go运行时(runtime)对操做系统内核线程的虚拟, M与内核线程通常是一一映射的关系, 一个groutine最终是要放到M上执行的;

P与M通常也是一一对应的。他们关系是: P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其余的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。

P的个数是经过runtime.GOMAXPROCS设定(最大256),Go1.5版本以后默认为物理线程数。 在并发量大的时候会增长一些P和M,但不会太多,切换太频繁的话得不偿失。

单从线程调度讲,Go语言相比起其余语言的优点在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)本身的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特色是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池须要改变),成本比调度OS线程低不少。 另外一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上自己goroutine的超轻量,以上种种保证了go调度方面的性能。

点我了解更多

4.3 GOMAXPROCS

Go运行时的调度器使用GOMAXPROCS参数来肯定须要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。

Go语言中能够经过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

Go1.5版本以前,默认使用的是单核心执行。Go1.5版本以后,默认使用所有的CPU逻辑核心数。

咱们能够经过将任务分配到不一样的CPU逻辑核心上实现并行的效果,这里举个例子:

func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(1)
    go a()
    go b()
    time.Sleep(time.Second)
}

两个任务只有一个逻辑核心,此时是作完一个任务再作另外一个任务。 将逻辑核心数设为2,此时两个任务并行执行,代码以下。

func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(2)
    go a()
    go b()
    time.Sleep(time.Second)
}

Go语言中的操做系统线程和goroutine的关系:

  1. 一个操做系统线程对应用户态多个goroutine。
  2. go程序能够同时使用多个操做系统线程。
  3. goroutine和OS线程是多对多的关系,即m:n。

5、channel

单纯地将函数并发执行是没有意义的。函数与函数间须要交换数据才能体现并发执行函数的意义。

虽然可使用共享内存进行数据交换,可是共享内存在不一样的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种作法势必形成性能问题。

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡经过通讯共享内存而不是经过共享内存而实现通讯

若是说goroutine是Go程序并发的执行体,channel就是它们之间的链接。channel是可让一个goroutine发送特定值到另外一个goroutine的通讯机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,老是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每个通道都是一个具体类型的导管,也就是声明channel的时候须要为其指定元素类型。

5.1 channel类型

channel是一种类型,一种引用类型。声明通道类型的格式以下:

var 变量 chan 元素类型

举几个例子:

var ch1 chan int   // 声明一个传递整型的通道
var ch2 chan bool  // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道

5.2 建立channel

通道是引用类型,通道类型的空值是nil

var ch chan int
fmt.Println(ch) // <nil>

声明的通道后须要使用make函数初始化以后才能使用。

建立channel的格式以下:

make(chan 元素类型, [缓冲大小])

channel的缓冲大小是可选的。

举几个例子:

ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)

5.3 channel操做

通道有发送(send)、接收(receive)和关闭(close)三种操做。

发送和接收都使用<-符号。

如今咱们先使用如下语句定义一个通道:

ch := make(chan int)

5.3.1 发送

将一个值发送到通道中。

ch &lt;- 10 // 把10发送到ch中

5.3.2 接收

从一个通道中接收值。

x := &lt;- ch // 从ch中接收值并赋值给变量x
&lt;-ch       // 从ch中接收值,忽略结果

5.3.3 关闭

咱们经过调用内置的close函数来关闭通道。

close(ch)

关于关闭通道须要注意的事情是,只有在通知接收方goroutine全部的数据都发送完毕的时候才须要关闭通道。通道是能够被垃圾回收机制回收的,它和关闭文件是不同的,在结束操做以后关闭文件是必需要作的,但关闭通道不是必须的。

关闭后的通道有如下特色:

  1. 对一个关闭的通道再发送值就会致使panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的而且没有值的通道执行接收操做会获得对应类型的零值。
  4. 关闭一个已经关闭的通道会致使panic。

5.4 无缓冲的通道

无缓冲的通道又称为阻塞的通道。咱们来看一下下面的代码:

func main() {
    ch := make(chan int)
    ch &lt;- 10
    fmt.Println(&quot;发送成功&quot;)
}

上面这段代码可以经过编译,可是执行的时候会出现如下错误:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        .../src/github.com/Q1mi/studygo/day06/channel02/main.go:8 +0x54

为何会出现deadlock错误呢?

由于咱们使用ch := make(chan int)建立的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。就像你住的小区没有快递柜和代收点,快递员给你打电话必需要把这个物品送到你的手中,简单来讲就是无缓冲的通道必须有接收才能发送。

上面的代码会阻塞在ch <- 10这一行代码造成死锁,那如何解决这个问题呢?

一种方法是启用一个goroutine去接收值,例如:

func recv(c chan int) {
    ret := &lt;-c
    fmt.Println(&quot;接收成功&quot;, ret)
}
func main() {
    ch := make(chan int)
    go recv(ch) // 启用goroutine从通道接收值
    ch &lt;- 10
    fmt.Println(&quot;发送成功&quot;)
}

无缓冲通道上的发送操做会阻塞,直到另外一个goroutine在该通道上执行接收操做,这时值才能发送成功,两个goroutine将继续执行。相反,若是接收操做先执行,接收方的goroutine将阻塞,直到另外一个goroutine在该通道上发送一个值。

使用无缓冲通道进行通讯将致使发送和接收的goroutine同步化。所以,无缓冲通道也被称为同步通道

5.5 有缓冲的通道

解决上面问题的方法还有一种就是使用有缓冲区的通道。咱们能够在使用make函数初始化通道的时候为其指定通道的容量,例如:

func main() {
    ch := make(chan int, 1) // 建立一个容量为1的有缓冲区通道
    ch &lt;- 10
    fmt.Println(&quot;发送成功&quot;)
}

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。

咱们可使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,虽然咱们不多会这么作。

5.6 如何优雅的从通道循环取值

当经过通道发送有限的数据时,咱们能够经过close函数关闭通道来告知从该通道接收值的goroutine中止等待。当通道被关闭时,往该通道发送值会引起panic,从该通道里接收的值一直都是类型零值。那如何判断一个通道是否被关闭了呢?

咱们来看下面这个例子:

// channel 练习
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    // 开启goroutine将0~100的数发送到ch1中
    go func() {
        for i := 0; i &lt; 100; i++ {
            ch1 &lt;- i
        }
        close(ch1)
    }()
    // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
    go func() {
        for {
            i, ok := &lt;-ch1 // 通道关闭后再取值ok=false
            if !ok {
                break
            }
            ch2 &lt;- i * i
        }
        close(ch2)
    }()
    // 在主goroutine中从ch2中接收值打印
    for i := range ch2 { // 通道关闭后会退出for range循环
        fmt.Println(i)
    }
}

从上面的例子中咱们看到有两种方式在接收值的时候判断通道是否被关闭,咱们一般使用的是for range的方式。

5.7 单向通道

有的时候咱们会将通道做为参数在多个任务函数间传递,不少时候咱们在不一样的任务函数中使用通道都会对其进行限制,好比限制通道在函数中只能发送或只能接收。

Go语言中提供了单向通道来处理这种状况。例如,咱们把上面的例子改造以下:

func counter(out chan&lt;- int) {
    for i := 0; i &lt; 100; i++ {
        out &lt;- i
    }
    close(out)
}

func squarer(out chan&lt;- int, in &lt;-chan int) {
    for i := range in {
        out &lt;- i * i
    }
    close(out)
}
func printer(in &lt;-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}

其中,

  • chan<- int是一个只能发送的通道,能够发送可是不能接收;
  • <-chan int是一个只能接收的通道,能够接收可是不能发送。

在函数传参及任何赋值操做中将双向通道转换为单向通道是能够的,但反过来是不能够的。

5.8 通道总结

channel常见的异常总结,以下图: channel01.png

关闭已经关闭的channel也会引起panic

6、worker pool(goroutine池)

在工做中咱们一般会使用能够指定启动的goroutine数量–worker pool模式,控制goroutine的数量,防止goroutine泄漏和暴涨。

一个简易的work pool示例代码以下:

func worker(id int, jobs &lt;-chan int, results chan&lt;- int) {
    for j := range jobs {
        fmt.Printf(&quot;worker:%d start job:%d\n&quot;, id, j)
        time.Sleep(time.Second)
        fmt.Printf(&quot;worker:%d end job:%d\n&quot;, id, j)
        results &lt;- j * 2
    }
}


func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    // 开启3个goroutine
    for w := 1; w &lt;= 3; w++ {
        go worker(w, jobs, results)
    }
    // 5个任务
    for j := 1; j &lt;= 5; j++ {
        jobs &lt;- j
    }
    close(jobs)
    // 输出结果
    for a := 1; a &lt;= 5; a++ {
        &lt;-results
    }
}

7、select多路复用

在某些场景下咱们须要同时从多个通道接收数据。通道在接收数据时,若是没有数据能够接收将会发生阻塞。你也许会写出以下代码使用遍历的方式来实现:

for{
    // 尝试从ch1接收值
    data, ok := &lt;-ch1
    // 尝试从ch2接收值
    data, ok := &lt;-ch2
    …
}

这种方式虽然能够实现从多个通道接收值的需求,可是运行性能会差不少。为了应对这种场景,Go内置了select关键字,能够同时响应多个通道的操做。

select的使用相似于switch语句,它有一系列case分支和一个默认的分支。每一个case会对应一个通道的通讯(接收或发送)过程。select会一直等待,直到某个case的通讯操做完成时,就会执行case分支对应的语句。具体格式以下:

select{
    case &lt;-ch1:
        ...
    case data := &lt;-ch2:
        ...
    case ch3&lt;-data:
        ...
    default:
        默认操做
}

举个小例子来演示下select的使用:

func main() {
    ch := make(chan int, 1)
    for i := 0; i &lt; 10; i++ {
        select {
        case x := &lt;-ch:
            fmt.Println(x)
        case ch &lt;- i:
        }
    }
}

使用select语句能提升代码的可读性。

  • 可处理一个或多个channel的发送/接收操做。
  • 若是多个case同时知足,select会随机选择一个。
  • 对于没有caseselect{}会一直等待,可用于阻塞main函数。

8、并发安全和锁

有时候在Go代码中可能会存在多个goroutine同时操做一个资源(临界区),这种状况会发生竞态问题(数据竞态)。类比现实生活中的例子有十字路口被各个方向的的汽车竞争;还有火车上的卫生间被车箱里的人竞争。

举个例子:

var x int64
var wg sync.WaitGroup

func add() {
    for i := 0; i &lt; 5000; i++ {
        x = x + 1
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

上面的代码中咱们开启了两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,致使最后的结果与期待的不符。

8.1 互斥锁

互斥锁是一种经常使用的控制共享资源访问的方法,它可以保证同时只有一个goroutine能够访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。 使用互斥锁来修复上面代码的问题:

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
    for i := 0; i &lt; 5000; i++ {
        lock.Lock() // 加锁
        x = x + 1
        lock.Unlock() // 解锁
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

使用互斥锁可以保证同一时间有且只有一个goroutine进入临界区,其余的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才能够获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。

8.2 读写互斥锁

互斥锁是彻底互斥的,可是有不少实际的场景下是读多写少的,当咱们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。

读写锁分为两种:读锁和写锁。当一个goroutine获取读锁以后,其余的goroutine若是是获取读锁会继续得到锁,若是是获取写锁就会等待;当一个goroutine获取写锁以后,其余的goroutine不管是获取读锁仍是写锁都会等待。

读写锁示例:

var (
    x      int64
    wg     sync.WaitGroup
    lock   sync.Mutex
    rwlock sync.RWMutex
)

func write() {
    // lock.Lock()   // 加互斥锁
    rwlock.Lock() // 加写锁
    x = x + 1
    time.Sleep(10 * time.Millisecond) // 假设读操做耗时10毫秒
    rwlock.Unlock()                   // 解写锁
    // lock.Unlock()                     // 解互斥锁
    wg.Done()
}

func read() {
    // lock.Lock()                  // 加互斥锁
    rwlock.RLock()               // 加读锁
    time.Sleep(time.Millisecond) // 假设读操做耗时1毫秒
    rwlock.RUnlock()             // 解读锁
    // lock.Unlock()                // 解互斥锁
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i &lt; 10; i++ {
        wg.Add(1)
        go write()
    }

    for i := 0; i &lt; 1000; i++ {
        wg.Add(1)
        go read()
    }

    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

须要注意的是读写锁很是适合读多写少的场景,若是读和写的操做差异不大,读写锁的优点就发挥不出来。

8.3 sync.WaitGroup

在代码中生硬的使用time.Sleep确定是不合适的,Go语言中可使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有如下几个方法:

方法名 功能
(wg * WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值能够增长和减小。例如当咱们启动了N 个并发任务时,就将计数器值增长N。每一个任务完成时经过调用Done()方法将计数器减1。经过调用Wait()来等待并发任务执行完,当计数器值为0时,表示全部并发任务已经完成。

咱们利用sync.WaitGroup将上面的代码优化一下:

var wg sync.WaitGroup

func hello() {
    defer wg.Done()
    fmt.Println(&quot;Hello Goroutine!&quot;)
}
func main() {
    wg.Add(1)
    go hello() // 启动另一个goroutine去执行hello函数
    fmt.Println(&quot;main goroutine done!&quot;)
    wg.Wait()
}

须要注意sync.WaitGroup是一个结构体,传递的时候要传递指针。

8.4 sync.Once

说在前面的话:这是一个进阶知识点。

在编程的不少场景下咱们须要确保某些操做在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。

Go语言中的sync包中提供了一个针对只执行一次场景的解决方案–sync.Once

sync.Once只有一个Do方法,其签名以下:

func (o *Once) Do(f func()) {}

备注:若是要执行的函数f须要传递参数就须要搭配闭包来使用。

8.4.1 加载配置文件示例

延迟一个开销很大的初始化操做到真正用到它的时候再执行是一个很好的实践。由于预先初始化一个变量(好比在init函数中完成初始化)会增长程序的启动耗时,并且有可能实际执行过程当中这个变量没有用上,那么这个初始化操做就不是必需要作的。咱们来看一个例子:

var icons map[string]image.Image

func loadIcons() {
    icons = map[string]image.Image{
        &quot;left&quot;:  loadIcon(&quot;left.png&quot;),
        &quot;up&quot;:    loadIcon(&quot;up.png&quot;),
        &quot;right&quot;: loadIcon(&quot;right.png&quot;),
        &quot;down&quot;:  loadIcon(&quot;down.png&quot;),
    }
}

// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {
    if icons == nil {
        loadIcons()
    }
    return icons[name]
}

多个goroutine并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每一个goroutine都知足串行一致的基础上自由地重排访问内存的顺序。loadIcons函数可能会被重排为如下结果:

func loadIcons() {
    icons = make(map[string]image.Image)
    icons[&quot;left&quot;] = loadIcon(&quot;left.png&quot;)
    icons[&quot;up&quot;] = loadIcon(&quot;up.png&quot;)
    icons[&quot;right&quot;] = loadIcon(&quot;right.png&quot;)
    icons[&quot;down&quot;] = loadIcon(&quot;down.png&quot;)
}

在这种状况下就会出现即便判断了icons不是nil也不意味着变量初始化完成了。考虑到这种状况,咱们能想到的办法就是添加互斥锁,保证初始化icons的时候不会被其余的goroutine操做,可是这样作又会引起性能问题。

使用sync.Once改造的示例代码以下:

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
    icons = map[string]image.Image{
        &quot;left&quot;:  loadIcon(&quot;left.png&quot;),
        &quot;up&quot;:    loadIcon(&quot;up.png&quot;),
        &quot;right&quot;: loadIcon(&quot;right.png&quot;),
        &quot;down&quot;:  loadIcon(&quot;down.png&quot;),
    }
}

// Icon 是并发安全的
func Icon(name string) image.Image {
    loadIconsOnce.Do(loadIcons)
    return icons[name]
}

8.4.2 关闭channel示例

var wg sync.WaitGroup
var once sync.Once

func f1(ch1 chan&lt;- int) {
    defer wg.Done()
    for i := 0; i &lt; 100; i++ {
        ch1 &lt;- i
    }
    close(ch1)
}

func f2(ch1 &lt;-chan int, ch2 chan&lt;- int) {
    defer wg.Done()
    for {
        x, ok := &lt;-ch1
        if !ok {
            break
        }
        ch2 &lt;- x * x
    }
    once.Do(func() { close(ch2) }) // 确保某个操做只执行一次
}

func main() {
    a := make(chan int, 100)
    b := make(chan int, 100)
    wg.Add(3)
    go f1(a)
    go f2(a, b)
    go f2(a, b)
    wg.Wait()
    for ret := range b {
        fmt.Println(ret)
    }
}

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操做的时候是并发安全的而且初始化操做也不会被执行屡次。

8.5 sync.Map

Go语言中内置的map不是并发安全的。请看下面的示例:

var m = make(map[string]int)

func get(key string) int {
    return m[key]
}

func set(key string, value int) {
    m[key] = value
}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i &lt; 20; i++ {
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            set(key, n)
            fmt.Printf(&quot;k=:%v,v:=%v\n&quot;, key, get(key))
            wg.Done()
        }(i)
    }
    wg.Wait()
}

上面的代码开启少许几个goroutine的时候可能没什么问题,当并发多了以后执行上面的代码就会报fatal error: concurrent map writes错误。

像这种场景下就须要为map加锁来保证并发的安全性了,Go语言的sync包中提供了一个开箱即用的并发安全版map–sync.Map。开箱即用表示不用像内置的map同样使用make函数初始化就能直接使用。同时sync.Map内置了诸如StoreLoadLoadOrStoreDeleteRange等操做方法。

var m = sync.Map{}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i &lt; 20; i++ {
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            m.Store(key, n)
            value, _ := m.Load(key)
            fmt.Printf(&quot;k=:%v,v:=%v\n&quot;, key, value)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

9、原子操做

代码中的加锁操做由于涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型咱们还可使用原子操做来保证并发安全,由于原子操做是Go语言提供的方法它在用户态就能够完成,所以性能比加锁操做更好。Go语言中原子操做由内置的标准库sync/atomic提供。

9.1 atomic包

方法 解释
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
读取操做
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
写入操做
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
修改操做
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
交换操做
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
比较并交换操做

9.2 示例

咱们填写一个示例来比较下互斥锁和原子操做的性能。

var x int64
var l sync.Mutex
var wg sync.WaitGroup

// 普通版加函数
func add() {
    // x = x + 1
    x++ // 等价于上面的操做
    wg.Done()
}

// 互斥锁版加函数
func mutexAdd() {
    l.Lock()
    x++
    l.Unlock()
    wg.Done()
}

// 原子操做版加函数
func atomicAdd() {
    atomic.AddInt64(&amp;x, 1)
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i &lt; 10000; i++ {
        wg.Add(1)
        // go add()       // 普通版add函数 不是并发安全的
        // go mutexAdd()  // 加锁版add函数 是并发安全的,可是加锁性能开销大
        go atomicAdd() // 原子操做版add函数 是并发安全,性能优于加锁版
    }
    wg.Wait()
    end := time.Now()
    fmt.Println(x)
    fmt.Println(end.Sub(start))
}

atomic包提供了底层的原子级内存操做,对于同步算法的实现颇有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。

10、练习题

  1. 使用goroutinechannel实现一个计算int64随机数各位数和的程序。
    1. 开启一个goroutine循环生成int64类型的随机数,发送到jobChan
    2. 开启24个goroutinejobChan中取出随机数计算各位数的和,将结果发送到resultChan
    3. goroutineresultChan取出结果并打印到终端输出
  2. 为了保证业务代码的执行性能将以前写的日志库改写为异步记录日志方式。
相关文章
相关标签/搜索