Go语言基础之12--Channel

 1、不一样goroutine之间如何进行通信?

一、全局变量和锁同步web

缺点:多个goroutine要通讯时,定义太多的全局变量(每一个全局变量功能不同),很差维护安全

二、Channel异步

2、channel概念

a. 相似unix中管道(pipe)函数

b. 先进先出spa

c. 线程安全,多个goroutine同时访问,不须要加锁线程

d. channel是有类型的,好比说:一个整数(int)的channel只能存放整数(int)3d

 

注意:channel是引用类型,channel的零值也是nilunix

3、channel声明

var 变量名 chan 类型指针

var test chan int日志

var test chan string

var test chan map[string]string

var test chan stu

var test chan *stu

4、channel初始化

channel是引用类型,channel的零值也是nil,因此须要使用make进行初始化,好比:

var test chan int
test = make(chan int, 10) //第1个参数chan是声明为channel,第2个参数是channel的类型,第3个参数是channel的长度

 

上述channel长度为10,channel最多只能存10个元素,当第11个元素插入时,channel也不会扩容,此时channel已经满了,队列只能阻塞住了,除非第1个被取走了,第11个才能进去channel。

 

总结:channel队列两种状况会阻塞

第一种:channel为空时,取数据会阻塞;

第二种:channel满了,再往channel中插入数据,也会阻塞

注意:若是初始化channel时不定义队列长度(无缓冲区(长度为0)channel),channel就至关于没有长度,也就至关于没有空间去存放元素。可是也有解决办法:

就是程序中:一个去放,还有一个去取,至关于立马取出来。

代码示例以下:

package main

import (
    "fmt"
    "time"
)

func main() {
    var intChan chan int = make(chan int) //channel没有长度
    fmt.Printf("%p\n", intChan)
    go func() {
        intChan <- 100 //放100进没有长度的channel
        fmt.Printf("insert item end\n")
    }()
    go func() {
        fmt.Printf("start\n")
        time.Sleep(time.Second * 3)
        var a int
        a = <-intChan //读取100,至关于立马取
        fmt.Printf("a=%d\n", a)

    }()

    time.Sleep(time.Second * 5)
}

 执行结果以下:

 

再来看一个定义了channel(带缓冲区channel)长度的实例:

package main

import (
    "fmt"
    "time"
)

func main() {
    var intChan chan int = make(chan int, 1) //channel长度为1
    fmt.Printf("%p\n", intChan)
    go func() {
        intChan <- 100 //放100进channel,以后随时能够读取channel
        fmt.Printf("insert item end\n")
    }()
    go func() {
        fmt.Printf("start\n")
        time.Sleep(time.Second * 3)
        var a int
        a = <-intChan //a读取channel中的元素
        fmt.Printf("a=%d\n", a)

    }()

    time.Sleep(time.Second * 5)
}

 执行结果以下:

5、channel的基本操做

一、 从channel读取数据:

var testChan chan int
testChan = make(chan int, 10)
var a int
a = <- testChan  //至关于从testChan中读取出来数据并赋值给a

 

二、为channel写入数据:

var testChan chan int
testChan = make(chan int, 10)
var a int = 10
testChan <- a  //写入数据10给管道testChan

intChan <- 100

intChan是一个channel类型的变量,根据箭头方向来判断,很形象,此处就表示将100插入到管道(channel)intChan中去。

a <- intChan

a为新定义的变量,表示a读取管道intChan中的值。

6、goroutine与channel结合

代码实例:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string) //不带缓冲区的channel
    go sendData(ch)
    go getData(ch)
    time.Sleep(100 * time.Second)
}
func sendData(ch chan string) { //该goroutine函数为channel中插入数据,相似于生产者
    ch <- "Washington"
    ch <- "Tripoli"
    ch <- "London"
    ch <- "Beijing"
    ch <- "Tokio"
}
func getData(ch chan string) { //该goroutine函数读取channel中数据,相似于消费者
    var input string
    for {
        input = <-ch
        fmt.Println(input)
    }
}

执行结果以下:

7、channel阻塞(无缓冲区)

总结:channel队列两种状况会阻塞

第一种:channel为空时,取数据会阻塞;

