MIT 6.824学习笔记3 Go语言并发解析

以前看过一个go语言并发的介绍:https://www.cnblogs.com/pdev/p/10936485.html   但这个太简略啦。下面看点深刻的html

还记得http://www.javashuo.com/article/p-kacbshul-ew.html中咱们写过一个简单的爬虫。这里面就用到了Go的两种并发方式:golang

 


1.    Go routines和Go channels(ConcurrentChannel),这是Go语言特有的一种并发方式,能够简化编程

1.1 Go routines

Goroutines 能够看做是轻量级线程。建立一个 goroutine 很是简单,只须要把 go 关键字放在函数调用语句前。为了说明这有多么简单,咱们建立两个 finder 函数,并用 go 调用,让它们每次找到 "ore" 就打印出来。算法

package main
import (
    "fmt"
    "time"
    "math/rand"
)

func finder(mines [5]string, coreid int) {
    <-time.After(time.Second * time.Duration(coreid))
    rand.Seed(time.Now().UnixNano())
    idx := rand.Intn(5)
    fmt.Println(time.Now(), coreid, mines[idx])
}

func main() {
    theMine := [5]string{"rock", "ore", "gold", "copper", "sliver"}
    go finder(theMine, 1)
    go finder(theMine, 2)
    <-time.After(time.Second * 3) //you can ignore this for now
    fmt.Println(time.Now(), "END")
}

程序的输出以下:
F:\My Drive\19summer\
6824>go run gor.go 2019-08-01 17:45:41.0985917 -0500 CDT m=+1.001057201 1 ore 2019-08-01 17:45:42.0986489 -0500 CDT m=+2.001114401 2 ore 2019-08-01 17:45:43.0987061 -0500 CDT m=+3.001171601 END

从执行时间能够看出,两个finder是并发运行的编程

但这两个线程是彼此独立的。若是他们须要交流信息呢?就须要Go channel了。segmentfault

 

1.2 Go Channel

Channels 容许 go routines 之间相互通讯。你能够把 channel 看做管道,goroutines 能够往里面发消息,也能够从中接收其它 go routines 的消息。安全

myFirstChannel := make(chan string)并发

Goroutines 能够往 channel 发送消息,也能够从中接收消息。这是经过箭头操做符 (<-) 完成的,它指示 channel 中的数据流向。ide

myFirstChannel <-"hello" // Send函数

myVariable := <- myFirstChannel // Receiveoop

 再来看一个程序:

package main
import (
    "fmt"
    "time"
)

func main() {
    theMine := [5]string{"ore1", "ore2", "ore3", "ore4", "ore5"}
    oreChan := make(chan string)

    // Finder
    go func(mine [5]string) {
        for _, item := range mine {
            oreChan <- item //send
            fmt.Println("Miner: Send " + item + " to breaker")
        }
    }(theMine)

    // Ore Breaker
    go func() {
        for i := 0; i < 5; i++ {
            foundOre := <-oreChan //receive
            <-time.After(time.Nanosecond * 10)
            fmt.Println("Miner: Receive " + foundOre + " from finder")
        }
    }()

    <-time.After(time.Second * 5) // Again, ignore this for now
}

程序的输出以下:

F:\My Drive\19summer\6824>go run gor2.go
Miner: Send ore1 to breaker
Miner: Receive ore1 from finder
Miner: Send ore2 to breaker
Miner: Receive ore2 from finder
Miner: Send ore3 to breaker
Miner: Receive ore3 from finder
Miner: Send ore4 to breaker
Miner: Receive ore4 from finder
Miner: Send ore5 to breaker
Miner: Receive ore5 from finder

能够看到已经能够经过go channel在线程之间进行通讯啦!

在receive和fmt.Println之间的<-time.After(time.Nanosecond * 10)是为了方便在命令行查看输出,不然由于cpu运行程序太快了,命令行打印顺序会和实际运行顺序不同。

 

1.3 阻塞的Go Channel

默认的,信道的存消息和取消息都是阻塞的 (叫作无缓冲的信道)。也就是说, 无缓冲的信道在取消息和存消息的时候都会挂起当前的goroutine,除非另外一端已经准备好。Channels 阻塞 goroutines 发生在各类情形下。这能在 goroutines 各自欢快地运行以前,实现彼此之间的短暂同步。

Blocking on a Send:一旦一个 goroutine(gopher) 向一个 channel 发送了数据,它就被阻塞了,直到另外一个 goroutine 从该 channel 取走数据。

Blocking on a Receive:和发送时情形相似,当channel是空的时,一个 goroutine 可能阻塞着等待从一个 channel 获取数据。

