Channel是Go中的一种类型,和goroutine一块儿为Go提供了并发技术, 它在开发中获得了普遍的应用。Go鼓励人们经过Channel在goroutine之间传递数据的引用(就像把数据的owner从一个goroutine传递给另一个goroutine), Effective Go总结了这么一句话:html
Do not communicate by sharing memory; instead, share memory by communicating.git
在 Go内存模型指出了channel做为并发控制的一个特性:github
A send on a channel happens before the corresponding receive from that channel completes. (Golang Spec)golang
除了正常的在goroutine之间安全地传递共享数据, Channel还能够玩出不少的花样(模式), 本文列举了一些channel的应用模式。apache
促成本文诞生的因素主要包括:编程
下面就让咱们以实例的方式看看这么模式吧。数组
咱们知道, Go的标准库sync
有Mutex
,能够用来做为锁,可是Mutex
却没有实现TryLock
方法。缓存
咱们对于TryLock
的定义是当前goroutine尝试得到锁, 若是成功,则得到了锁,返回true, 不然返回false。咱们可使用这个方法避免在获取锁的时候当前goroutine被阻塞住。安全
原本,这是一个经常使用的功能,在一些其它编程语言中都有实现,为何Go中没有实现的?issue#6123有详细的讨论,在我看来,Go核心组成员自己对这个特性没有积极性,而且认为经过channel能够实现相同的方式。并发
其实,对于标准库的sync.Mutex
要增长这个功能很简单,下面的方式就是经过hack
的方式为Mutex
实现了TryLock
的功能。
const mutexLocked = 1 << iota type Mutex struct { mu sync.Mutex } func (m *Mutex) Lock() { m.mu.Lock() } func (m *Mutex) Unlock() { m.mu.Unlock() } func (m *Mutex) TryLock() bool { return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.mu)), 0, mutexLocked) } func (m *Mutex) IsLocked() bool { return atomic.LoadInt32((*int32)(unsafe.Pointer(&m.mu))) == mutexLocked }
上面的代码还额外增长了一个IsLocked
方法,不过这个方法通常不经常使用,由于查询和加锁这两个方法执行的时候不是一个原子的操做,素以这个方法通常在调试和打日志的时候可能有用。若是你看一下Mutex
实现的源代码,就很容易理解上面的这段代码了,由于mutex
实现锁主要利用CAS
对它的一个int32字段作操做。
既然标准库中不许备在Mutex
上增长这个方法,而是推荐使用channel来实现,那么就让咱们看看如何使用 channel来实现。
type Mutex struct { ch chan struct{} } func NewMutex() *Mutex { mu := &Mutex{make(chan struct{}, 1)} mu.ch <- struct{}{} return mu } func (m *Mutex) Lock() { <-m.ch } func (m *Mutex) Unlock() { select { case m.ch <- struct{}{}: default: panic("unlock of unlocked mutex") } } func (m *Mutex) TryLock() bool { select { case <-m.ch: return true default: } return false } func (m *Mutex) IsLocked() bool { return len(m.ch) > 0 }
你还能够将缓存的大小从1改成n,用来处理n个锁(资源)。主要是利用channel边界状况下的阻塞特性实现的。
有时候,咱们在获取一把锁的时候,因为有竞争的关系,在锁被别的goroutine拥有的时候,当前goroutine没有办法当即得到锁,只能阻塞等待。标准库并无提供等待超时的功能,咱们尝试实现它。
type Mutex struct { ch chan struct{} } func NewMutex() *Mutex { mu := &Mutex{make(chan struct{}, 1)} mu.ch <- struct{}{} return mu } func (m *Mutex) Lock() { <-m.ch } func (m *Mutex) Unlock() { select { case m.ch <- struct{}{}: default: panic("unlock of unlocked mutex") } } func (m *Mutex) TryLock(timeout time.Duration) bool { timer := time.NewTimer(timeout) select { case <-m.ch: timer.Stop() return true case <-time.After(timeout): } return false } func (m *Mutex) IsLocked() bool { return len(m.ch) > 0 }
Or Channel 模式
你也能够把它用Context
来改造,不是利用超时,而是利用Context
来取消/超时得到锁的操做,这个做业留给读者来实现。
当你等待多个信号的时候,若是收到任意一个信号, 就执行业务逻辑,忽略其它的还未收到的信号。
举个例子, 咱们往提供相同服务的n个节点发送请求,只要任意一个服务节点返回结果,咱们就能够执行下面的业务逻辑,其它n-1的节点的请求能够被取消或者忽略。当n=2的时候,这就是back request
模式。 这样能够用资源来换取latency的提高。
须要注意的是,当收到任意一个信号的时候,其它信号都被忽略。若是用channel来实现,只要从任意一个channel中接收到一个数据,那么全部的channel均可以被关闭了(依照你的实现,可是输出的channel确定会被关闭)。
有三种实现的方式: goroutine、reflect和递归。
func or(chans ...<-chan interface{}) <-chan interface{} { out := make(chan interface{}) go func() { var once sync.Once for _, c := range chans { go func(c <-chan interface{}) { select { case <-c: once.Do(func() { close(out) }) case <-out: } }(c) } }() return out }
为了不并发关闭输出channel的问题,关闭操做只执行一次。or
函数能够处理n个channel,它为每一个channel启动一个goroutine,只要任意一个goroutine从channel读取到数据,输出的channel就被关闭掉了。
Go的反射库针对select语句有专门的数据(reflect.SelectCase
)和函数(reflect.Select
)处理。
因此咱们能够利用反射“随机”地从一组可选的channel中接收数据,并关闭输出channel。
这种方式看起来更简洁。
func or(channels ...<-chan interface{}) <-chan interface{} { switch len(channels) { case 0: return nil case 1: return channels[0] } orDone := make(chan interface{}) go func() { defer close(orDone) var cases []reflect.SelectCase for _, c := range channels { cases = append(cases, reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c), }) } reflect.Select(cases) }() return orDone }
func or(channels ...<-chan interface{}) <-chan interface{} { switch len(channels) { case 0: return nil case 1: return channels[0] } orDone := make(chan interface{}) go func() { defer close(orDone) switch len(channels) { case 2: select { case <-channels[0]: case <-channels[1]: } default: m := len(channels) / 2 select { case <-or(channels[:m]...): case <-or(channels[m:]...): } } }() return orDone }
Or-Done-Channel模式
在后面的扇入(合并)模式中,咱们仍是会使用相一样的递归模式来合并多个输入channel,根据 justforfun 的测试结果,这种递归的方式要比goroutine、Reflect更有效。
这种模式是咱们常用的一种模式,经过一个信号channel(done)来控制(取消)输入channel的处理。
一旦从done channel中读取到一个信号,或者done channel被关闭, 输入channel的处理则被取消。
这个模式提供一个简便的方法,把done channel 和 输入 channel 融合成一个输出channel。
func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { select { case <-done: return case v, ok := <-c: if ok == false { return } select { case valStream <- v: case <-done: } } } }() return valStream }
每一个channel起一个goroutine。
func fanIn(chans ...<-chan interface{}) <-chan interface{} { out := make(chan interface{}) go func() { var wg sync.WaitGroup wg.Add(len(chans)) for _, c := range chans { go func(c <-chan interface{}) { for v := range c { out <- v } wg.Done() }(c) } wg.Wait() close(out) }() return out
下面这种实现方式其实仍是有些问题的, 在输入channel读取比较均匀的时候比较有效,不然性能比较低下。
func fanInReflect(chans ...<-chan interface{}) <-chan interface{} { out := make(chan interface{}) go func() { defer close(out) var cases []reflect.SelectCase for _, c := range chans { cases = append(cases, reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c), }) } for len(cases) > 0 { i, v, ok := reflect.Select(cases) if !ok { //remove this case cases = append(cases[:i], cases[i+1:]...) continue } out <- v.Interface() } }() return out }
func fanInRec(chans ...<-chan interface{}) <-chan interface{} { switch len(chans) { case 0: c := make(chan interface{}) close(c) return c case 1: return chans[0] case 2: return mergeTwo(chans[0], chans[1]) default: m := len(chans) / 2 return mergeTwo( fanInRec(chans[:m]...), fanInRec(chans[m:]...)) } } func mergeTwo(a, b <-chan interface{}) <-chan interface{} { c := make(chan interface{}) go func() { defer close(c) for a != nil || b != nil { select { case v, ok := <-a: if !ok { a = nil continue } c <- v case v, ok := <-b: if !ok { b = nil continue } c <- v } } }() return c }
扇出行为至少能够分为两种:
本节只介绍第一种状况,下一节介绍第二种状况
将读取的值发送给每一个输出channel, 异步模式可能会产生不少的goroutine。
func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) { go func() { defer func() { for i := 0; i < len(out); i++ { close(out[i]) } }() for v := range ch { v := v for i := 0; i < len(out); i++ { i := i if async { go func() { out[i] <- v }() } else { out[i] <- v } } } }() }
func fanOutReflect(ch <-chan interface{}, out []chan interface{}) { go func() { defer func() { for i := 0; i < len(out); i++ { close(out[i]) } }() cases := make([]reflect.SelectCase, len(out)) for i := range cases { cases[i].Dir = reflect.SelectSend } for v := range ch { v := v for i := range cases { cases[i].Chan = reflect.ValueOf(out[i]) cases[i].Send = reflect.ValueOf(v) } for _ = range cases { // for each channel chosen, _, _ := reflect.Select(cases) cases[chosen].Chan = reflect.ValueOf(nil) } } }() }
roundrobin的方式选择输出channel。
func fanOut(ch <-chan interface{}, out []chan interface{}) { go func() { defer func() { for i := 0; i < len(out); i++ { close(out[i]) } }() // roundrobin var i = 0 var n = len(out) for v := range ch { v := v out[i] <- v i = (i + 1) % n } }() }
func fanOutReflect(ch <-chan interface{}, out []chan interface{}) { go func() { defer func() { for i := 0; i < len(out); i++ { close(out[i]) } }() cases := make([]reflect.SelectCase, len(out)) for i := range cases { cases[i].Dir = reflect.SelectSend cases[i].Chan = reflect.ValueOf(out[i]) } for v := range ch { v := v for i := range cases { cases[i].Send = reflect.ValueOf(v) } _, _, _ = reflect.Select(cases) } }() }
由于go自己的channel没法再进行扩展, eapache/channels
库定义了本身的channel接口,并提供了与channel方便的转换。
eapache/channels
提供了四个方法:
同时对上面的四个函数还提供了WeakXXX
的函数,输入关闭后不会关闭输出。
下面看看对应的函数的例子。
func testDist() { fmt.Println("dist:") a := channels.NewNativeChannel(channels.None) outputs := []channels.Channel{ channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), } channels.Distribute(a, outputs[0], outputs[1], outputs[2], outputs[3]) //channels.WeakDistribute(a, outputs[0], outputs[1], outputs[2], outputs[3]) go func() { for i := 0; i < 5; i++ { a.In() <- i } a.Close() }() for i := 0; i < 6; i++ { var v interface{} var j int select { case v = <-outputs[0].Out(): j = 0 case v = <-outputs[1].Out(): j = 1 case v = <-outputs[2].Out(): j = 2 case v = <-outputs[3].Out(): j = 3 } fmt.Printf("channel#%d: %d\n", j, v) } }
func testTee() { fmt.Println("tee:") a := channels.NewNativeChannel(channels.None) outputs := []channels.Channel{ channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), } channels.Tee(a, outputs[0], outputs[1], outputs[2], outputs[3]) //channels.WeakTee(a, outputs[0], outputs[1], outputs[2], outputs[3]) go func() { for i := 0; i < 5; i++ { a.In() <- i } a.Close() }() for i := 0; i < 20; i++ { var v interface{} var j int select { case v = <-outputs[0].Out(): j = 0 case v = <-outputs[1].Out(): j = 1 case v = <-outputs[2].Out(): j = 2 case v = <-outputs[3].Out(): j = 3 } fmt.Printf("channel#%d: %d\n", j, v) } }
func testMulti() { fmt.Println("multi:") a := channels.NewNativeChannel(channels.None) inputs := []channels.Channel{ channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), } channels.Multiplex(a, inputs[0], inputs[1], inputs[2], inputs[3]) //channels.WeakMultiplex(a, inputs[0], inputs[1], inputs[2], inputs[3]) go func() { for i := 0; i < 5; i++ { for j := range inputs { inputs[j].In() <- i } } for i := range inputs { inputs[i].Close() } }() for v := range a.Out() { fmt.Printf("%d ", v) } }
func testPipe() { fmt.Println("pipe:") a := channels.NewNativeChannel(channels.None) b := channels.NewNativeChannel(channels.None) channels.Pipe(a, b) // channels.WeakPipe(a, b) go func() { for i := 0; i < 5; i++ { a.In() <- i } a.Close() }() for v := range b.Out() { fmt.Printf("%d ", v) } }
从channel的行为来看,它看起来很像一个数据流,因此咱们能够实现一些相似Scala 集合的操做。
Scala的集合类提供了丰富的操做(方法), 固然其它的一些编程语言或者框架也提供了相似的方法, 好比Apache Spark、Java Stream、ReactiveX等。
下面列出了一些方法的实现,我相信通过一些人的挖掘,相关的方法能够变成一个很好的类库,可是目前咱们先看一些例子。
skip函数是从一个channel中跳过开一些数据,而后才开始读取。
skipN跳过开始的N个数据。
func skipN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for i := 0; i < num; i++ { select { case <-done: return case takeStream <- <-valueStream: } } }() return takeStream }
func skipFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for { select { case <-done: return case v := <-valueStream: if !fn(v) { takeStream <- v } } } }() return takeStream }
func skipWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) take := false for { select { case <-done: return case v := <-valueStream: if !take { take = !fn(v) if !take { continue } } takeStream <- v } } }() return takeStream }
takeN 读取开头N个数据。
func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for i := 0; i < num; i++ { select { case <-done: return case takeStream <- <-valueStream: } } }() return takeStream }
func takeFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for { select { case <-done: return case v := <-valueStream: if fn(v) { takeStream <- v } } } }() return takeStream }
func takeWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for { select { case <-done: return case v := <-valueStream: if !fn(v) { return } takeStream <- v } } }() return takeStream }
若是输入是一个channel,channel中的数据仍是相同类型的channel, 那么flat将返回一个输出channel,输出channel中的数据是输入的各个channel中的数据。
它与扇入不一样,扇入的输入channel在调用的时候就是固定的,而且以数组的方式提供,而flat的输入是一个channel,能够运行时随时的加入channel。
func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { select { case <-done: return case v, ok := <-c: if ok == false { return } select { case valStream <- v: case <-done: } } } }() return valStream } func flat(done <-chan struct{}, chanStream <-chan <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { var stream <-chan interface{} select { case maybeStream, ok := <-chanStream: if ok == false { return } stream = maybeStream case <-done: return } for val := range orDone(done, stream) { select { case valStream <- val: case <-done: } } } }() return valStream }
map将一个channel映射成另一个channel, channel的类型能够不一样。
func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} { out := make(chan interface{}) if in == nil { close(out) return out } go func() { defer close(out) for v := range in { out <- fn(v) } }() return out }
好比你能够处理一个公司员工工资的channel, 输出一个扣税以后的员工工资的channel。由于map
是go的关键字,因此咱们不能命名函数类型为map
,这里用mapChan
代替。
func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} { if in == nil { return nil } out := <-in for v := range in { out = fn(out, v) } return out } 你能够用`reduce`实现`sum`、`max`、`min`等聚合操做。
全部的代码能够在github上找到: smallnest/channels。