go--并发

goroutine

在go语言中无须像其余语言中同样维护线程池、进程等,而是使用goroutine进行调度和管理。golang

使用goroutine

go程序中使用go关键字为一个函数建立一个goroutine。一个函数能够被建立为多个goroutine,一个goroutine一定对应一个函数。安全

启动单个goroutine

启动goroutine的方式很是简单,只须要在调用的函数(普通函数和匿名函数)前面加上一个go关键字。并发

举个例子以下:函数

func hello() {
    fmt.Println("Hello goroutine")
}

func main() {
    hello()
    fmt.Println("main goroutine done!")
}

这个实例中hello函数和下面的语句是串行的,执行的结果是打印完hello goroutine后打印main goroutine done!性能

接下来咱们在调用hello函数前面加上关键字go,也就是启动一个goroutine去执行hello这个函数。优化

func main () {
    go hello()
    fmt.Println("main goroutine done!")
}

此次执行结果只打印了main goroutine done!,并无打印hello goroutine,由于在程序启动时,go程序就会为main()函数建立一个默认的goroutine。当main()函数返回的时候该goroutine就结束了,全部在main()函数中启动的goroutine会一同结束。线程

此时,先要让main函数等一等hello函数,最简单的方式是使用Sleepcode

func main() {
    go hello()
    fmt.Println("main goroutine done!")
    time.Sleep(time.Second)
}

sync.WaitGroup

在代码中声音的使用time.Sleep不是最好的办法,go语言中可使用sync.WaitGroup来实现并发任务的同步。sync.WaitGroup方法以下:生命周期

方法 说明
(wg *WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup Wait()) 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值能够增减和减小。当计数器值为0时,表示全部并发任务已经完成。队列

咱们利用sync.WaitGroup将上面的代码优化一下

var wg sync.WaitGroup

func hello() {
    defer wg.Done()
    fmt.Println("Hello goroutine!")
}

func main() {
    wg.Add(1)
    go hello()
    fmt.Println("main goroutine done!")
    wg.Wait()
}

goroutine与线程

可增加的栈

OS线程通常都有固定的栈内存(一般为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型状况下为2KB),goroutine的栈不是固定的,他能够按需增大和缩小,goroutine的栈大小限制能够达到1GB,虽然极少会用到这个大小。因此在go语言中一次建立十万左右的goroutine也是能够的。

goroutine调度

OS线程是由OS内核来调度的,goroutine则是由go运行时(runtime)本身的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度 m个goroutine到n个OS线程)。goroutine的调度不须要切换内核预警,因此调用一个goroutine比调度一个线程成本低不少。

GOMAXPROCS

go运行时的调度器使用GOMAXPROCS参数来肯定须要使用多少个OS线程来同时执行go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。

go语言中可使用runtime.GOMAXPROCS函数设置当前程序并发时占用的CPU逻辑核心数。

go1.5版本以前,默认使用的是单核心执行。go1.5版本以后,默认是会用所有的CPU逻辑核心数。

channel

单纯地将函数并发执行时没有意义的。函数与函数之间须要交换数据才能体现并发执行函数的意义。

虽然可使用共享内存进行数据交换,可是共享内存在不一样的goroutine中容易发生竟态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种作法势必形成性能问题。

go语言的并发模型是CSP,提倡经过通讯共享内存而不是经过共享内存而实现通讯。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,老是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每个通道都是一个具体类型的导管,也就是声明channel的时候须要为其指定元素类型。

声明channel

声明通道类型的格式以下:

var 变量 chan 元素类型

var ch1 chan int
var ch2 chan bool
var ch3 chan []int

建立channel

通道是引用类型 ,通道类型的空值是nil

声明通道后须要使用make函数初始化后才能使用,建立channel的格式以下:

make (chan 元素类型, [缓冲个数])

channel操做

通道有发送(send)、接收(receive)和关闭(close)三种操做。发送和接收都使用<-符号。

ch := make(chan int)

// 发送
ch <- 10 // 把10发送到ch中

// 接收
x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果

// 关闭
close(ch)

关于关闭通道须要注意的事情是,只有在通知接收方goroutine全部的数据都发送完毕的时候才须要关闭通道。通道是能够被垃圾回收机制回收的,它和关闭文件是不同的,在结束操做以后关闭文件是必需要作的,但关闭通道不是必须的。

关闭后的通道有如下特色:

  • 对一个关闭的通道再发送值就会致使panic。
  • 对一个关闭的通道进行接收会一直获取值直到通道为空。
  • 对一个关闭的而且没有值的通道执行接收操做会获得对应类型的零值。
  • 关闭一个已经关闭的通道会致使panic。

