理解Go协程与并发

协程

Go语言里建立一个协程很简单,使用go关键字就可让一个普通方法协程化:git

package main

import (
    "fmt"
    "time"
)

func main(){
    fmt.Println("run in main coroutine.")

    for i:=0; i<10; i++ {
        go func(i int) {
            fmt.Printf("run in child coroutine %d.\n", i)
        }(i)
    }

    //防止子协程尚未结束主协程就退出了
    time.Sleep(time.Second * 1)
}

下面这些概念可能不太好理解,须要慢慢理解。能够先跳过,回头再来看。github

概念:golang

  1. 协程能够理解为纯用户态的线程,其经过协做而不是抢占来进行切换。相对于进程或者线程,协程全部的操做均可以在用户态完成,建立和切换的消耗更低。
  2. 一个进程内部能够运行多个线程,而每一个线程又能够运行不少协程。线程要负责对协程进行调度,保证每一个协程都有机会获得执行。当一个协程睡眠时,它要将线程的运行权让给其它的协程来运行,而不能持续霸占这个线程。同一个线程内部最多只会有一个协程正在运行。
  3. 协程能够简化为三个状态:运行态就绪态休眠态。同一个线程中最多只会存在一个处于运行态的协程。就绪态协程是指那些具有了运行能力可是尚未获得运行机会的协程,它们随时会被调度到运行态;休眠态的协程还不具有运行能力,它们是在等待某些条件的发生,好比 IO 操做的完成、睡眠时间的结束等。
  4. 子协程的异常退出会将异常传播到主协程,直接会致使主协程也跟着挂掉。

协程通常用 TCP/HTTP/RPC服务、消息推送系统、聊天系统等。使用协程,咱们能够很方便的搭建一个支持高并发的TCP或HTTP服务端。缓存

通道

通道的英文是Channels,简称chan。何时要用到通道呢?能够先简单的理解为:协程在须要协做通讯的时候就须要用通道。安全

在GO里,不一样的并行协程之间交流的方式有两种,一种是经过共享变量,另外一种是经过通道。Go 语言鼓励使用通道的形式来交流。bash

举个简单的例子,咱们使用协程实现并发调用远程接口,最终咱们须要把每一个协程请求回来的数据进行汇总一块儿返回,这个时候就用到通道了。数据结构

建立通道

建立通道(channel)只能使用make函数:并发

c := make(chan int)

通道是区分类型的,如这里的int异步

Go 语言为通道的读写设计了特殊的箭头语法糖 <-,让咱们使用通道时很是方便。把箭头写在通道变量的右边就是写通道,把箭头写在通道的左边就是读通道。一次只能读写一个元素。函数

c := make(chan bool)
c <- true //写入
<- c //读取

缓冲通道

上面咱们介绍了默认的非缓存类型的channel,不过Go也容许指定channel的缓冲大小,很简单,就是channel能够存储多少元素:

c := make(chan int, value)

value = 0 时,通道是无缓冲阻塞读写的,等价于make(chan int);当value > 0 时,通道有缓冲、是非阻塞的,直到写满 value 个元素才阻塞写入。具体说明下:

非缓冲通道
不管是发送操做仍是接收操做,一开始执行就会被阻塞,直到配对的操做也开始执行才会继续传递。因而可知,非缓冲通道是在用同步的方式传递数据。也就是说,只有收发双方对接上了,数据才会被传递。数据是直接从发送方复制到接收方的,中间并不会用非缓冲通道作中转。

缓冲通道
缓冲通道能够理解为消息队列,在有容量的时候,发送和接收是不会互相依赖的。用异步的方式传递数据。

下面咱们用一个例子来理解一下:

package main

import "fmt"

func main() {
    var c = make(chan int, 0)
    var a string

    go func() {
        a = "hello world"
        <-c
    }()

    c <- 0
    fmt.Println(a)
}

