golang goroutine和channel

goroutine

顺序通讯进程 CSP

“顺序通讯进程”(communicating sequential processes)或被简称为CSP。CSP是一种现代的并发编程模型,在这种编程模型中值会在不一样的运行实例(goroutine)中传递,尽管大多数状况下仍然是被限制在单一实例中。express

当一个程序启动时,其主函数即在一个单独的goroutine中运行,咱们叫它main goroutine。新的goroutine会用go语句来建立。在语法上,go语句是一个普通的函数或方法调用前加上关键字go。go语句会使其语句中的函数在一个新建立的goroutine中运行。而go语句自己会迅速地完成。编程

示例 斐波那契数列

计算斐波那契数列+可见的标识来代表程序在正常运行缓存

// animation
func spinner(delay time.Duration) {
	for {
		for _, r := range `-\|/`{
			fmt.Printf("\r%c", r)
			time.Sleep(delay)
		}
	}
}

// get fibonacci of Nth
func fib(x int) int {
	if x < 2 {
		return x
	}
	return fib(x-1) + fib(x-2)
}

func main() {
	go spinner(100 * time.Millisecond)
	const n = 40
	fibN := fib(n) // slow
	fmt.Printf("\rfibonacci(%d) = %d\n", n, fibN)
}

程序分析

main()函数开始运行建立了一个main goroutine性能优化

go spinner建立一个子协程,子协程中存在死循环,因此只有在main goroutine退出时,子协程才会结束。在子协程printsleep时,子协程会主动交出控制权,而后主协程继续运行;服务器

主协程在计算斐波那契数列也会花费较多的时间,在函数递归调用时,也会交出控制权,让其子他协程运行;网络

主函数返回时,全部的goroutine都会被直接打断,程序退出。数据结构

主协程、子协程交替取得控制权,就造成了并发的效果:一边进行显示输出,一边进行斐波那契计算。并发

示例 并发的Clock服务

例子是一个顺序执行的时钟服务器,它会每隔一秒钟将当前时间写到客户端。tcp

clock服务器每个链接都会起一个goroutine。函数

func main() {
	listener, err := net.Listen("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Print(err) // e.g., connection aborted
			continue
		}
		handleConn(conn) // handle one connection at a time
	}
}

func handleConn(c net.Conn) {
	defer c.Close()
	for {
		_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
		if err != nil {
			return // e.g., client disconnected
		}
		time.Sleep(1 * time.Second)
	}
}

程序分析

Listen函数建立了一个net.Listener的对象,这个对象会监听一个网络端口上到来的链接,listener对象的Accept方法会直接阻塞,直到 一个新的链接被建立,而后会返回一个net.Conn对象来表示这个链接。

handleConn函数会处理一个完整的客户端链接。在一个for死循环中,将当前的时候用 time.Now()函数获得,而后写到客户端。因为net.Conn实现了io.Writer接口,咱们能够直接向 其写入内容。这个死循环会一直执行,直到写入失败。最可能的缘由是客户端主动断开链接。这种状况下handleConn函数会用defer调用关闭服务器侧的链接,而后返回到主函数,继续等待下一个链接请求。

这里能够对服务端程序作一点小改动, 使其支持并发: 在handleConn函数调用的地方增长go关键字,让每一次handleConn的调用都 进入一个独立的goroutine。

模拟简单的telnet程序

func main() {
	conn, err := net.Dial("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	mustCopy(os.Stdout, conn)
}

func mustCopy(dst io.Writer, src io.Reader) {
	if _, err := io.Copy(dst, src); err != nil {
		log.Fatal(err)
	}
}

程序分析

这个程序会从链接中读取数据,并将读到的内容写到标准输出中,直到遇到end of file的条件 或者发生错误。

回声程序

package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
	"strings"
	"time"
)

//
func echo(c net.Conn, shout string, delay time.Duration) {
	fmt.Fprintln(c, strings.ToUpper(shout), time.Now())
	time.Sleep(delay)
	fmt.Fprintln(c, shout, time.Now())
	time.Sleep(delay)
	fmt.Fprintln(c, strings.ToLower(shout), time.Now())
}

func handleConn(c net.Conn) {
	input := bufio.NewScanner(c)
	for input.Scan() {
		if input.Text() == "quit" {
			c.Close()
		} else {
			echo(c, input.Text(), 1*time.Second)
		}
	}
	// NOTE: ignoring potential errors from input.Err()
	c.Close()
}

