关于Goroutine与Channel

关于Goroutine的原理

原理上的内容比较多,好比goroutine启动的时候要执行哪些相关的操做,一点一点的补充一下。web

channel的基本原理

channel是go语言中的特殊的机制,既能够同步两个被并发执行的函数,又可让这两个函数经过相互传递特定类型的值来进行通讯。事实上这也是channel的两个主要功能。算法

按照通道在初始化时是否有缓冲值,又能够分为缓冲通道非缓冲通道。通道初始化的时候也仍是须要使用make进行,好比make(chan int,10)声明一个缓冲空间为10个int的通道,直接make(chan int)就是声明一个非缓冲通道编程

直接采用内建函数close(strChan)就能够关闭通道。应该保证在安全的状况下进行关闭通道的操做。基本的原则:内建函数 len(strChan)能够查看通道中当前有的元素的数量 cap(strChan)能够查看通道的总的容量,总容量一旦初始化以后就不会再发生改变了。json

关于select语句的使用,在go语言中,执行select语句的时候,会自动地自上而下地判断每一个case中的发送或者接受的操做能否被当即执行,便是说当前的Goroutine不会所以操做而被阻塞。select语句在执行的时候,会先对各个case中的表达式进行判断求值,并且直到全部的求值操做都完成以后才会考虑选其中的某个case去执行。这要依据当时通道的特性来判断,当发现第一个知足选择条件的case的时候,这个case中的语句就会被执行,其余的语句就会被忽略,当有多个case都知足状况的话,系统会根据一个伪随机算法决定哪一个case会被执行。default是一个特殊的case,若是没有合适的case的话,default中的语句就会被执行,若是select语句中没有加上default语句,那么若是此时没有case符合条件的话,当前goroutine就会一直阻塞在当前的这一条select语句上。所以default:对于select而言是必要的。安全

一般select还会和for语句结合在一块儿来使用,由于单独的select操做只会被选择一次,要想持续不断地使用select从通道中读出信息,仍是要和for结合在一块儿使用。因而跳出多层循环的时候,特别是添加了超时控制的案例,能够参考使用场景(2)中介绍的两种方法.websocket

  • 不管怎样都不该该在接收端关闭通道,由于没法判断发送端是否还有数据要发送,通道有一个很好的特性,就是发送端关闭通道后,接收端仍然能够正常接受已经存在通道中的数据。谁启的通道,谁最后负责关,是这个道理。
  • 注意element , ok := <-chann 的这种语法, 若是通道被关闭则ok的值会变为false,element的值会变为该通道类型的零值,一般用ok这种语法来判断是否退出某个循环。好比下面这段代码,同时也能够看下goroutine的相关使用模式:
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int, 1)
    sign := make(chan byte, 2)

    go func() {
        for i := 0; i < 5; i++ {
            ch <- i

            time.Sleep(1 * time.Second)
        }

        close(ch)
        fmt.Println("The channel is closed.")
        sign <- 0
    }()

    go func() {
        //这个循环会一直尝试从ch中读取信息出来 即便ch已经被发送端关闭
        //但仍是能够读信息出来 最后当ok 为false的时候 说明已经没有数据从ch中读出
        //跳出循环 注意这种判断方式
        for {
            fmt.Printf("before extract channel len: %v ,", len(ch))
            e, ok := <-ch
            fmt.Printf("channel value: %d if extract ok :(%v) after extraction channel len : %v channel cap : %v \n", e, ok, len(ch), cap(ch))
            if !ok {
                break
            }

            time.Sleep(2 * time.Second)
        }
        fmt.Println("Done.")
        sign <- 1
    }()
    //要是不添加两次取值的操做的话 主进程就会立刻结束 这里至关因而实现了一个
    //同步的操做 等待两个go func都结束以后 再结束主进程 注意这种技巧
    <-sign
    <-sign
}