一开始接触阻塞的概念可能使人有些困惑,但你能够把它想象成两个 goroutines(gophers) 之间的交易。 其中一个 gopher 不管是等着收钱仍是送钱,都须要等待交易的另外一方出现。

 

既然已经了解 goroutine 经过 channel 通讯可能发生阻塞的不一样情形,让咱们讨论两种不一样类型的 channels: unbuffered 和 buffered 。选择使用哪种 channel 可能会改变程序的运行表现。

Unbuffered Channels:在前面的例子中咱们一直在用 unbuffered channels,它们不同凡响的地方在于每次只有一份数据能够经过。不管如何,咱们测试到的无缓冲信道的大小都是0 (len(channel))

Buffered Channels:在并发程序中,时间协调并不老是完美的。在挖矿的例子中,咱们可能遇到这样的情形:开矿 gopher 处理一块矿石所花的时间,寻矿 gohper 可能已经找到 3 块矿石了。为了避免让寻矿 gopher 浪费大量时间等着给开矿 gopher 传送矿石,咱们可使用 buffered channel。咱们先建立一个容量为 3 的 buffered channel。

bufferedChan := make(chan string, 3)

buffered 和 unbuffered channels 工做原理相似,但有一点不一样—在须要另外一个 gorountine 取走数据以前,咱们能够向 buffered channel 发送3份数据,而在buffer满以前都不会发生阻塞,而当第4份数据发过来时就会发生阻塞。也就是说,缓冲信道会在满容量的时候加锁。

无缓冲区的channel能够理解为make(chan string, 0) 

例以下面的程序:

package main
import (
    "fmt"
    "time"
)

func main() {
    bufferedChan := make(chan string, 3)

    go func() {
        bufferedChan <-"first"
        fmt.Println("Sent 1st")
        bufferedChan <-"second"
        fmt.Println("Sent 2nd")
        bufferedChan <-"third"
        fmt.Println("Sent 3rd")
    }()

    <-time.After(time.Second * 1)

    go func() {
        firstRead := <- bufferedChan
        fmt.Println("Receiving..")
        fmt.Println(firstRead)
        secondRead := <- bufferedChan
        fmt.Println(secondRead)
        thirdRead := <- bufferedChan
        fmt.Println(thirdRead)
    }()

    <-time.After(time.Second * 5) // Again, ignore this for now
}

输出结果以下:

F:\My Drive\19summer\6824>go run gor2.go
Sent 1st
Sent 2nd
Sent 3rd
Receiving..
first
second
third

相比最初的例子,已经有了很大改进!如今每一个函数都独立地运行在各自的 goroutines 中。此外,每次处理完一块矿石,它就会被带进挖矿流水线的下一个阶段。

 

其实,缓冲信道是先进先出的,咱们能够把缓冲信道看做为一个线程安全的队列:

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3

    fmt.Println(<-ch) // 1
    fmt.Println(<-ch) // 2
    fmt.Println(<-ch) // 3
}

 

 1.4 其余一些概念

匿名的 Goroutines

 咱们能够用以下方式建立一个匿名函数并运行在它的 goroutine 中。若是只须要调用一次函数,经过这种方式咱们可让它在本身的 goroutine 中运行,而不须要建立一个正式的函数声明。

go func() {
    fmt.Println("I'm running in my own go routine")
}()

和匿名函数的定义很是像

 

main 函数是一个 goroutine

main 函数确实运行在本身的 goroutine 中!更重要的是要知道,一旦 main 函数返回,它将关掉当前正在运行的其余 goroutines。这就是为何咱们在 main 函数的最后设置了一个定时器—它建立了一个 channel,并在 5 秒后发送一个值。经过添加上面这行代码,main routine 会阻塞,以给其余 goroutines 5 秒的时间来运行。不然主线程就会过早结束,致使finder没有机会执行

<-time.After(time.Second * 5) // Receiving from channel after 5 sec

但是采用等待的办法并很差,若是能像Python同样有个thread.join()来阻塞主线程,等待全部子线程跑完就行了。

有一种方法能够阻塞 main 函数直到其余全部 goroutines 都运行完。一般的作法是建立一个 done channel, main 函数在等待读取它时被阻塞。一旦完成工做,向这个 channel 发送数据,程序就会结束了。

func main() {
    doneChan := make(chan string)

    go func() {
        // Do some work…
        doneChan <- "I'm all done!"
    }()

    <-doneChan // block until go routine signals work is done
}

 

能够遍历 channel

在前面的例子中咱们让 miner 在 for 循环中迭代 3 次从 channel 中读取数据。若是咱们不能确切知道将从 finder 接收多少块矿石呢?

相似于对集合数据类型 (注: 如 slice) 进行遍历,你也能够遍历一个 channel。更新前面的 miner 函数,咱们能够这样写:

// Ore Breaker
go func() {
    for foundOre := range oreChan {
        fmt.Println("Miner: Received " + foundOre + " from finder")
    }
}()

因为 miner 须要读取 finder 发送给它的全部数据,遍历 channel 能确保咱们接收到已经发送的全部数据。

 

注意遍历 channel 会阻塞,直到有新数据被发送到 channel。下面这个程序就会发生死锁:

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3

    for v := range ch {
        fmt.Println(v)
    }
}

缘由是range不等到信道关闭是不会结束读取的。也就是若是 缓冲信道干涸了,那么range就会阻塞当前goroutine, 因此死锁咯。在全部数据发送完以后避免 go routine 阻塞的惟一方法就是用 "close(channel)" 关掉 channel。以下程序

ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3

// 显式地关闭信道
close(ch)

for v := range ch {
    fmt.Println(v)
}

 被关闭的信道会禁止数据流入, 是只读的。咱们仍然能够从关闭的信道中取出数据,可是不能再写入数据了。

 

对 channel 进行非阻塞读写(不用担忧channel空/满形成阻塞)

有一个技巧,利用 Go 的 select case 语句能够实现对 channel 的非阻塞读。经过使用这这种语句,若是 channel 有数据,goroutine 将会从中读取,不然就执行默认的分支。

myChan := make(chan string)

go func(){
    myChan <- "Message!"
}()

select {
case msg := <- myChan: fmt.Println(msg) default: fmt.Println("No Msg") }
<-time.After(time.Second * 1) select { case msg := <- myChan: fmt.Println(msg) default: fmt.Println("No Msg") } 程序输出以下: No Msg Message!

非阻塞写也是使用一样的 select case 语句来实现,惟一不一样的地方在于,case 语句看起来像是发送而不是接收。

select {
    case myChan <- "message":
        fmt.Println("sent the message")
    default:
        fmt.Println("no message sent")
}

 

1.5 并发和并行

默认地, Go全部的goroutines只能在一个线程(一个cpu核心)里跑 。 也就是说, 两个go routine不是并行的,可是是并发的。在同一个原生线程里,若是当前goroutine不发生阻塞,它是不会让出CPU时间给其余同线程的goroutines的,这是Go运行时对goroutine的调度,咱们也可使用runtime包来手工调度。

前面带有sleep的程序看时间像是“并行”的,是由于sleep函数则阻塞掉了 当前goroutine, 当前goroutine主动让其余goroutine执行, 因此造成了逻辑上的并行, 也就是并发。而对于下面这段程序,两个goroutine是一个一个进行的,打印的结果老是同样的:

var quit chan int = make(chan int)

func loop() {
    for i := 0; i < 10; i++ {
        fmt.Printf("%d ", i)
    }
    quit <- 0
}

func main() {
    go loop()
    go loop()

    for i := 0; i < 2; i++ {
        <- quit
    }
}

F:\My Drive\19summer\6824>go run gor2.go
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

 

还有一个颇有意思的例子:https://segmentfault.com/q/1010000000207474

 

为了能实现真正的多核并行,咱们须要用到runtime包(runtime包是goroutine的调度器),来显式的指定要用两个核心。有两种实现方案:

1. 指定要用几个核

package main
import (
    "fmt"
    "runtime"
)

var quit chan int = make(chan int)

func loop(coreid int) {
    for i := 0; i < 1000; i++ { //为了观察,跑多些
        fmt.Printf("%d-%d ", coreid, i)
    }
    quit <- 0
}

func main() {
    runtime.GOMAXPROCS(2) // 最多使用2个核

    go loop(0)
    go loop(1)

    for i := 0; i < 2; i++ {
        <- quit
    }
}

这种输出将会是不规律的两个线程交替输出,达到了真正的并行

2. 显式地让出CPU时间( 其实这种主动让出CPU时间的方式仍然是在单核里跑。但手工地切换goroutine致使了看上去的“并行”。)

package main
import (
    "fmt"
    "runtime"
)

var quit chan int = make(chan int)

func loop(coreid int) {
    for i := 0; i < 10; i++ { //为了观察,跑多些
        runtime.Gosched() // 显式地让出CPU时间给其余goroutine
        fmt.Printf("%d-%d ", coreid, i)
    }
    quit <- 0
}

func main() {
    go loop(0)
    go loop(1)

    for i := 0; i < 2; i++ {
        <- quit
    }
}

输出是很是有规律的交替进行:
F:\My Drive\19summer\6824>go run gor2.go
1-0 0-0 1-1 0-1 1-2 0-2 1-3 0-3 1-4 0-4 1-5 0-5 1-6 0-6 1-7 0-7 1-8 0-8 1-9 0-9


 关于runtime包几个函数:

  • Gosched 让出cpu
  • NumCPU 返回当前系统的CPU核数量
  • GOMAXPROCS 设置最大的可同时使用的CPU核数
  • Goexit 退出当前goroutine(可是defer语句会照常执行)