func main() {
	listen, err := net.Listen("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := listen.Accept()
		if err != nil {
			log.Print(err) // e.g., connection aborted
			continue
		}
		go handleConn(conn)
	}
}

Channels

goroutine是Go语音程序的并发机制,channels是goroutine间的通讯机制。

一个 channels是一个通讯机制,它可让一个goroutine经过它给另外一个goroutine发送值信息。每 个channel都有一个特殊的类型,也就是channels可发送数据的类型。一个能够发送int类型数据的channel通常写为chan int。

建立 channel

ch := make(chan int)
ch = make(chan int) // unbuffered channel 
ch = make(chan int, 0) // unbuffered channel 
ch = make(chan int, 3) // buffered channel with capacity 3

和map相似,channel也一个对应make建立的底层数据结构的引用。channel的零值也是nil。

一个channel有发送和接受两个主要操做,都是通讯行为。一个发送语句将一个值从一个 goroutine经过channel发送到另外一个执行接收操做的goroutine。

ch <- x  // a send statement 
x = <-ch // a receive expression in an assignment statement 
<-ch     // a receive statement; result is discarded

消息事件

有些消息事件并不携带额外的信息,它仅仅是用做两个goroutine之间的同步,这时候能够用struct{}空结构体做为channels元素的类型。

什么时候关闭channel

Channel还支持close(ch)操做,用于关闭channel。随后对基于该channel的任何发送操做都将致使panic异常。对一个已经被close过的channel之行接收操做依然能够接受到以前已经成功发送的数据;若是channel中已经没有数据的话讲产生一个零值的数据。

只有当须要告诉接收者goroutine,全部的数据已经所有发送时才须要关闭channel。无论一个channel是否被关闭,当它没有被引用时将会被Go语言的垃圾自动回收器回收。试图重复关闭一个channel将致使panic异常,试图关闭一个nil值的channel也将致使panic异常。关闭一个channels还会触发一个广播机制。

串联的channel和单方向的Channel

Channels也能够用于将多个goroutine链接在一块儿,一个Channel的输出做为下一个Channel的输入。这种串联的Channels就是所谓的管道(pipeline)。

22-57-08-V7hU7g

第一个goroutine是一个计数器,用于生成0、一、二、……形式的整数序列,而后经过channel将该整数序列发送给第二个goroutine;第二个goroutine是一个求平方的程序,对收到的每一个整数求平方,而后将平方后的结果经过第二个channel发送给第三个goroutine;第三个goroutine是一个打印程序,打印收到的每一个整数。

关闭channel

// Squarer
go func() {
    for {
        x, ok := <-naturals
        if ok != nil {
            break // channel was closed and drained
        }
        squares <- x * x
    }
    close(squares)
}()

单方向的channel

package main

import "fmt"

func counter(out chan<- int) {
	for x := 0; x < 100; x++ {
		out <- x
	}
	close(out)
}

func squarer(out chan<- int, in <-chan int) {
	for v := range in {
		out <- v * v
	}
	close(out)
}

func printer(in <-chan int) {
	for v := range in {
		fmt.Println(v)
	}
}

func main() {
	naturals := make(chan int)
	squares := make(chan int)
	go counter(naturals)
	go squarer(squares, naturals)
	printer(squares)
}

调用counter(naturals)时,naturals的类型将隐式地从chan int转换成chan<- int。任何双向channel向单向channel变量的赋值操做都将致使该隐式转换。这里并无反向转换的语法:也就是不能将一个相似chan<- int类型的单向型的channel转换为chan int类型的双向型的channel。

带缓存的Channels

带缓存的Channel内部持有一个元素队列。队列的最大容量是在调用make函数建立channel时经过第二个参数指定的。下面的语句建立了一个能够持有三个字符串元素的带缓存Channel。

ch = make(chan string, 3)

23-16-15-l5TFVg

向缓存Channel的发送操做就是向内部缓存队列的尾部插入元素,接收操做则是从队列的头部删除元素。若是内部缓存队列是满的,那么发送操做将阻塞直到因另外一个goroutine执行接收操做而释放了新的队列空间。相反,若是channel是空的,接收操做将阻塞直到有另外一个goroutine执行发送操做而向队列插入元素。