/*output:
before extract channel len: 1 ,channel value: 0 if extract ok :(true) after extraction channel len : 0 channel cap : 1
before extract channel len: 1 ,channel value: 1 if extract ok :(true) after extraction channel len : 0 channel cap : 1
before extract channel len: 1 ,channel value: 2 if extract ok :(true) after extraction channel len : 0 channel cap : 1
before extract channel len: 1 ,channel value: 3 if extract ok :(true) after extraction channel len : 0 channel cap : 1
The channel is closed.
before extract channel len: 1 ,channel value: 4 if extract ok :(true) after extraction channel len : 0 channel cap : 1
before extract channel len: 0 ,channel value: 0 if extract ok :(false) after extraction channel len : 0 channel cap : 1
Done.
*/
  • 一样的从通道中迭代取出元素的操做还可使用 for range 来进行操做,当通道已经被关闭或者没有值能够再接收的话,for循环会当即被结束,好比使用场景(3)中的Batch函数,能够修改为以下的方式,更加简洁:
func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person {
    dests := make(chan Person, 100)
    go func() {
        for p :=range origs{
            handler.Handle(&p)
            log.Printf("old value : %v\n", p)
            //time.Sleep(time.Second)
            dests <- p
        }
        fmt.Println("alll the info has been handled")
        close(dests)
    }()
    return dests
}

关于通道的基本原则

  • 通道缓冲已经满了的时候,再向通道中发送数据,会形成Goroutine的阻塞,通道没有初始化,即值为nil的时候,向其中发送数据会形成通道永久阻塞。
  • 关闭通道的操做应该由发送端来进行,通道关闭后,若是还有数据,接收端仍能够正常接受数据。
  • 向通道中发送值,进行的是值传递

channel使用场景分析

使用场景(1)

注意 app.go文件夹中的 346 行左右开始地方的一个坑 注意time.After的返回值 因为放在了for循环中 所以 每次会新new 一个 channel出来 还有注意跳出多层循环的方式
主要参考的是《Go并发编程实战的相关内容》并发

代码以下:app

package main

import (
    "fmt"
    "runtime"
)

func main() {
    names := []string{"E", "H", "R", "J", "M"}
    for _, name := range names {
        go func() {
            fmt.Printf("Hello , %s \n", name)
        }()
    }
    //要是不添加runtime的话 就不会有内容输出
    //由于for循环执行速度太快了 直接循环结束跳出了最后的循环
    //以后 for循环中生成的5个go func 会被分别进行调度
    runtime.Gosched()
}

/* output
Hello , M 
Hello , M 
Hello , M 
Hello , M 
Hello , M
*/

根据代码能够看出,具体循环的时候for循环中的go func 的调度并非按照想象的那样,一次循环一个go func ,不要对go func的执行时机作任何假设。异步

优化方案

一种思路是把runtime.Gosched()函数放在每次for循环结束的时候,这样每次for循环以后,都会被从新调度一次,可能会出现正确的结果,并非每次都准确,好比go func的程序须要运行一段时间,在这段运行的时间以内,可能就已经循环了几个元素过去了socket

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    names := []string{"E", "H", "R", "J", "M", "N", "O", "P"}
    for _, name := range names {
        go func() {
            time.Sleep(1000 * time.Nanosecond)
            fmt.Printf("Hello , %s \n", name)
        }()

        runtime.Gosched()
    }

}

/* output:
Hello , E
Hello , J
Hello , J
Hello , P
Hello , P
Hello , P
*/

还有一种思路是采用传递参数的方式,就是给goroutine带上了参数,虽然goroutine已经脱离了main函数的控制,可是它已经带上了main函数给的烙印,至关因而某种解耦的感受,for循环屡次就不会影响打印的结果了,好比下面代码:

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    names := []string{"E", "H", "R", "J", "M", "N", "O", "P"}
    for _, name := range names {
        go func(who string) {
            time.Sleep(1000 * time.Nanosecond)
            fmt.Printf("Hello , %s \n", who)
        }(name)

    }
    runtime.Gosched()

}

/* output:
Hello , E
Hello , H
Hello , R
Hello , J
Hello , M
*/