这个例子输出的必定是hello world。可是若是你把通道的容量由0改成大于0的数字,输出结果就不必定是hello world了,极可能是空。为何?

当通道是无缓冲通道时,执行到c <- 0,通道满了,写操做会被阻塞住,直到执行<-c解除阻塞,后面的语句接着执行。

要是改为非阻塞通道,执行到c <- 0,发现还能写入,主协程就不会阻塞了,但这时候输出的是空字符串仍是hello world,取决因而子协程和主协程哪一个运行的速度快。

通道做为容器,它能够像切片同样,使用 cap()len() 全局函数得到通道的容量和当前内部的元素个数。

模拟消息队列

上一节"协程"的例子里,咱们在主协程里加了个time.Sleep(),目的是防止子协程尚未结束主协程就退出了。可是对于实际生活的大多数场景来讲,1秒是不够的,而且大部分时候咱们都没法预知for循环内代码运行时间的长短。这时候就不能使用time.Sleep() 来完成等待操做了。下面咱们用通道来改写:

package main

import (
    "fmt"
)

func main() {
    fmt.Println("run in main coroutine.")

    count := 10
    c := make(chan bool, count)

    for i := 0; i < count; i++ {
        go func(i int) {
            fmt.Printf("run in child coroutine %d.\n", i)
            c <- true
        }(i)
    }

    for i := 0; i < count; i++ {
        <-c
    }
}

单向通道

默认的通道是支持读写的,咱们能够定义单向通道:

//只读
var readOnlyChannel = make(<-chan int)

//只写
var writeOnlyChannel = make(chan<- int)

下面是一个示例,咱们模拟消息队列的消费者、生产者:

package main

import (
    "fmt"
    "time"
)

func Producer(c chan<- int) {
    for i := 0; i < 10; i++ {
        c <- i
    }
}

func Consumer1(c <-chan int) {
    for m := range c {
        fmt.Printf("oh, I get luckly num: %v\n", m)
    }
}

func Consumer2(c <-chan int) {
    for m := range c {
        fmt.Printf("oh, I get luckly num too: %v\n", m)
    }
}

func main() {
    c := make(chan int, 2)

    go Consumer1(c)
    go Consumer2(c)

    Producer(c)

    time.Sleep(time.Second)
}

对于生产者,咱们但愿通道是只写属性,而对于消费者则是只读属性,这样避免对通道进行错误的操做。固然,若是你将本例里消费者、生产者的通道单向属性去掉也是能够的,没什么问题:

func Producer(c chan int) {}
func Consumer1(c chan int) {}
func Consumer2(c chan int) {}

事实上 channel 只读或只写都没有意义,所谓的单向 channel 其实只是方法里声明时用,若是后续代码里,向原本用于读channel里写入了数据,编译器会提示错误。

关闭通道

读取一个已经关闭的通道会当即返回通道类型的零值,而写一个已经关闭的通道会抛异常。若是通道里的元素是整型的,读操做是不能经过返回值来肯定通道是否关闭的。

一、如何安全的读通道,确保不是读取的已关闭通道的零值
答案是使用for...range语法。当通道为空时,循环会阻塞;当通道关闭,循环会中止。经过循环中止,咱们能够认为通道已经关闭。示例:

package main

import "fmt"

func main() {
    var c = make(chan int, 3)

    //子协程写
    go func() {
        c <- 1
        close(c)
    }()

    //直接读取通道,存在不知道子协程是否已关闭的状况
    //fmt.Println(<-c)
    //fmt.Println(<-c)

    //主协程读取:使用for...range安全的读取
    for value := range c {
        fmt.Println(value)
    }
}

输出:

1

二、如何安全的写通道,确保不会写入已关闭的通道?
Go 语言并不存在一个内置函数能够判断出通道是否已经被关闭。确保通道写安全的最好方式是由负责写通道的协程本身来关闭通道,读通道的协程不要去关闭通道。