第二种:channel满了,再往channel中插入数据,也会阻塞

 

实例以下:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string) //定义一个无缓冲区(长度为0)channel
    go sendData(ch)
    time.Sleep(100 * time.Second)
}
func sendData(ch chan string) { //往channel中插入数据,可是没有人取,只能阻塞了
    var i int
    for {
        var str string
        str = fmt.Sprintf("stu %d", i)
        fmt.Println("write:", str)
        ch <- str
        i++
    }
}

 执行结果:

 

解释:

能够看到只有写入没有读取,阻塞住了

8、带缓冲区channel

一、以下所示, testChan长度为0:

var testChan chan int
testChan = make(chan int)
var a int
a = <- testChan

 

二、以下所示, testChan是带缓冲区的chan,一次能够放10个元素:

var testChan chan int
testChan = make(chan int, 10)
var a int = 10
testChan <- a

 

9、channel之间的同步

代码实例:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go sendData(ch)
    go getData(ch)
    time.Sleep(100 * time.Second)
}
func sendData(ch chan string) {
    ch <- "Washington"
    ch <- "Tripoli"
    ch <- "London"
    ch <- "Beijing"
    ch <- "Tokio"
}
func getData(ch chan string) {
    var input string
    for {
        input = <-ch
        fmt.Println(input)
    }
}

会有一个问题,若是sleep时间都结束了,可是sendData和getdata所在的函数还没执行完,那么也会被中断执行,如何解决呢:

解决办法:

一、死循环:( 缺点:有时生产者和消费者已经执行完,却依然还在死循环,退不出。)

二、标识位,也就是全局变量和加锁(缺点:比较麻烦,若是有100个goroutine,也要写100个标识位)

上述2个办法都太麻烦不可取,能够pass掉了,下面咱们有更好办法:

9.1 方法1:channel

代码实例:

package main

import (
    "fmt"
    //  "time"
)

func main() {
    ch := make(chan string)
    exitChan := make(chan bool, 3) //此例咱们有3个goroutine,因此咱们定义一个长度为3的channel,当个人channel中能够读取到3个元素时,即表示3个goroutine都执行完毕了。
    go sendData(ch, exitChan) //每个goroutine执行结束时,往channel中插入一个数据
    go getData(ch, exitChan)
    go getData2(ch, exitChan)

    //等待其余goroutine退出,当goroutine都执行完毕退出以后,channel中有3个元素,咱们能够作一个取3次的操做,当3次都取完了,表示全部goroutine都退出了
    <-exitChan  //从channel中取出来元素并未赋值给任何变量,就至关于丢弃了
    <-exitChan
    <-exitChan
    fmt.Printf("main goroutine exited\n")
}

func sendData(ch chan string, exitCh chan bool) {
    ch <- "aaa"
    ch <- "bbb"
    ch <- "ccc"
    ch <- "ddd"
    ch <- "eee"
    close(ch) //插入数据结束后,关闭管道channnel
    fmt.Printf("send data exited")
    exitCh <- true //此时已经往goroutine中插入数据结束,goroutine退出以前,往咱们定义的channel中插入一个数据true,至关于告知我已经执行完成
}

func getData(ch chan string, exitCh chan bool) {
    //var input string
    for {
        //input = <- ch
        input, ok := <-ch  //检查管道是否被关闭
        if !ok {  //若是被关闭了,ok=false,咱们就break退出
            break
        }
        // 此处 打印出来的顺序 和写入的顺序 是一致的
        // 遵循队列的原则: 先入先出
        fmt.Printf("getData中的input值:%s\n", input)
    }
    fmt.Printf("get data exited\n")
    exitCh <- true
}

func getData2(ch chan string, exitCh chan bool) {
    //var input2 string
    for {
        //input2 = <- ch
        input2, ok := <-ch
        if !ok {
            break
        }
        // 此处 打印出来的顺序 和写入的顺序 是一致的
        // 遵循队列的原则: 先入先出
        fmt.Printf("getData2中的input值:%s\n", input2)
    }
    fmt.Printf("get data2 exited\n")
    exitCh <- true
}

 执行结果以下:

注意:当咱们为channel中放入10个元素,而后把channel关闭,这些元素仍是在channel中的,不会消失的,以后想取仍是能够取出来的。