可是这个方法仍然颇有问题,只能保证在函数执行时间很短的时候结果正常,并且不输出重复的内容,若是程序执行时间比较长的话,颇有可能main函数会被提早结束,按顺序生成的多个goroutine在cpu那里会不会仍然按照顺序被调度执行?这个仍然不肯定?有几个goroutine会不能被正常调度到而且执行,好比像上面的代码的输出样子,并且每次输出的结果也都是不肯定的。

使用场景(2)

编码的时候遇到这样一个场景,服务建立成功以后,须要等待ip被分配,ip被分配好以后,服务才正式部署成功,最后将全部的信息返回给前台,因而打算这样实现,在服务建立成功以后就不断循环,查询ip若是分配成功了就返回,若是超过了时间也返回失败,最后这部分的代码像下面这样,
第一个例子中退出的方式采用的是标记的思路形式,每次循环结束的时候会检查一下标记看看是否退出,第二个采用的是特殊的语法,直接跳出最外层的循环,注意这种时间控制的实现,仍是弄成一个defalt一个case比较好,因为case的调度可能有随机性,所以正常执行的内容放在default的部分,时间控制的那个channel放在某一个case当中。

package main

import (
    "fmt"
    "time"
)

func main() {
    sign := make(chan int)
    chtemp := make(chan int, 5)
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(time.Millisecond * 300)
            chtemp <- i
        }
        close(chtemp)

    }()
    var e int
    ok := true

    //new 一个新的channel返回 注意这里要提早声明好
    t := time.After(time.Second)
    go func() {
        for {
            select {
            case <-t:
                fmt.Println("time out")
                ok = false
                break
                //注意这里是使用 = 而不是 :=

            default:
                e, ok = <-chtemp
                fmt.Printf("value : %v \n", e)
                if !ok {
                    break
                }
            }
            if !ok {
                sign <- 1
                break
            }
        }
    }()
    <-sign
}
//一个时间控制的channel
//注意这个要在循环以外单独声明 不然每次都会分配一个新的 time.After的channel返回过来
t := time.After(time.Second * 10)

//注意这种跳出多层循环的操做方式 要是单层使用break的话 仅仅跳出的是 select 那一层的循环

A:
    for {
        select {
        //若是时间到了 就返回错误信息
        case <-t:
            log.Println("time out to allocate ip")
            //delete the se which deploy failed
            a.Ctx.ResponseWriter.Header().Set("Content-Type", "application/json")
            http.Error(a.Ctx.ResponseWriter, `{"errorMessage":"`+"deploy error : time out"+`"}`, 406)
            break A
        //若是时间没到 就是 t 尚未发回信息 select语句就默认跳转到default块中
        //执行查找ip是否分配的操做
        default:
            //log.Println("logout:", <-timeout)
            sename := service.ObjectMeta.Labels["name"]
            podslist, err := a.Podip(sename)
            if err != nil {
                log.Println(err.Error())
                a.Ctx.ResponseWriter.Header().Set("Content-Type", "application/json")
                http.Error(a.Ctx.ResponseWriter, `{"errorMessage":"`+err.Error()+`"}`, 406)
                break A
            }
            if len(podslist) == 0 {
                continue
            } else {
                log.Println("allocation ok ......")
                a.Data["json"] = detail
                a.ServeJson()
                break A
            }

        }

    }

使用场景(3)

经常有这样一种场景,把某些信息从旧的资源池中取出来,通过一些加工处理,再放入新的资源池中,这个过程若是按传统的方式就是采用彻底串行的方式效率会很低,粒度太粗了,具体的粒度能够细化以每次所取的单位资源为粒度。
好比以书上p339为例,有一个资源池存储这person的信息,将每一个person从中取出来,以后进行一些处理,再存到新的资源池中,这里用oldarray以及newarray来模拟旧的和新的资源池:

具体的代码以下:

package main

//参考go 并发编程实战 p337
import (
    "log"
    "strconv"
    "time"
)

type Person struct {
    name string
    age  int
    addr string
}

var oldpersonarray = [5]Person{}
var newpersonarray = [5]Person{}

type PersonHandler interface {
    Batch(origs <-chan Person) <-chan Person
    Handle(orig *Person)
}