咱们知道“进程是资源分配的最小单位,线程是CPU调度的最小单位”。那么go routine和线程有什么关系呢?能够看go官方文档中的一段话(https://golang.org/doc/faq#goroutines):

Why goroutines instead of threads?

Goroutines are part of making concurrency easy to use. The idea, which has been around for a while, is to multiplex independently executing functions—coroutines(协程)—onto a set of threads. When a coroutine blocks, such as by calling a blocking system call, the run-time automatically moves other coroutines on the same operating system thread to a different, runnable thread so they won't be blocked. The programmer sees none of this, which is the point. The result, which we call goroutines, can be very cheap: they have little overhead beyond the memory for the stack, which is just a few kilobytes.

To make the stacks small, Go's run-time uses resizable, bounded stacks. A newly minted goroutine is given a few kilobytes, which is almost always enough. When it isn't, the run-time grows (and shrinks) the memory for storing the stack automatically, allowing many goroutines to live in a modest amount of memory. The CPU overhead averages about three cheap instructions per function call. It is practical to create hundreds of thousands of goroutines in the same address space. If goroutines were just threads, system resources would run out at a much smaller number.

 

协程能够理解为同一个线程经过上下文切换来“超线程”,并发执行两个工做。( https://www.liaoxuefeng.com/wiki/897692888725344/923057403198272 )

对于 进程、线程,都是有内核进行调度,有 CPU 时间片的概念,进行 抢占式调度(有多种调度算法)
对于 协程(用户级线程),这是对内核透明的,也就是系统并不知道有协程的存在,是彻底由用户本身的程序进行调度的, 由于是由用户程序本身控制,那么就很难像抢占式调度那样作到强制的 CPU 控制权切换到其余进程
/线程,一般只能进行 协做式调度,须要协程本身主动把控制权转让出去以后,其余协程才能被执行到。
本质上,goroutine 就是协程。 不一样的是,Golang 在 runtime、系统调用等多方面对 goroutine 调度进行了封装
和处理,当遇到长时间执行或者进行系统调用时,会主动把当前 goroutine 的CPU (P) 转让出去,让其余 goroutine
能被调度并执行,也就是 Golang 从语言层面支持了协程。Golang 的一大特点就是从语言层面原生支持协程,在函数或
者方法前面加 go关键字就可建立一个协程。

http://www.javashuo.com/article/p-xzfhmmyd-er.html

 

假设咱们开了三个Goroutine,但只分配了两个核(两个线程),会发生什么呢?写段程序来试验一下:

package main

import (
    "fmt"
    "runtime"
)

var quit chan int = make(chan int)

func loop(id int) { // id: 该goroutine的标号
    for i := 0; i < 100; i++ { //打印10次该goroutine的标号
        fmt.Printf("%d ", id)
    }
    quit <- 0
}

func main() {
    runtime.GOMAXPROCS(2) // 最多同时使用2个核

    for i := 0; i < 3; i++ { //开三个goroutine
        go loop(i)
    }

    for i := 0; i < 3; i++ {
        <- quit
    }
}


输出结果有不少种:
F:\My Drive\19summer\6824>go run gor2.go
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
F:\My Drive\19summer\6824>go run gor2.go
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
F:\My Drive\19summer\6824>
  • 有时会发生抢占式输出(说明Go开了不止一个原生线程,达到了真正的并行)
  • 有时会顺序输出, 打印完0再打印1, 再打印2(说明Go开一个原生线程,单线程上的goroutine不阻塞不松开CPU)

那么,咱们还会观察到一个现象,不管是抢占地输出仍是顺序的输出,都会有那么两个数字表现出这样的现象:一个数字的全部输出都会在另外一个数字的全部输出以前

缘由是, 3个goroutine分配到至多2个线程上,就会至少两个goroutine分配到同一个线程里,单线程里的goroutine 不阻塞不放开CPU, 也就发生了顺序输出。

 

Ref:

Go并发的一些应用:https://blog.csdn.net/kjfcpua/article/details/18265475

https://stackoverflow.com/questions/13107958/what-exactly-does-runtime-gosched-do

https://studygolang.com/articles/13875

https://blog.csdn.net/kjfcpua/article/details/18265441

https://blog.csdn.net/kjfcpua/article/details/18265461

 


 

2.    基于共享变量的并发(ConcurrentMutex),能够理解成传统的使用加锁/解锁和信号量来手动处理并发

 

 

 

1111

相关文章
相关标签/搜索