当channel的缓存队列将不是满的也不是空的,对该channel执行的发送或接收操做都不会发生阻塞。经过这种方式,channel的缓存队列解耦了接收和发送的goroutine。

在某些特殊状况下,程序可能须要知道channel内部缓存的容量,能够用内置的cap函数获取:

fmt.Println(cap(ch)) // "3"

一样,对于内置的len函数,若是传入的是channel,那么将返回channel内部缓存队列中有效元素的个数。由于在并发程序中该信息会随着接收操做而失效,可是它对某些故障诊断和性能优化会有帮助。

fmt.Println(len(ch)) // "2"

Go语言新手有时候会将一个带缓存的channel看成同一个goroutine中的队列使用,虽然语法看似简单,但实际上这是一个错误。Channel和goroutine的调度器机制是紧密相连的,一个发送操做——或许是整个程序——可能会永远阻塞。若是你只是须要一个简单的队列,使用slice就能够了。

示例--返回最快的请求

例子展现了一个使用了带缓存channel的应用。它并发地向三个镜像站点发出请求,三个镜像站点分散在不一样的地理位置。它们分别将收到的响应发送到带缓存channel,最后接收者只接收第一个收到的响应,也就是最快的那个响应。所以mirroredQuery函数可能在另外两个响应慢的镜像站点响应以前就返回告终果。(顺便说一下,多个goroutines并发地向同一个channel发送数据,或从同一个channel接收数据都是常见的用法。)

func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
}

return以后,没结束的goroutine也会结束吗?

goroutines泄漏

若是咱们使用了无缓存的channel,那么两个慢的goroutines将会由于没有人接收而被永远卡住。这种状况,称为goroutines泄漏,这将是一个BUG。和垃圾变量不一样,泄漏的goroutines并不会被自动回收,所以确保每一个再也不须要的goroutine能正常退出是重要的。

channel的选择--缓存和不带缓存

关于无缓存或带缓存channels之间的选择,或者是带缓存channels的容量大小的选择,均可能影响程序的正确性。无缓存channel更强地保证了每一个发送操做与相应的同步接收操做;可是对于带缓存channel,这些操做是解耦的。一样,即便咱们知道将要发送到一个channel的信息的数量上限,建立一个对应容量大小的带缓存channel也是不现实的,由于这要求在执行任何接收操做以前缓存全部已经发送的值。若是未能分配足够的缓冲将致使程序死锁。

蛋糕生产线的比喻

Channel的缓存也可能影响程序的性能。想象一家蛋糕店有三个厨师,一个烘焙,一个上糖衣,还有一个将每一个蛋糕传递到它下一个厨师在生产线。在狭小的厨房空间环境,每一个厨师在完成蛋糕后必须等待下一个厨师已经准备好接受它;这相似于在一个无缓存的channel上进行沟通。

若是在每一个厨师之间有一个放置一个蛋糕的额外空间,那么每一个厨师就能够将一个完成的蛋糕临时放在那里而立刻进入下一个蛋糕在制做中;这相似于将channel的缓存队列的容量设置为1。只要每一个厨师的平均工做效率相近,那么其中大部分的传输工做将是迅速的,个体之间细小的效率差别将在交接过程当中弥补。若是厨师之间有更大的额外空间——也是就更大容量的缓存队列——将能够在不中止生产线的前提下消除更大的效率波动,例如一个厨师能够短暂地休息,而后再加快遇上进度而不影响其余人。

另外一方面,若是生产线的前期阶段一直快于后续阶段,那么它们之间的缓存在大部分时间都将是满的。相反,若是后续阶段比前期阶段更快,那么它们之间的缓存在大部分时间都将是空的。对于这类场景,额外的缓存并无带来任何好处。

生产线的隐喻对于理解channels和goroutines的工做机制是颇有帮助的。例如,若是第二阶段是须要精心制做的复杂操做,一个厨师可能没法跟上第一个厨师的进度,或者是没法知足第三阶段厨师的需求。要解决这个问题,咱们能够雇佣另外一个厨师来帮助完成第二阶段的工做,他执行相同的任务可是独立工做。这相似于基于相同的channels建立另外一个独立的goroutine。

相关文章
相关标签/搜索