经过以下实例来证实:

package main

import (
    "fmt"
    "time"
)

func main() {
    var intChan chan int
    intChan = make(chan int, 10)
    for i := 0; i < 10; i++ {
        intChan <- i
    }

    close(intChan)
    time.Sleep(time.Second * 10)
    for i := 0; i < 10; i++ {
        var a int
        a = <-intChan
        fmt.Printf("a=%d\n", a)
    }
}

执行结果以下图:

解释:

能够看到在为channel中放入10个元素以后,就关闭了channel,以后依然能够取出来。

9.1 方法2:(推荐

针对大批量goroutine,用sync包中的waitGroup方法,其自己是一个结构体,该方法的本质在底层就是一个计数。

代码实例以下:

package main

import (
    "fmt"
    "sync"
    //  "time"
)

func main() {
    var wg sync.WaitGroup //定义一个waitgroup(结构体)类型的变量,针对大批量goroutine时比较方便。
    ch := make(chan string)
    wg.Add(3) //3个goroutine,就传入3,Add方法至关于计数
    go sendData(ch, &wg) //,至关于goroutine执行完,Add计数就减1,因此咱们将wg传入,但注意结构体必需要传入一个地址进去
    go getData(ch, &wg)
    go getData2(ch, &wg)

    wg.Wait() //只要Add中计数依然存在,就一直Wait,除非为0
    fmt.Printf("main goroutine exited\n")
}

func sendData(ch chan string, waitGroup *sync.WaitGroup) {
    ch <- "aaa"
    ch <- "bbb"
    ch <- "ccc"
    ch <- "ddd"
    ch <- "eee"
    close(ch)
    fmt.Printf("send data exited")
    waitGroup.Done()  //goroutine退出时,计数减1,因此这里用Done方法来通知Add方法
}

func getData(ch chan string, waitGroup *sync.WaitGroup) {
    //var input string
    for {
        //input = <- ch
        input, ok := <-ch
        if !ok {
            break
        }
        // 此处 打印出来的顺序 和写入的顺序 是一致的
        // 遵循队列的原则: 先入先出
        fmt.Printf("getData中的input值:%s\n", input)
    }
    fmt.Printf("get data exited\n")
    waitGroup.Done()
}

func getData2(ch chan string, waitGroup *sync.WaitGroup) {
    //var input2 string
    for {
        //input2 = <- ch
        input2, ok := <-ch
        if !ok {
            break
        }
        // 此处 打印出来的顺序 和写入的顺序 是一致的
        // 遵循队列的原则: 先入先出
        fmt.Printf("getData2中的input值:%s\n", input2)
    }
    fmt.Printf("get data2 exited\n")
    waitGroup.Done()
}

 执行结果以下:

10、for range 遍历channel

for range遍历channel的好处,channel关闭了,for range循环会自动退出

for range结束判断的标准也是看channel是否close关闭,否则就会阻塞,具体可看以下例子:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go sendData(ch)
    go getData(ch)
    time.Sleep(100 * time.Second)
}
func sendData(ch chan string) {
    ch <- "Washington"
    ch <- "Tripoli"
    ch <- "London"
    ch <- "Beijing"
    ch <- "Tokio"
    close(ch)
}
func getData(ch chan string) {
    for input := range ch {
        fmt.Println(input)
    }
}

 执行结果以下:

 

下面再看一个有channel关闭的例子,for range执行完会自动退出

实例以下:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    ch := make(chan string)
    wg.Add(2)
    go sendData(ch, &wg)
    go getData(ch, &wg)

    wg.Wait()
    fmt.Printf("main goroutine exited\n")
}

func sendData(ch chan string, waitGroup *sync.WaitGroup) {
    ch <- "aaa"
    ch <- "bbb"
    ch <- "ccc"
    ch <- "ddd"
    ch <- "eee"
    close(ch)
    fmt.Printf("send data exited")
    waitGroup.Done()
}