//struct 实现了personhandler 接口
type PersonHandlerImpl struct{}

//从origs接收信息 处理以后再返回给新的channel
func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person {
    dests := make(chan Person, 100)
    go func() {
        for {
            p, ok := <-origs
            if !ok {
                close(dests)
                break
            }
            handler.Handle(&p)
            log.Printf("old value : %v\n", p)
            //time.Sleep(time.Second)
            dests <- p
        }
    }()
    return dests
}

//这里要使用引用传递
func (handler PersonHandlerImpl) Handle(orig *Person) {
    orig.addr = "new address"
}

func getPersonHandler() PersonHandler {
    return &PersonHandlerImpl{}

}

//print the oldpersonarray into the chan<-Person
func fetchPerson(origs chan<- Person) {
    for _, v := range oldpersonarray {
        //fmt.Printf("get the value : %v %v \n", k, v)
        time.Sleep(time.Second)
        origs <- v
    }
    close(origs)

}

//fetch the value from the channel and store it into the newpersonarray
func savePerson(dest <-chan Person) <-chan int {
    intChann := make(chan int)
    go func() {
        index := 0
        for {
            p, ok := <-dest
            if !ok {
                break
            }
            //time.Sleep(time.Second)
            log.Printf("new value transfer %v \n", p)

            newpersonarray[index] = p
            index++

        }

        intChann <- 1
    }()
    return intChann
}

func init() {
    //使用range的话是值传递 这里要给oldpersonarray赋值进来
    tmplen := len(oldpersonarray)
    for i := 0; i < tmplen; i++ {
        oldpersonarray[i].addr = "old address"
        oldpersonarray[i].age = i
        oldpersonarray[i].name = strconv.Itoa(i)

    }

    log.Printf("first print init value : %v\n", oldpersonarray)

}
func main() {

    handeler := getPersonHandler()
    origs := make(chan Person, 100)
    dests := handeler.Batch(origs)
    //go func() { fetchPerson(origs) }()
    // 不加go func的话 要等这句执行完 才能执行下一句
    // 则orgis信息都输出 彻底关闭掉 这个时候 从dest接收信息的语句才开始执行
    // 因此不会动态输出 这句加上go func的话 就会没隔 1s 动态输出
    // 若是将fetchPerson 再往前面放一句 则old value也不会动态输出
    fetchPerson(origs)
    sign := savePerson(dests)
    <-sign
    log.Printf("last print new value : %v \n", newpersonarray)

}

总体的结构图以下:

代码结构图

代码基本分析:

  • 首先声明一个 PersonHandler 的接口,以后声明一个struct PersonHandlerImpl 将接口中的两个方法都实现了,init函数用于进行oldarray的初始化工做。注意为了减小出错,内部的函数在方声明的时候都是单向的channel。
  • 1,2 fetchperson从oldarray中区数据,并把数据存到origs channel中,注意最后取完数据到通道以后,要由发送方将channel关闭,不然可能形成deadlock。注意在main函数中,若是fech操做没有放到一个goroutine中来执行,就仍然是串行的,至关因而把数据都放入到channel中,另外一端才开始取,没发挥出并发的优点。
  • 3,4 Batch函数将person信息从origs中取出来,进行处理后,同时传到dests中,最后将dests返回,注意这里不是所有传入以后才将dests返回,而是新启动一个goroutine执行传入操做,同时将dests返回,注意要主动关闭channel。
  • 5 savePerson操做接收一个<-chann 以后从中接受person信息,将值写入到新的资源池中,最后所有写入结束以后,传一个sign channel给主进程,结束。
  • 总结,在须要动态输出信息的时候,goroutine每每是和channel结合在一块儿使用。最多见的用法是,一个goroutine负责向channel中写入数据,以后将channel返回,由其余进程取出信息。好比以前写过的一些websocket从前台接受信息,后台处理信息以后再动态返回给前台打出结果的模型,就和这个差很少,总之具体的异步执行流程要理清楚,都有哪些channel,负责传递的信息分别是什么。
相关文章
相关标签/搜索