Go channel系列:html
channel用于goroutines之间的通讯,让它们之间能够进行数据交换。像管道同样,一个goroutine_A向channel_A中放数据,另外一个goroutine_B从channel_A取数据。golang
channel是指针类型的数据类型,经过make来分配内存。例如:bash
ch := make(chan int)
这表示建立一个channel,这个channel中只能保存int类型的数据。也就是说一端只能向此channel中放进int类型的值,另外一端只能今后channel中读出int类型的值。并发
须要注意,chan TYPE
才表示channel的类型。因此其做为参数或返回值时,需指定为xxx chan int
相似的格式。dom
向ch这个channel放数据的操做形式为:异步
ch <- VALUE
从ch这个channel读数据的操做形式为:函数
<-ch // 从ch中读取一个值 val = <-ch val := <-ch // 从ch中读取一个值并保存到val变量中 val,ok = <-ch // 从ch读取一个值,判断是否读取成功,若是成功则保存到val变量中
其实很简单,当ch出如今<-
的左边表示send,当ch出如今<-
的右边表示recv。ui
例如:命令行
package main import ( "fmt" "time" ) func main() { ch := make(chan string) go sender(ch) // sender goroutine go recver(ch) // recver goroutine time.Sleep(1e9) } func sender(ch chan string) { ch <- "malongshuai" ch <- "gaoxiaofang" ch <- "wugui" ch <- "tuner" } func recver(ch chan string) { var recv string for { recv = <-ch fmt.Println(recv) } }
输出结果:指针
malongshuai gaoxiaofang wugui tuner
上面激活了一个goroutine用于执行sender()函数,该函数每次向channel ch中发送一个字符串。同时还激活了另外一个goroutine用于执行recver()函数,该函数每次从channel ch中读取一个字符串。
注意上面的recv = <-ch
,当channel中没有数据可读时,recver goroutine将会阻塞在此行。因为recver中读取channel的操做放在了无限for循环中,表示recver goroutine将一直阻塞,直到从channel ch中读取到数据,读取到数据后进入下一轮循环由被阻塞在recv = <-ch
上。直到main中的time.Sleep()指定的时间到了,main程序终止,全部的goroutine将所有被强制终止。
由于receiver要不断从channel中读取可能存在的数据,因此receiver通常都使用一个无限循环来读取channel,避免sender发送的数据被丢弃。
每一个channel都有3种操做:send、receive和close
例如,判断channel是否被关闭:
val, ok := <-counter if ok { fmt.Println(val) }
由于关闭通道也会让recv成功读取(只不过读取到的值为类型的空值),使得本来阻塞在recv操做上的goroutine变得不阻塞,借此技巧能够实现goroutine的执行前后顺序。具体示例见后文:指定goroutine的执行顺序。
channel分为两种:unbuffered channel和buffered channel
能够认为阻塞和不阻塞是由channel控制的,不管是send仍是recv操做,都是在向channel发送请求:
buffered channel有两个属性:容量和长度:和slice的capacity和length的概念是同样的
make(chan TYPE,CAP)
unbuffered channel能够认为是容量为0的buffered channel,因此每发送一个数据就被阻塞。注意,不是容量为1的buffered channel,由于容量为1的channel,是在channel中已有一个数据,并发送第二个数据的时候才被阻塞。
换句话说,send被阻塞的时候,实际上是没有发送成功的,只有被另外一端读走一个数据以后才算是send成功。对于unbuffered channel来讲,这是send/recv的同步模式。而buffered channel则是在每次发送数据到通道的时候,(通道)都向发送者返回一个消息,容量未满的时候返回成功的消息,发送者所以而不会阻塞,容量已满的时候由于已满而迟迟不返回消息,使得发送者被阻塞。
实际上,当向一个channel进行send的时候,先关闭了channel,再读取channel时会发现错误在send,而不是recv。它会提示向已经关闭了的channel发送数据。
func main() { counter := make(chan int) go func() { counter <- 32 }() close(counter) fmt.Println(<-counter) }
输出报错:
panic: send on closed channel
因此,在Go的内部行为中,send和recv是一个总体行为,数据未读就表示未send成功。
有两种特殊的channel:nil channel和channal类型的channel。
当未为channel分配内存时,channel就是nil channel,例如var ch1 chan int
。nil channel会永远阻塞对该channel的读、写操做。
nil channel在某些时候有些妙用,例如在select(关于select,见后文)的某个case分支A将其它某case分支B所操做的channel忽然设置为nil,这将会禁用case分支B。
当channel的类型为一个channel时,就是channel的channel,也就是双层通道。例如:
var chch1 chan chan int
channel的channel是指通道里的数据是通道,能够认为通道里面嵌套了一个或多个通道:只能将整个通道发送到外层通道,读取外层通道时获取到的是内层通道,而后能够操做内层通道。
// 发送通道给外层通道 chch1 <-ch1 chch1 <-ch2 // 从外层通道取出内层通道 c <-chch1 // 操做取出的内层通道 c <-123 val := <-c
channel of channel的妙用之一是将外层通道做为通道的加工厂:在某个goroutine中不断生成通道,在其它goroutine能够不断取出通道来操做。
当channel的某一端(sender/receiver)期待另外一端的(receiver/sender)操做,另外一端正好在期待本端的操做时,也就是说两端都由于对方而使得本身当前处于阻塞状态,这时将会出现死锁问题。
更通俗地说,只要全部goroutine都被阻塞,就会出现死锁。
好比,在main函数中,它有一个默认的goroutine,若是在此goroutine中建立一个unbuffered channel,并在main goroutine中向此channel中发送数据并直接receive数据,将会出现死锁:
package main import ( "fmt" ) func main (){ goo(32) } func goo(s int) { counter := make(chan int) counter <- s fmt.Println(<-counter) }
在上面的示例中,向unbuffered channel中send数据的操做counter <- s
是在main goroutine中进行的,今后channel中recv的操做<-counter
也是在main goroutine中进行的。send的时候会直接阻塞main goroutine,使得recv操做没法被执行,go将探测到此问题,并报错:
fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]:
要修复此问题,只需将send操做放在另外一个goroutine中执行便可:
package main import ( "fmt" ) func main() { goo(32) } func goo(s int) { counter := make(chan int) go func() { counter <- s }() fmt.Println(<-counter) }
或者,将counter设置为一个容量为1的buffered channel:
counter := make(chan int,1)
这样放完一个数据后send不会阻塞(被recv以前放第二个数据才会阻塞),能够执行到recv操做。
下面经过sync.WaitGroup类型来等待程序的结束,分析多个goroutine之间通讯时状态的转换。由于建立的channel是unbuffered类型的,因此send和recv都是阻塞的。
package main import ( "fmt" "sync" ) // wg用于等待程序执行完成 var wg sync.WaitGroup func main() { count := make(chan int) // 增长两个待等待的goroutines wg.Add(2) fmt.Println("Start Goroutines") // 激活一个goroutine,label:"Goroutine-1" go printCounts("Goroutine-1", count) // 激活另外一个goroutine,label:"Goroutine-2" go printCounts("Goroutine-2", count) fmt.Println("Communication of channel begins") // 向channel中发送初始数据 count <- 1 // 等待goroutines都执行完成 fmt.Println("Waiting To Finish") wg.Wait() fmt.Println("\nTerminating the Program") } func printCounts(label string, count chan int) { // goroutine执行完成时,wg的计数器减1 defer wg.Done() for { // 从channel中接收数据 // 若是无数据可recv,则goroutine阻塞在此 val, ok := <-count if !ok { fmt.Println("Channel was closed:",label) return } fmt.Printf("Count: %d received from %s \n", val, label) if val == 10 { fmt.Printf("Channel Closed from %s \n", label) // Close the channel close(count) return } // 输出接收到的数据后,加1,并从新将其send到channel中 val++ count <- val } }
上面的程序中,激活了两个goroutine,激活这两个goroutine后,向channel中发送一个初始数据值1,而后main goroutine将由于wg.Wait()等待2个goroutine都执行完成而被阻塞。
再看这两个goroutine,这两个goroutine执行彻底同样的函数代码,它们都接收count这个channel的数据,但多是goroutine1先接收到channel中的初始值1,也多是goroutine2先接收到初始值1。接收到数据后输出值,并在输出后对数据加1,而后将加1后的数据再次send到channel,每次send都会将本身这个goroutine阻塞(由于unbuffered channel),此时另外一个goroutine由于等待recv而执行。当加1后发送给channel的数据为10以后,某goroutine将关闭count channel,该goroutine将退出,wg的计数器减1,另外一个goroutine因等待recv而阻塞的状态将由于channel的关闭而失败,ok状态码将让该goroutine退出,因而wg的计数器减为0,main goroutine由于wg.Wait()而继续执行后面的代码。
前面都是在for无限循环中读取channel中的数据,但也可使用range来迭代channel,它会返回每次迭代过程当中所读取的数据,直到channel被关闭。必须注意,只要channel未关闭,range迭代channel就会一直被阻塞。
例如,将上面示例中的printCounts()改成for-range的循环形式。
func printCounts(label string, count chan int) { defer wg.Done() for val := range count { fmt.Printf("Count: %d received from %s \n", val, label) if val == 10 { fmt.Printf("Channel Closed from %s \n", label) close(count) return } val++ count <- val } }
channel是goroutine与goroutine之间通讯的基础,一边产生数据放进channel,另外一边从channel读取放进来的数据。能够借此实现多个goroutine之间的数据交换,例如goroutine_1->goroutine_2->goroutine_3
,就像bash的管道同样,上一个命令的输出能够不断传递给下一个命令的输入,只不过golang借助channel能够在多个goroutine(如函数的执行)之间传,而bash是在命令之间传。
如下是一个示例,第一个函数getRandNum()用于生成随机整数,并将生成的整数放进第一个channel ch1中,第二个函数addRandNum()用于接收ch1中的数据(来自第一个函数),将其输出,而后对接收的值加1后放进第二个channel ch2中,第三个函数printRes接收ch2中的数据并将其输出。
若是将函数认为是Linux的命令,则相似于下面的命令行:ch1至关于第一个管道,ch2至关于第二个管道
getRandNum | addRandNum | printRes
如下是代码部分:
package main import ( "fmt" "math/rand" "sync" ) var wg sync.WaitGroup func main() { wg.Add(3) // 建立两个channel ch1 := make(chan int) ch2 := make(chan int) // 3个goroutine并行 go getRandNum(ch1) go addRandNum(ch1, ch2) go printRes(ch2) wg.Wait() } func getRandNum(out chan int) { // defer the wg.Done() defer wg.Done() var random int // 总共生成10个随机数 for i := 0; i < 10; i++ { // 生成[0,30)之间的随机整数并放进channel out random = rand.Intn(30) out <- random } close(out) } func addRandNum(in,out chan int) { defer wg.Done() for v := range in { // 输出从第一个channel中读取到的数据 // 并将值+1后放进第二个channel中 fmt.Println("before +1:",v) out <- (v + 1) } close(out) } func printRes(in chan int){ defer wg.Done() for v := range in { fmt.Println("after +1:",v) } }
上面经过两个channel将3个goroutine链接起来,其中起链接做用的是第二个函数addRandNum()。在这个函数中使用了两个channel做为参数:一个channel用于接收、一个channel用于发送。
其实channel类的参数变量能够指定数据流向:
in <-chan int
:表示channel in通道只用于接收数据out chan<- int
:表示channel out通道只用于发送数据只用于接收数据的通道<-chan
不可被关闭,由于关闭通道是针对发送数据而言的,表示无数据再需发送。对于recv来讲,关闭通道是没有意义的。
因此,上面示例中三个函数可改写为:
func getRandNum(out chan<- int) { ... } func addRandNum(in <-chan int, out chan<- int) { ... } func printRes(in <-chan int){ ... }
下面是使用buffered channel实现异步处理请求的示例。
在此示例中:
如下是代码部分:
package main import ( "fmt" "math/rand" "sync" "time" ) type Task struct { ID int JobID int Status string CreateTime time.Time } func (t *Task) run() { sleep := rand.Intn(1000) time.Sleep(time.Duration(sleep) * time.Millisecond) t.Status = "Completed" } var wg sync.WaitGroup // worker的数量,即便用多少goroutine执行任务 const workerNum = 3 func main() { wg.Add(workerNum) // 建立容量为10的buffered channel taskQueue := make(chan *Task, 10) // 激活goroutine,执行任务 for workID := 0; workID <= workerNum; workID++ { go worker(taskQueue, workID) } // 将待执行任务放进buffered channel,共15个任务 for i := 1; i <= 15; i++ { taskQueue <- &Task{ ID: i, JobID: 100 + i, CreateTime: time.Now(), } } close(taskQueue) wg.Wait() } // 从buffered channel中读取任务,并执行任务 func worker(in <-chan *Task, workID int) { defer wg.Done() for v := range in { fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID) v.run() fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID) } }
不少时候想要同时操做多个channel,好比从ch一、ch2读数据。Go提供了一个select语句块,它像switch同样工做,里面放一些case语句块,用来轮询每一个case语句块的send或recv状况。
select
用法格式示例:
select { // ch1有数据时,读取到v1变量中 case v1 := <-ch1: ... // ch2有数据时,读取到v2变量中 case v2 := <-ch2: ... // 全部case都不知足条件时,执行default default: ... }
defalut语句是可选的,不容许fall through行为,但容许case语句块为空块。select会被return、break关键字中断:return是退出整个函数,break是退出当前select。
select的行为模式主要是对channel是否可读进行轮询,但也能够用来向channel发送数据。它的行为以下:
若是有所疑惑,后文的"select超时时间"有更有助于理解select的说明和示例。
全部的case块都是按源代码书写顺序进行评估的。当select未在循环中时,它将只对全部case评估一次,此次结束后就结束select。某次评估过程当中若是有知足条件的case,则全部其它case都直接结束评估,并退出这次select。
其实若是注意到select语句是在某一个goroutine中评估的,就不难理解只有全部case都不知足条件时,select所在goroutine才会被阻塞,只要有一个case知足条件,本次select就不会出现阻塞的状况。
须要注意的是,若是在select中执行send操做,则可能会永远被send阻塞。因此,在使用send的时候,应该也使用defalut语句块,保证send不会被阻塞。若是没有default,或者能确保select不阻塞的语句块,则早晚会被send阻塞。在后文有一个select中send永久阻塞的分析:双层channel的一个示例。
通常来讲,select会放在一个无限循环语句中,一直轮询channel的可读事件。
下面是一个示例,pump1()和pump2()都用于产生数据(一个产生偶数,一个产生奇数),并将数据分别放进ch1和ch2两个通道,suck()则从ch1和ch2中读取数据。而后在无限循环中使用select轮询这两个通道是否可读,最后main goroutine在1秒后强制中断全部goroutine。
package main import ( "fmt" "time" ) func main() { ch1 := make(chan int) ch2 := make(chan int) go pump1(ch1) go pump2(ch2) go suck(ch1, ch2) time.Sleep(1e9) } func pump1(ch chan int) { for i := 0; i <= 30; i++ { if i%2 == 0 { ch <- i } } } func pump2(ch chan int) { for i := 0; i <= 30; i++ { if i%2 == 1 { ch <- i } } } func suck(ch1 chan int, ch2 chan int) { for { select { case v := <-ch1: fmt.Printf("Recv on ch1: %d\n", v) case v := <-ch2: fmt.Printf("Recv on ch2: %d\n", v) } } }