无缓冲的通道

无缓冲的通道又称为阻塞的通道。

func main() {
    ch := make(chan int)
    ch <- 10
    fmt.Println("发送成功")
}

上面这段代码可以经过编译,可是执行时会报deadlock的错

为何会出现死锁呢?由于ch := make(chal int)建立的是无缓冲通道,无缓冲通道只有在有人接收值的时候才能发送值。上面的代码会阻塞在ch := make(chan int)这一行造成死锁,解决这个问题能够用如下的方法:

// 启用一个goroutine区接收值

func recv(c chan int) {
    ret := <-c
    fmt.Println("接收成功", ret)
}

func main() {
    ch := make(chan int)
    go recv(ch)
    ch <- 10
    fmt.Println("发送成功")
}

无缓冲通道上发送操做会阻塞,直到另外一个goroutine在该通道上执行接收操做,这时值才能发送成功,两个goroutine将继续执行。相反,若是接收操做先执行,接收方的goroutine将阻塞,直到另外一个goroutine在该通道上发送一个值。

使用无缓冲通道进行通讯将致使发送和接收的goroutine同步化。所以,无缓冲通道也被称为同步通道

有缓冲的通道

解决上面的问题的方法还有一种就是使用有缓冲区的通道。有缓冲的通道即便用make函数初始化通道的时候为其指定一个容量。

func main() {
    ch := make(chal int, 1)    // 建立一个容量为1的有缓冲区的通道
    ch  <- 10
}

只要通道的容量大于0,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。

如何优雅的从通道循环取值

当经过通道发送有限的数据时,能够经过close函数关闭通道来告知从该通道接收值的goroutine中止等待。当通道被关闭时,往该通道发送值会引起panic,从该通道里接收的值一直都是类型零值,如何判断一个通道是否被关闭了呢?

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    // 开启goroutine将0~100的数发送到ch1中
    go func() {
        for i := 0;i < 100;i++ {
            ch1 < i
        }
    }()

    // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
    go func() {
        for {
            i, ok := <- ch1
            if !ok {
                break
            }
            ch2 <- i * i
        }
        close(ch2)
    }()

    // 在主goroutine中从ch2接收值打印
    for i := range ch2 {
        fmt.Println(i)
    }
}

单向通道

func counter(out chan <- int) {
    for i := 0; i < 100; i ++ {
        out <- i
    }

    close(out)
}

func squarer(out chan <- int, in <- chan int) {
    for i := range in {
        out <- i * i
    }

    close(out)
}

func printer(in <- chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}

其中:

  • chan <- int是一个只能发送的通道,能够发送可是不能接收。
  • <- chan int是一个只能接收的通道,能够接收可是不能发送。

worker pool(goroutine池)

在工做中咱们一般会使用能够指定启动的goroutine数量的worker pool模式,控制goroutine的数量,防止gouroutine泄露和暴涨。

func worker(id int, jobs <- chan int, results chan <- int) {
    fmt.Printf("start worder: %d\n", id)
    for j := range jobs {
        fmt.Printf("worker:%d start job:%d\n", id, j)
        time.Sleep(time.Second)
        fmt.Printf("worker:%d end job:%d\n", id, j)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    fmt.Println("start fill")
    for j := 1; j <= 5;j ++ {
        jobs <- j
    }

    close(jobs)

    for a := 1; a <= 5; a ++ {
        <- results
    }
}

select多路复用

在某些场景下咱们须要同时从多个通道接收数据。通道在接收数据时,若是没有数据能够接收将会发生阻塞。你也许会写出一下代码使用遍从来实现:

for {
    data, ok := <- ch1
    data, ok := <- ch2
}

这种方式虽然能够实现从多个通道接收值的需求,可是运行性能会差不少。为了应对这种场景,go内置了select关键字,能够同时响应多个通道的操做。

select的使用相似于switch语句,它有一系列case分支和一个默认的分支。每一个case会对应一个通道的通讯(接收或发送)过程。select会一直等待,知道某个case的通讯操做完成时,就会执行case分支对应的语句。格式以下:

select {
    case <- ch1:
        ...
    case data := <- ch2:
        ...
    case ch3 <- data:
        ...
    default:
        默认操做
}

举个例子来演示:

func main() {
    ch := make(chan int, 1)
    for i := 0; i < 10; i ++ {
        select {
            case x := <- ch:
                fmt.Println(x)
            case ch <- i:
        }
    }
}

注:

  • 可处理一个活多个channel的发送/接收操做。
  • 若是多个case同时知足,select会随机选择一个。
  • 对于没有case的select{}会一直等待,可用于阻塞main函数

并发安全和锁

相关文章
相关标签/搜索