可是这个方法只能解决单写多读的场景。若是遇到多写单读的状况就有问题了:没法知道其它写协程何时写完,那么也就不能肯定何时关闭通道。这个时候就得额外使用一个通道专门作这个事情。

咱们可使用内置的 sync.WaitGroup,它使用计数来等待指定事件完成:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {

    var ch = make(chan int, 8)

    //写协程
    var wg = new(sync.WaitGroup)

    for i := 1; i <= 4; i++ {
        wg.Add(1)
        go func(num int, ch chan int, wg *sync.WaitGroup) {
            defer wg.Done()
            ch <- num
            ch <- num * 10
        }(i, ch, wg)
    }

    //读
    go func(ch chan int) {
        for num := range ch {
            fmt.Println(num)
        }
    }(ch)

    //Wait阻塞等待全部的写通道协程结束,待计数值变成零,Wait才会返回
    wg.Wait()

    //安全的关闭通道
    close(ch)

    //防止读取通道的协程尚未完毕
    time.Sleep(time.Second)

    fmt.Println("finish")
}

输出:

3
30
2
20
1
10
4
40
finish

多路通道

有时候还会遇到多个生产者,只要有一个生产者就绪,消费者就能够进行消费的状况。这个时候可使用go语言提供的select 语句,它能够同时管理多个通道读写,若是全部通道都不能读写,它就总体阻塞,只要有一个通道能够读写,它就会继续。示例:

package main

import (
    "fmt"
    "time"
)

func main() {

    var ch1 = make(chan int)
    var ch2 = make(chan int)

    fmt.Println(time.Now().Format("15:04:05"))

    go func(ch chan int) {
        time.Sleep(time.Second)
        ch <- 1
    }(ch1)

    go func(ch chan int) {
        time.Sleep(time.Second * 2)
        ch <- 2
    }(ch2)

    for {
        select {
            case v := <-ch1:
                fmt.Println(time.Now().Format("15:04:05") + ":来自ch1:", v)
            case v := <-ch2:
                fmt.Println(time.Now().Format("15:04:05") + ":来自ch2:", v)
            //default:
                //fmt.Println("channel is empty !")
        }
    }
}

输出:

13:39:56
13:39:57:来自ch1: 1
13:39:58:来自ch2: 2
fatal error: all goroutines are asleep - deadlock!

默认select处于阻塞状态,1s后,子协程1完成写入,主协程读出了数据;接着子协程2完成写入,主协程读出了数据;接着主协程挂掉了,缘由是主协程发如今等一个永远不会来的数据,这显然是没有结果的,干脆就直接退出了。

若是把注释的部分打开,那么程序在打印出来自ch一、ch2的数据后,就会一直执行default里面的程序。这个时候程序不会退出。缘由是当 select 语句全部通道都不可读写时,若是定义了 default 分支,那就会执行 default 分支逻辑。

注:select{}代码块是一个没有任何caseselect,它会一直阻塞。

Chan的应用场景

golang中chan的应用场景总结
https://github.com/nange/blog/issues/9

Go语言之Channels实际应用
https://www.s0nnet.com/archives/go-channels-practice

  • 消息队列
  • 并发请求
  • 模拟锁的功能
  • 模拟sync.WaitGroup
  • 并行计算

通道原理部分能够根据文末给出的参考连接《快学 Go 语言》第 12 课 —— 通道去查看。

并发锁

互斥所

go语言里的map是线程不安全的:

package main

import "fmt"

func write(d map[string]string) {
    d["name"] = "yujc"
}

func read(d map[string]string) {
    fmt.Println(d["name"])
}

func main() {
    d := map[string]string{}
    go read(d)
    write(d)
}

Go 语言内置了数据结构竞态检查工具来帮咱们检查程序中是否存在线程不安全的代码,只要在运行的时候加上-race参数便可:

$ go run -race main.go 
==================
WARNING: DATA RACE
Read at 0x00c0000a8180 by goroutine 6:

...

yujc
Found 2 data race(s)
exit status 66

