channel 是 Go 语言中的一个核心类型,能够把它当作管道。并发核心单元经过它就能够发送或者接收数据进行通信,这在必定程度上又进一步下降了编程的难度。git
channel 是一个数据类型,主要用来解决 go 程的同步问题以及 go 程之间数据共享(数据传递)的问题。编程
goroutine 运行在相同的地址空间,所以访问共享内存必须作好同步。goroutine 奉行经过通讯来共享内存,而不是共享内存来通讯。缓存
引⽤类型 channel 可用于多个 goroutine 通信。其内部实现了同步,确保并发安全(经过 CSP)。安全
强调一下:数据结构
channel 是一个数据类型,对应一个“管道(通道)”。并发
和 map 相似,channel 也是一个对应 make
建立的底层数据结构的引用。异步
既然是引用, 那么咱们在传参的时候就能完成在 A 函数栈帧内修改 B 函数栈帧数据的目的. 说白了就是传的地址.函数
当咱们复制一个 channel 或用于函数参数传递时,咱们只是拷贝了一个 channel 引用,所以调用者和被调用者将引用同一个 channel 对象。 和其它的引用类型同样,channel 的零值也是 nil。高并发
定义一个 channel 时,也须要定义发送到 channel 的值的类型。channel 可使用内置的 make()
函数来建立:网站
make(chan Type) // 等价于 make(chan Type, 0) make(chan Type, capacity)
当参数 capacity = 0
时,channel 是无缓冲阻塞读写的;当 capacity > 0
时,channel 有缓冲、是非阻塞的,直到写满 capacity 个元素才阻塞写入。
channel 很是像生活中的管道,一边能够存放东西,另外一边能够取出东西。channel 经过操做符 <-
来接收和发送数据,发送和接收数据语法:
channel <- value // 发送 value 到 channel <- channel // 接收并将其丢弃 x := <- channel // 从 channel 中接收数据, 并赋值给 x x, ok := <- channel // 功能同上, 同时检查通道是否已关闭或者是否为空
默认状况下,channel 接收和发送数据都是阻塞的,除非另外一端已经准备好,这样就使得 goroutine 同步变的更加的简单,而不须要显式的 lock。
咱们先看一下没有用 channel 的例子:
package main import ( "fmt" "time" ) // 定义一个打印机 func printer(s string) { for _, value := range s { fmt.Printf("%c", value) time.Sleep(time.Millisecond * 300) } } /* 定义两我的使用打印机 */ func person1() { printer("hello") } func person2() { printer("world") } func main() { go person1() go person2() time.Sleep(time.Second * 5) // 注意,只写上面两行会直接运行完毕,想想 go 程的特性 }
结果:
hwoelrllod
那么,怎么用 channel 实现来保证顺序输出呢?
由于,person1 与 person2 都须要用一个 channel,因此要在全局定义一个 channel。具体代码以下:
PS:你要传的什么类型数据与 channel 中定义的类型没有必然的联系。
package main import ( "fmt" "time" ) // 全局定义一个 channel,用来完成数据同步 var ch = make(chan int) // 传的什么类型数据与 channel 中定义的类型没有必然的联系 // 定义一个打印机 func printer(s string) { for _, value := range s { fmt.Printf("%c", value) time.Sleep(time.Millisecond * 300) } } /* 定义两我的使用打印机 */ func person1() { printer("hello") ch <- 777 } func person2() { <-ch printer("world") } func main() { go person1() go person2() time.Sleep(time.Second * 3) // 注意,只写上面两行会直接运行完毕,想想 go 程的特性 }
这个时候,当运行 person2
函数时,会阻塞在 <-ch
处,运行 person1
函数时,打印完 “hello”,会在 ch <- 777
处阻塞。
可是这时,ch <- 777
对应这写端已经准备好了,同时 <-ch
对应读端也已经准备好了,因此代码就会继续执行,接下来就会打印 “world”。
咱们再来看一段代码:
package main import "fmt" func main() { c := make(chan int) go func() { defer fmt.Println("子 go 程结束") fmt.Println("子 go 程正在运行 ...") c <- 666 /// 把 666 发送到 c }() num := <-c // 从 c 中接收数据, 并赋值给 num fmt.Println("num = ", num) fmt.Println("main go 程结束") }
运行结果:
子 go 程正在运行 ... 子 go 程结束 num = 666 main go 程结束
以上咱们都是用 channel 用来作数据同步,并无用到 channel 中的数据,下面咱们看一个用 channel 完成数据传递的例子:
package main import "fmt" func main() { ch := make(chan string) // len(ch): channel 中剩余未读取的数据个数; cap(ch): channel 的容量 fmt.Println("len(ch) = ", len(ch), "cap(ch) = ", cap(ch)) go func() { for i := 0; i < 2; i++ { fmt.Println("i = ", i) } ch <- "子 go 程打印完毕" }() str := <-ch fmt.Println(str) }
注意:len(ch): channel 中剩余未读取的数据个数; cap(ch): channel 的容量
运行结果:
len(ch) = 0 cap(ch) = 0 i = 0 i = 1 子 go 程打印完毕
强调一下:
channel 有两个端:
要求:读端和写端必须同时知足条件(读端有数据可读,写端有数据可写),才能在 channel 中完成数据流动。不然,阻塞。
【补充知识点】
每当有一个进程启动时,系统会自动打开三个文件:标准输入、标准输出、标准错误,对应三个文件:stdin、stdout、stderr。
当进程运行结束时,系统会自动关闭这三个文件。
无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。
这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操做。不然,通道会致使先执行发送或接收操做的 goroutine 阻塞等待。
这种对通道进行发送和接收的交互行为自己就是同步的。其中任意一个操做都没法离开另外一个操做单独存在。
阻塞:因为某种缘由数据没有到达,当前协程(线程)持续处于等待状态,直到条件知足,才接触阻塞。
同步:在两个或多个协程(线程)间,保持数据内容一致性的机制。
下图展现两个 goroutine 如何利用无缓冲的通道来共享一个值:
简单说明:
无缓冲的 channel 建立格式:
make(chan Type) // 等价于 make(chan Type, 0)
若是没有指定缓冲区容量,那么该通道就是同步的,所以会阻塞到发送者准备好发送和接收者准备好接收。
例如:
package main import ( "fmt" "time" ) func main() { // 建立无缓冲的 channel ch := make(chan int, 0) go func() { defer fmt.Println("子 go 程结束") for i := 0; i < 3; i++ { fmt.Println("子 go 程正在运行, i = ", i) ch <- i } }() time.Sleep(time.Second) // 延时一秒 for i := 0; i < 3; i++ { // 从 ch 中接收数据, 并赋值给 num num := <-ch fmt.Println("num = ", num) } fmt.Println("main go程结束") }
运行结果:
子 go 程正在运行, i = 0 num = 0 子 go 程正在运行, i = 1 子 go 程正在运行, i = 2 num = 1 num = 2 main go程结束
强调一下:
无缓冲 channel 的容量为0。
channel 至少应用于两个 go 程中:一个读、另外一个写。
具有同步能力。读、写同步。(好比 打电话)
有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个数据值的通道。
这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动做的条件也不一样。
只有通道中没有要接收的值时,接收动做才会阻塞。
只有通道没有可用缓冲区容纳被发送的值时,发送动做才会阻塞。
这致使有缓冲的通道和无缓冲的通道之间的一个很大的不一样:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。
使用有缓冲channel在goroutine之间同步的示例图:
有缓冲的 channel 建立格式:
make(chan Type, capacity)
若是给定了一个缓冲区容量,通道就是异步的。只要缓冲区有未使用空间用于发送数据,或还包含能够接收的数据,那么其通讯就会无阻塞地进行。
请看如下代码:
package main import ( "fmt" "time" ) func main() { // 建立一个有缓冲的 channel ch := make(chan int, 3) // 存满 3 个元素以前不会阻塞 // 查看一下 channel 的未被读取的缓冲元素数量以及 channel 容量 fmt.Printf("len(ch) = %d, cap(ch) = %d\n", len(ch), cap(ch)) go func() { defer fmt.Println("子 go 程结束") for i := 0; i < 5; i++ { ch <- i fmt.Println("子 go 程正在运行, i = ", i) } }() time.Sleep(time.Second) for i := 0; i < 5; i++ { num := <-ch fmt.Println("num = ", num) } fmt.Println("main go 程结束") }
运行结果:
len(ch) = 0, cap(ch) = 3 子 go 程正在运行, i = 0 子 go 程正在运行, i = 1 子 go 程正在运行, i = 2 num = 0 num = 1 num = 2 num = 3 子 go 程正在运行, i = 3 子 go 程正在运行, i = 4 子 go 程结束 num = 4 main go 程结束
强调一下:
有缓冲 channel 的容量大于 0。
channel 应用于两个 go 程中:一个读、另外一个写。
缓冲区能够进行数据存储,存储至容量上限才阻塞。
具有异步的能力,不须要同时操做 channel 缓冲区。(好比发短信)
若是发送者知道,没有更多的值须要发送到 channel 的话,那么让接收者也能及时知道没有多余的值可接收将是有用的,由于接收者能够中止没必要要的接收等待。
这能够经过内置的 close
函数来关闭 channel 实现。当咱们肯定再也不向对端发送、接收数据时,咱们能够关闭 channel。(通常关闭发送端)
对端能够判断 channel 是否关闭:
if num, ok := <- ch; ok { // 对端没有关闭,num 保存读到的数据 } else { // 对端已经关闭,num 保存对应类型的零值 }
例如:
package main import "fmt" func main() { ch := make(chan int) go func() { for i := 0; i < 5; i++ { ch <- i } // 若是没有 close(ch), 那么当程序打印完 0 1 2 3 4 时, 会由于没有写端 channel 形成死锁 close(ch) // 写端,写完数据主动关闭 channel }() // 从 channel 中读取数据,可是不知道读多少次,咱们能够判断当 channel 关闭时意味着读取数据完毕 for true { // ok 为 true说明 channel 没有关闭, 为 false 说明 channel 已经关闭 if data, ok := <-ch; ok { fmt.Println("写端没有关闭,data = ", data) } else { fmt.Println("写端关闭,data = ", data) break } } fmt.Println("结束.") }
运行结果:
写端没有关闭,data = 0 写端没有关闭,data = 1 写端没有关闭,data = 2 写端没有关闭,data = 3 写端没有关闭,data = 4 写端关闭,data = 0 结束.
咱们也能够用 for range
获取 channel 中的数据:
package main import ( "fmt" "time" ) func main() { ch := make(chan int, 5) go func() { for i := 0; i < 5; i++ { ch <- i } // 若是没有 close(ch), 那么当程序打印完 0 1 2 3 4 时, 会由于没有写端 channel 形成死锁 close(ch) // 写端,写完数据主动关闭 channel fmt.Println("子 go 程结束") }() time.Sleep(time.Second) // 使用 for range 循环读取 channel 的数据,注意这里前面只接收一个变量 for num := range ch { fmt.Println(num) } fmt.Println("结束.") }
运行结果:
子 go 程结束 0 1 2 3 4 结束.
强调一下:
for num := range ch{} // 注意形式,不是 <-ch
默认状况下,通道 channel 是双向的,也就是,既能够往里面发送数据也能够同里面接收数据。
可是,咱们常常见一个通道做为参数进行传递而只但愿对方是单向使用的,要么只让它发送数据,要么只让它接收数据,这时候咱们能够指定通道的方向。
单向 channel 变量的声明很是简单,以下:
var ch1 chan int // ch1 是一个正常的 channel,是双向的 var ch2 chan<- float64 // ch2 是一个单向 channel,只能用于写 float64 数据 var ch3 <-chan int // ch3 是一个单向 channel,只能用于读 int 数据
chan<-
表示数据进入管道,要把数据写进管道,对于调用者就是输出。<-chan
表示数据从管道出来,对于调用者就是获得管道的数据,固然就是输入。能够将 channel 隐式转换为单向队列,只收或只发,不能将单向 channel 转换为双向 channel:
ch := make(chan int, 3) var sendCh chan<- int = ch // 只写 var recvCh <-chan int // 只读
来看一下单向 channel 的简单示例(记住了,channel 是传引用):
package main import "fmt" // 只写 func send(sendCh chan<- int) { sendCh <- 777 close(sendCh) } // 只读 func recv(recvCh <-chan int) { num := <-recvCh fmt.Println("num = ", num) } func main() { ch := make(chan int) go send(ch) recv(ch) }
运行结果:
num = 777
单向 channel 最典型的应用是: 生产者消费者模型.
所谓生产者消费者模型: 某个模块(函数等)负责产生数据, 这些数据由另外一个模块来负责处理(此处的模块是广义的, 能够是类, 函数, 协程, 线程, 进程等). 产生数据的模块, 就形象地称为生产者; 而处理数据的模块, 就称为消费者.
单单抽象出生产者和消费者, 还够不上是生产者消费者模型. 该模式还须要有一个缓冲区处于生产者和消费者之间, 做为一个中介. 生产者把数据放入缓冲区, 而消费者从缓冲区取出数据. 以下图所示
能够这样理解, 假设你要寄一封信, 大体过程以下:
那么, 这个缓冲区有什么用呢? 为何不让生产者直接调用消费者的某个函数, 直接把数据传递过去, 而去设置一个缓冲区呢?
缓冲区的好处大概以下:
1: 解耦 ( 下降 生产者 和 消费者 之间的耦合度 )
假设生产者和消费者分别是两个类. 若是让生产者直接调用消费者的某个方法, 那么生产者对于消费者就会产生依赖(也就是耦合). 未来若是消费者的代码发生变化, 可能会直接影响到生产者. 而若是二者都依赖某个缓冲区, 二者之间不直接依赖, 耦合度也就相应下降了.
依然用寄信的例子简单说一下, 假设生产者就是你, 你负责写信, 若是没有邮筒(即缓冲区), 你就须要直接把信给邮递员(消费者). 可是, 过了几个月, 邮递员换人了, 你想要寄信就必须再认识新的邮递员, 你刚和新的邮递员熟悉以后, 又换了一个邮递员, 你又要从新认识... 这就显得很麻烦, 就是想寄个信而已, 不想认识那么多邮递员...
可是若是有邮筒(缓冲区)呢, 不管邮递员怎么更换, 这个与你无关, 我依然是把信放入邮筒就能够了. 这样一来, 就简单多了.
2: 提升并发能力 ( 生产者与消费者数量不对等时, 能保持正常通讯 )
生产者直接调用消费者的某个方法, 还有另外一个弊端
因为函数调用是同步的(或者叫阻塞的), 在消费者的方法没有返回以前, 生产者只好一直等在那边. 万一消费者处理数据很慢, 生产者只能白白浪费时间.
使用了生产者/消费者模式以后, 生产者和消费者能够是两个独立的并发主体.
生产者把制造出来的数据放入缓冲区, 就能够再去生产下一个数据. 基本上不用依赖消费者的处理速度.
其实最初这个生产者消费者模式, 主要就是用来处理并发问题的.
从寄信的例子来看, 若是没有邮筒, 你得拿着信傻站在路口等邮递员过来收(至关于生产者阻塞); 又或者邮递员得挨家挨户问, 谁要寄信(至关于消费者轮询).
3: 缓存 ( 生产者与消费者数据处理速度不一致时, 暂存数据 )
若是生产者制造数据的速度时快时慢, 缓冲区的好处就体现出来了.
当数据制造快的时候, 消费者来不及处理, 未处理的数据能够暂时存在缓冲区中. 等生产者的制造速度慢下来, 消费者再慢慢处理掉.
再拿寄信的例子举例, 假设邮递员一次只能带走1000封信. 万一某次碰上情人节送贺卡, 须要寄出的信超过1000封, 这时候邮筒这个缓冲区就派上用场了. 邮递员把来不及带走的信暂存在邮筒中, 等下次过来时再拿走.
先来看一下无缓冲的例子
package main import "fmt" // 生产者 func producer(ch chan<- int) { for i := 0; i < 5; i++ { fmt.Println("生产者写入数据, num = ", i) ch <- i } close(ch) } // 消费者 func consumer(ch <-chan int) { for num := range ch { fmt.Println("消费者拿到数据, num = ", num) } } func main() { // 无缓冲 channel ch := make(chan int) go producer(ch) // 子 go 程,生产者 consumer(ch) // 主 go 程,消费者 }
运行结果:
生产者写入数据, num = 0 生产者写入数据, num = 1 消费者拿到数据, num = 0 消费者拿到数据, num = 1 生产者写入数据, num = 2 生产者写入数据, num = 3 消费者拿到数据, num = 2 消费者拿到数据, num = 3 生产者写入数据, num = 4 消费者拿到数据, num = 4
再来看一下有缓冲的例子 二者对比结果
package main import "fmt" // 生产者 func producer(ch chan<- int) { for i := 0; i < 5; i++ { fmt.Println("生产者写入数据, num = ", i) ch <- i } close(ch) } // 消费者 func consumer(ch <-chan int) { for num := range ch { fmt.Println("消费者拿到数据, num = ", num) } } func main() { // 有缓冲 channel ch := make(chan int, 2) go producer(ch) // 子 go 程,生产者 consumer(ch) // 主 go 程,消费者 }
运行结果:
生产者写入数据, num = 0 生产者写入数据, num = 1 生产者写入数据, num = 2 生产者写入数据, num = 3 消费者拿到数据, num = 0 消费者拿到数据, num = 1 消费者拿到数据, num = 2 消费者拿到数据, num = 3 生产者写入数据, num = 4 消费者拿到数据, num = 4
简单说明
首先建立一个双向的 channel, 而后开启一个新的 goroutine, 把双向通道做为参数传递到 producer 方法中, 同时转成只写通道. 子 go 程开始执行循环, 向只写通道中添加数据, 这就是生产者.
主 go 程直接调用 consumer 方法, 该方法将双向通道转成只读通道, 经过循环每次从通道中读取数据, 这就是消费者.
注意, channel 做为参数传递, 是引用传递.
在实际的开发中, 生产者消费者模式应用也很是的普遍.
例如, 在电商网站中, 订单处理, 就是很是典型的生产者消费者模式.
当不少用户单击下订单按钮后, 订单生产的数据所有放到缓冲区(队列)中, 而后消费者将队列中的数据取出来发送至仓库管理等系统.
经过生产者消费者模式, 将订单系统与仓库管理系统隔离开, 且用户能够随时下单(生产数据). 若是订单系统直接调用仓库系统, 那么用户单击下订单按钮后, 要等到仓库系统的结果返回, 这样速度很慢.
接下来咱们就来模拟一下订单处理的过程.
package main import "fmt" type OrderInfo struct { id int } func producer2(out chan<- OrderInfo) { // 生成订单 -- 生产者 for i:=0; i < 10; i++ { // 循环生成10个订单 order := OrderInfo{id: i+1} fmt.Println("生成的订单ID: ", order.id) out <- order } close(out) // 写完, 关闭channel } func consumer2(in <-chan OrderInfo) { // 处理订单 -- 消费者 for order := range in { // 从channel取出订单 fmt.Println("订单ID为: ", order.id) // 模拟处理订单 } } func main() { ch := make(chan OrderInfo, 5) go producer2(ch) consumer2(ch) }
简单说明: OrderInfo
为订单信息, 这里为了简单只定义了一个订单编号属性, 而后生产者模拟生成10个订单, 消费者对产生的订单进行处理.
Timer 是一个定时器. 表明将来的一个单一事件, 你能够告诉 Timer 你要等待多长时间.
type Timer struct { C <- chan Time r runtimeTimer }
它提供一个channel, 在定时时间到达以前, 没有数据写入 Timer.C
会一直阻塞. 直到定时时间到, 系统会自动向 Timer.C
这个channel中写入当前时间, 阻塞即被解除.
示例代码:
package main import ( "fmt" "time" ) func main() { fmt.Println("当前时间: ", time.Now()) // 建立定时器, 指定定时时长 myTimer := time.NewTimer(time.Second * 2) // 定时到达后, 系统会自动向定时器的成员 C 写入系统当前系统时间 //读取 myTimer.C 获得定时后的系统时间, 并完成一次chan的读操做. nowTime := <- myTimer.C fmt.Println("当前时间: ", nowTime) }
1. Sleep time.Sleep(time.Second) 2. Time.C fmt.Println("当前时间: ", time.Now()) myTimer := time.NewTimer(time.Second * 2) nowTime := <- myTimer.C fmt.Println("如今时间: ", nowTime) 3. time.After fmt.Println("当前时间: ", time.Now()) nowTime := <- time.After(time.Second * 2) fmt.Println("如今时间: ", nowTime)
package main import ( "fmt" "time" ) func main(){ myTimer := time.NewTimer(time.Second * 3) // 建立定时器 go func() { <- myTimer.C fmt.Println("子go程, 定时完毕") }() myTimer.Stop() // 设置定时器中止 for { ; } }
死循环只是为了方便查看结果.
package main import ( "fmt" "time" ) func main() { myTimer := time.NewTimer(time.Second * 10) myTimer.Reset(time.Second * 2) // 重置定时时长为 2 秒 go func(){ <- myTimer.C fmt.Println("子go程, 定时完毕") }() for { ; } }
Ticker是一个周期触发定时的计时器, 它会按照一个时间间隔往channel发送系统当前时间, 而channel的接受者能够以固定的时间间隔从channel中读取.
type Ticker struct { C <- chan Time r runtimeTimer }
package main import ( "fmt" "time" ) func main() { myTicker := time.NewTicker(time.Second) // 定义一个周期定时器 go func() { for { nowTime := <- myTicker.C fmt.Println("如今时间: ", nowTime) } }() // 死循环, 特意不让main goroutine结束 for { ; } }
package main import ( "fmt" "time" ) func main(){ quit := make(chan bool) // 建立一个判断是否终止的channel myTicker := time.NewTicker(time.Second) // 定义一个周期定时器 go func() { i := 0 for { nowTime := <- myTicker.C i++ fmt.Println("如今时间: ", nowTime) if i == 5 { quit <- true // 解除 主go程阻塞 } } }() <- quit // 在子go程循环获取 <- myTicker.C 期间, 一直阻塞 }
欢迎访问个人我的网站:
李培冠博客:lpgit.com