golang CSP 模型中的
C
, 主要用于goroutine之间消息的传递,咱们知道在写代码的过程当中,解偶是很是重要的一环,而使用channel则能够很好的隔离goroutine,使得goroutne之间的交互,只须要将重心关注在如何从channel中消费或者生产消息。
使用make声明一个channel
ch := make(chan int) ch <- 1 // write ch位于 <- 的左侧(表明数据流入 <- ch // read ch位于 <- 的右侧(表明数据流出
在真正使用channel前,咱们须要了解channel可能会产生 阻塞场景的全部可能,以防止在代码中编写出不符合咱们预期的代码。
下面咱们罗列出可能的四种情形
channel中无数据,可是执行 <- channel (读
ch := make(chan interface{}) <-ch fmt.Println("read buf succ")
channel中无数据,往 channel <- (写 ,可是没有goroutine读取。
ch := make(chan interface{}) ch <- 1 fmt.Println("read buf succ")
channel中无数据,可是执行 <- channel
ch := make(chan interface{}, 1) <-ch fmt.Println("read buf succ")
channel中已满, 继续执行 channel <- 动做,可是没有goroutine读取。
ch := make(chan interface{}, 1) ch <- 1 ch <- 2 fmt.Println("read buf succ")
使用 close关闭channel
ch := make(chan interface{}) close(ch)
关闭channel须要注意golang
- 重复关闭会 panic
- 向关闭的channel发送数据会panic
- 从关闭的channel读取数据,会读取到值的初始值,好比interface类型,读取到的就是nil
range 字段会阻塞监听 channel, 直到channel 被close。
func recv(ch chan int) { for msg := range ch { // 使用 range 能够自动等待 ch 的行为, 直到ch 被close。 fmt.Println(msg) } fmt.Println("channel closed") } func send(ch chan int, msg int) { ch <- msg } func main() { ch := make(chan int, 2) go recv(ch) ch <- 1 ch <- 2 ch <- 3 time.AfterFunc(time.Second*2, func() { close(ch) }) }
select 的大体工做原理app
- 检查全部的
case
- 当检查的
case
已经能够发送|接收,则执行当前代码块- 当有多个
case
能够执行,则随机
选择一个执行- 当没有
case
能够执行,则阻塞- 若是存在
default
,当没有可执行代码块时,则执行default
代码块使用select来管理channel的读取, 经过default防止阻塞.性能
func readCh(ch chan interface{}) error { select { case v := <-ch: fmt.Println(v) default: return errors.New("no data") } return nil }
使用 timer 或者 context 来进行到期退出断定. 另外咱们也可使用sync.Once()这种形式设定一个开关,
来控制select的退出逻辑,可参照grpc/internal/grpcsync/event.go
func readCh(ch chan interface{}) error { select { case v := <-ch: fmt.Println(v) case <-time.After(time.Second): return errors.New("time arrived") } return nil }
下面代码的 Unbounded 实现摘自grpc/internal/buffer/unbounded.go,
它没有选择使用带容量的channel,而是另外使用了一个list来备份积压的消息,这里我猜有两个缘由code
- 使用这种方式channel变成了一个任意长度的channel,不用考虑channel被写满致使的问题。
- 这里为何不直接使用list + mutex,由于须要channel的特性来隔离goroutine。
type Unbounded struct { c chan interface{} backlog []interface{} sync.Mutex } func NewUnbounded() *Unbounded { return &Unbounded{c: make(chan interface{}, 1)} } func (b *Unbounded) Put(t interface{}) { b.Lock() if len(b.backlog) == 0 { select { case b.c <- t: b.Unlock() return default: } } b.backlog = append(b.backlog, t) b.Unlock() } func (b *Unbounded) Load() { b.Lock() if len(b.backlog) > 0 { select { case b.c <- b.backlog[0]: b.backlog[0] = nil b.backlog = b.backlog[1:] default: } } b.Unlock() } func (b *Unbounded) Get() <-chan interface{} { return b.c } var q *Queue type Queue struct { buf *Unbounded } type QueueInterface interface { consume() produce(info int) } func (q *Queue) consume() { for { select { case t := <-q.buf.Get(): q.buf.Load() fmt.Println(t) case <-time.After(time.Second * 10): fmt.Println(errors.New("the end")) } } } func (q *Queue) produce(info int) { q.buf.Put(info) } func main() { q := &Queue{ buf: NewUnbounded(), } go q.consume() q.produce(1) q.produce(3) time.AfterFunc(time.Second*2, func() { for i := 0; i < 3; i++ { q.produce(4) } //q.produce(4) }) select {} }
注: 这里的实现使用了interface做为channel的消息体,凡是在有性能瓶颈的地方应该使用具体的类型独立实现一版,相似grpc/internal/transport.go中的recvBuffer