能够看出,上面的代码存在安全隐患。

咱们可使用sync.Mutex来保护map,原理是在每次读写操做以前使用互斥锁进行保护,防止其余线程同时操做:

package main

import (
    "fmt"
    "sync"
)

type SafeDict struct {
    data map[string]string
    mux  *sync.Mutex
}

func NewSafeDict(data map[string]string) *SafeDict {
    return &SafeDict{
        data: data,
        mux:  &sync.Mutex{},
    }
}

func (d *SafeDict) Get(key string) string {
    d.mux.Lock()
    defer d.mux.Unlock()
    return d.data[key]
}

func (d *SafeDict) Set(key string, value string) {
    d.mux.Lock()
    defer d.mux.Unlock()
    d.data[key] = value
}

func main(){
    dict := NewSafeDict(map[string]string{})

    go func(dict *SafeDict) {
        fmt.Println(dict.Get("name"))
    }(dict)

    dict.Set("name", "yujc")
}

运行检测:

$ go run -race main.go 
yujc

上面的代码若是不使用-race运行,不必定会有结果,取决于主协程、子协程哪一个先运行。

注意:sync.Mutex 是一个结构体对象,这个对象在使用的过程当中要避免被浅拷贝,不然起不到保护做用。应尽可能使用它的指针类型。

上面的代码里咱们多处使用了d.mux.Lock(),可否简化成d.Lock()呢?答案是能够的。咱们知道,结构体能够自动继承匿名内部结构体的全部方法:

type SafeDict struct {
    data map[string]string
    *sync.Mutex
}

func NewSafeDict(data map[string]string) *SafeDict {
    return &SafeDict{data, &sync.Mutex{}}
}

func (d *SafeDict) Get(key string) string {
    d.Lock()
    defer d.Unlock()
    return d.data[key]
}

这样就完成了简化。

读写锁

对于读多写少的场景,可使用读写锁代替互斥锁,能够提升性能。

读写锁提供了下面4个方法:

  • Lock() 写加锁
  • Unlock() 写释放锁
  • RLock() 读加锁
  • RUnlock() 读释放锁

写锁排它锁,加写锁时会阻塞其它协程再加读锁写锁读锁共享锁,加读锁还能够容许其它协程再加读锁,可是会阻塞加写锁读写锁在写并发高的状况下性能退化为普通的互斥锁

咱们把上节中的互斥锁换成读写锁:

package main

import (
    "fmt"
    "sync"
)

type SafeDict struct {
    data map[string]string
    *sync.RWMutex
}

func NewSafeDict(data map[string]string) *SafeDict {
    return &SafeDict{data, &sync.RWMutex{}}
}

func (d *SafeDict) Get(key string) string {
    d.RLock()
    defer d.RUnlock()
    return d.data[key]
}

func (d *SafeDict) Set(key string, value string) {
    d.Lock()
    defer d.Unlock()
    d.data[key] = value
}

func main(){
    dict := NewSafeDict(map[string]string{})

    go func(dict *SafeDict) {
        fmt.Println(dict.Get("name"))
    }(dict)

    dict.Set("name", "yujc")
}

改完后,使用竞态检测工具检测仍是能经过的。

参考

一、make(chan int) 和 make(chan int, 1) 的区别
https://www.jianshu.com/p/f12e1766c19f
二、channel
https://www.jianshu.com/p/4d97dc032730
三、《快学 Go 语言》第 12 课 —— 通道
https://mp.weixin.qq.com/s?__biz=MzI0MzQyMTYzOQ==&mid=2247484601&idx=1&sn=97c0de2acc3127c9e913b6338fa65737
四、《快学 Go 语言》第 13 课 —— 并发与安全
https://mp.weixin.qq.com/s?__biz=MzI0MzQyMTYzOQ==&mid=2247484683&idx=1&sn=966cb818f034ffd4538eae7a61cd0c58

相关文章
相关标签/搜索