func getData(ch chan string, waitGroup *sync.WaitGroup) {
    //var input string
    for {
        //input = <- ch
        input, ok := <-ch
        if !ok {
            break
        }
        // 此处 打印出来的顺序 和写入的顺序 是一致的
        // 遵循队列的原则: 先入先出
        fmt.Printf("getData中的input值:%s\n", input)
    }
    fmt.Printf("get data exited\n")
    waitGroup.Done()
}

 执行结果以下:

11、channel的关闭

1. 使用内置函数close进行关闭, chan关闭以后, for range遍历chan中

已经存在的元素后结束

2. 使用内置函数close进行关闭, chan关闭以后,没有使用for range的写法

须要使用, v, ok := <- ch进行判断chan是否关闭

12、channel的只读和只写

a. 只读chan的声明

Var 变量的名字 <-chan int

Var readChan <- chan int

 

只读实例:

package main

func main() {
    var intChan <-chan int = make(chan int, 100)
    intChan <- 100
}

 执行结果以下:

解释:

只读实例进行写入,能够看见编译时直接报错。

 

b. 只写chan的声明

Var 变量的名字 chan<- int

Var writeChan chan<- int

 

只写实例:

package main

func main() {
    var ch chan<- int = make(chan int, 100)
    <-ch
}

执行结果:

解释:

能够看见只写实例进行读取channel时,也是编译时直接报错。

 

应用场景:

好比说写一个第三方的自定义包,暴露channel给别人去掉用,这个时候就能够控制返回给别人channel的权限控制,来防止误操做。

十3、对channel进行select操做

13.1 场景:

假如channel中有数据或无数据,咱们是经过一个阻塞的读或者阻塞的写去操做数据,若是程序是去阻塞的读,那么至关于程序直接是阻塞的了,这种形式是很差的,好比说处理一个web请求,不可以阻塞的,这时候就有一种机制select操做,经过判断channel中有没有数据,若是没有数据,则当即返回。

13.2 为何要用select操做channel?

经过select语句来监测channel究竟是满了仍是空了,来避免程序阻塞,可是若是没有加default分支,程序依然仍是会被阻塞。

 

补充:

1)select语句的形式其实和switch语句有点相似,这里每一个case表明一个通讯操做;

2)在某个channel上发送或者接收,而且会包含一些语句组成的一个语句块 ;

3)select中的default来设置当 其它的操做都不可以立刻被处理时程序须要执行哪些逻辑;

4)channel 的零值是nil,  而且对nil的channel 发送或者接收操做都会永远阻塞,在select语句中操做nil的channel永远都不会被select到。因此咱们能够用nil来激活或者禁用case,来达成处理其余输出或者输出时间超时和取消的逻辑

13.3 声明语法

语法以下:

select {
    case u := <- ch1:  //channel有数据,该分支就会被激活
    case e := <- ch2: //channel有数据,该分支也会被激活
    default: //若是上述分支都未被激活,则进入default分支
}

注意:不一样的case分支调度整体来讲是平衡的,不是说永远只执行第1个分支,而不执行第2个分支。

13.4 实例

实例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var intChan chan int = make(chan int, 10)       //定义1个int类型channel,给10个空间
    var strChan chan string = make(chan string, 10) //定义1个string类型channel,给10个空间

    var wg sync.WaitGroup //经过waitgroup来控制goroutine的同步
    wg.Add(2)
    //插入数据,空间满了,channel也会阻塞,因此经过select解决
    go func() {
        var count int //由于目前for循环是一个死循环,因此须要有一个限制条件来break
        for count < 15 {
            count++
            select {
            case intChan <- 10: //插入一个10进去
                fmt.Printf("write to int chan succ\n")
            case strChan <- "hello": //插入一个hello进去
                fmt.Printf("write to str chan succ\n")
            default: //当上述全部case分支对应的管道都被被插满数据后,会走到以下default分支
                fmt.Printf("all chan is full\n")
                time.Sleep(time.Second)
            }
        }
        wg.Done() //for循环结束就能够退出了
    }()

    //读取数据
    go func() {
        var count int
        for count < 15 {
            count++
            select {
            case a := <-intChan: //读取intChan中的数据
                fmt.Printf("read to int chan succ a:%d\n", a)
            case <-strChan: //若是只想读出来strChan中的数据,并不赋值,能够这么写,但实际数据仍是读出来了
                fmt.Printf("read to str chan succ\n")
            default: //当取完上述case分支对应的全部channel数据后,其会走以下的default分支
                fmt.Printf("all chan is empty\n")
                time.Sleep(time.Second)
            }
        }
        wg.Done() //for循环结束就能够退出了
    }()
    wg.Wait()
}

 执行结果:

解释:

如上图为插入数据匿名函数执行结果(往channel中插入数据)咱们能够看到当前两个分支都写满以后,就会进入default分支,能够看到程序是不会阻塞的

十4、定时器使用

14.1 定时器的使用

package main

import (
    "fmt"
    "time"
)

func main() {
    t := time.NewTicker(time.Second)
    for v := range t.C {  //定时器Newticker会返回一个时间的channel
        fmt.Println("hello, ", v)
    }
}

  执行结果:

解释:

定时器C的方法实际上是一个只读的channel,里面放的是时间。

由于是channel,因此咱们能够用for range去遍历。

14.2 一次性定时器

package main

import (
    "fmt"
    "time"
)

func main() {
    select {
    case <-time.After(time.Second): //用time.After方法,到了1秒以后,就会触发这个分支
        fmt.Println("after")
    }
}

 执行结果以下:

14.3 超时定时器

场景

线上进行DB查询时,若是超过必定时间没有返回,那么咱们就应该给调用方返回一个值,不能一直在干等着吧,因此咱们就须要有一个超时控制。好比说:查询结果1秒没有返回,就返回一个错误给调用方。

 

如何作一个超时控制呢?

经过select来实现。

 

实例:

package main

import (
    "fmt"
    "time"
)

func queryDb(ch chan int) {
    time.Sleep(time.Second)
    ch <- 100
}
func main() {
    ch := make(chan int)
    go queryDb(ch) //起了1个goroutine,异步查询db,传入一个channel进去(异步的线程查询完,会将结果放入到channel中)。
    t := time.NewTicker(time.Second)
    select { //主线程进行查询,若是channel中有数据,就会去指定分支,若是没有也会去指定分支
    case v := <-ch:
        fmt.Println("result", v)
    case <-t.C: //超过1秒,就会触发该分支,上面channel中还有数据的话,就会走以下分支,也就是超时了。
        fmt.Println("timeout")
    }
}

执行结果:

 

十5、goroutine中使用recover

应用场景,若是某个goroutine panic了,并且这个goroutine里面没有捕获(recover), 那么整个进程就会挂掉。因此,好的习惯是每当go产生一个goroutine,就须要写下recover。

首先咱们来模拟一下这种状况:

实例:

package main

import (
    "fmt"
    "time"
)

func main() {
    go func() {
        var p *int
        *p = 1000
        fmt.Printf("hello")
    }()

    var i int
    for {
        fmt.Printf("%d\n", i)
        time.Sleep(time.Second)
    }
}

 执行结果:

解释:

咱们能够看到匿名函数所在的goroutine线程由于打印了一个空指针致使panic了,进而最终致使主线程也panic了。

 

因此咱们该如何取捕获(recover)子线程的panic,使其不影响主线程的运行呢?

如何解决是很重要的,好比web应用场景,不能由于一个web请求挂掉而影响其余的web请求致使服务崩掉。下面咱们来看一看解决方案:

解决方案:

经过recover函数来捕获goroutine内的任何异常。

实例以下:

package main

import (
    "fmt"
    "time"
)

func main() {
    go func() {
        defer func() { //捕获异常
            err := recover() //调用recover函数来作
            if err != nil {
                fmt.Printf("catch panic exception err:%v\n", err)
            }
        }()
        var p *int
        *p = 1000
        fmt.Printf("hello")
    }()

    var i int
    for {
        fmt.Printf("%d\n", i)
        time.Sleep(time.Second)
    }
}

 执行结果:

能够看到经过捕获(recover)goroutine的panic异常后,只会影响panic的goroutine,并不会影响到其余goroutine和主线程。

总结:

因此以后咱们须要养成一个好的习惯,每起一个goroutine时,须要捕获一下异常,至关于记一个日志错误,这样咱们也能够经过这个错误日志知道程序出问题在哪里,也能够去修复了

相关文章
相关标签/搜索