Golang —— goroutine(协程)和channel(管道)

协程(goroutine)

协程(goroutine)是Go中应用程序并发处理的部分,它能够进行高效的并发运算。算法

  • 协程是轻量的,比线程更廉价。使用4K的栈内存就能够在内存中建立。
  • 可以对栈进行分割,动态地增长或缩减内存的使用。栈的管理会在协程退出后自动释放。
  • 协程的栈会根据须要进行伸缩,不出现栈溢出。

协程的使用

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("In main()")
	go longWait()
	go shortWait()
	fmt.Println("About to sleep in main()")

	//time.Sleep(4 * 1e9)
	time.Sleep(10 * 1e9)
	fmt.Println("At the end of main()")
}

func longWait() {
	fmt.Println("Beginning longWait()")
	time.Sleep(5 * 1e9)
	fmt.Println("End of longWait()")
}

func shortWait() {
	fmt.Println("Beginning shortWait()")
	time.Sleep(2 * 1e9)
	fmt.Println("End of shortWait()")
}
复制代码

Go中用go关键字来开启一个协程,其中main函数也能够看作是一个协程。缓存

不难理解,上述代码的输出为:bash

In main()
About to sleep in main()
Beginning shortWait()
Beginning longWait()
End of shortWait()
End of longWait()
At the end of main()
复制代码

可是,当咱们将main的睡眠时间设置成4s时,输出发生了改变。并发

In main()
About to sleep in main()
Beginning shortWait()
Beginning longWait()
End of shortWait()
At the end of main()
复制代码

程序并无输出End of longWait(),缘由在于,longWait()main()运行在不一样的协程中,二者是异步的。也就是说,早在longWait()结束以前,main已经退出,天然也就看不到输出了。异步

通道(channel)

通道(channel)是Go中一种特殊的数据类型,能够经过它们发送类型化的数据在协程之间通讯,避开内存共享致使的问题。async

通道的通讯方式保证了同步性,而且同一时间只有一个协程可以访问数据,不会出现数据竞争函数

以工厂的传输带为例,一个机器放置物品(生产者协程),通过传送带,到达下一个机器打包装箱(消费者协程)。学习

通道的使用

在学习使用管道以前,咱们先来看一个“悲剧”。ui

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("Reveal romantic feelings...")
	go sendLove()
	go responseLove()
	waitFor()
	fmt.Println("Leaving ☠️....")
}

func waitFor() {
	for i := 0; i < 5; i++ {
		fmt.Println("Keep waiting...")
		time.Sleep(1 * 1e9)
	}
}

func sendLove() {
	fmt.Println("Love you, mm ❤️")
}

func responseLove() {
	time.Sleep(6 * 1e9)
	fmt.Println("Love you, too")
}
复制代码

用上面学习的知识,不难看出。。。真的惨啊spa

Reveal romantic feelings...
Love you, mm ❤️
Keep waiting...
Keep waiting...
Keep waiting...
Keep waiting...
Keep waiting...
Leaving ☠️....
复制代码

明明收到了暗恋女孩的回应,然而却觉得对方不接受本身的情感,含泪离去。【TAT】

可见,协程之间没有互相通讯将会引发多么大的误解。幸亏,咱们有了channel,如今就来一块儿改写故事的结局吧~

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string)
	var answer string

	fmt.Println("Reveal fomantic feelings...")
	go sendLove()
	go responseLove(ch)
	waitFor()
	answer = <-ch

	if answer != "" {
		fmt.Println(answer)
	} else {
		fmt.Println("Dead ☠️....")
	}

}

func waitFor() {
	for i := 0; i < 5; i++ {
		fmt.Println("Keep waiting...")
		time.Sleep(1 * 1e9)
	}
}

func sendLove() {
	fmt.Println("Love you, mm ❤️")
}

func responseLove(ch chan string) {
	time.Sleep(6 * 1e9)
	ch <- "Love you, too"
}
复制代码

输出为:

Reveal fomantic feelings...
Love you, mm ❤️
Keep waiting...
Keep waiting...
Keep waiting...
Keep waiting...
Keep waiting...
Love you, too
复制代码

皆大欢喜。

这里咱们用ch := make(chan string)建立了一个string类型的管道,固然咱们还能够构建其余类型好比ch := make(chan int),甚至一个函数管道funcChan := chan func()

咱们还用到了一个通讯操做符<-

  • 流向通道:ch <- content,用管道ch发送变量content。

  • 从通道流出:answer := <- ch,变量answer从通道ch接收数据。

  • <- ch能够单独调用,以获取通道的下一个值,当前值会被丢弃,可是能够用来验证,好比:

    if <- ch != 100 {
        /* do something */
    }
    复制代码

通道阻塞

  • 对于同一通道,发送操做在接受者准备好以前是不会结束的。这就意味着,若是一个无缓冲通道在没有空间接收数据的时候,新的输入数据没法输入,即发送者处于阻塞状态。
  • 对于同一通道,接收操做是阻塞的,直到发送者可用。若是通道中没有数据,接收者会保持阻塞。

以上两条性质,反映了无缓冲通道的特性:同一时间只容许至多一个数据存在于通道中

咱们经过例子来感觉一下:

package main

import "fmt"

func main() {
	ch1 := make(chan int)
	go pump(ch1)
	fmt.Println(<-ch1)
}

func pump(ch chan int) {
	for i := 0; ; i++ {
		ch <- i
	}
}
复制代码

程序输出:

0
复制代码

这里的pump()函数被称为生产者

解除通道阻塞

package main

import "fmt"
import "time"

func main() {
	ch1 := make(chan int)
	go pump(ch1)
	go suck(ch1)
	time.Sleep(1e9)
}

func pump(ch chan int) {
	for i := 0; ; i++ {
		ch <- i
	}
}

func suck(ch chan int) {
	for {
		fmt.Println(<-ch)
	}
}
复制代码

这里咱们定义了一个suck函数,做为接收者,并给main协程一个1s的运行时间,因而,便产生了70W+的输出【TAT】。

通道死锁

通道两段互相阻塞对方,会造成死锁状态。Go运行时会检查并panic,中止程序。无缓冲通道会被阻塞。

package main

import "fmt"

func main() {
	out := make(chan int)
	out <- 2
	go f1(out)
}

func f1(in chan int) {
	fmt.Println(<-in)
}
复制代码
fatal error: all goroutines are asleep - deadlock!
复制代码

显然在out <- 2的时候,因为没有接受者,主线程被阻塞。

同步通道

除了普通的无缓存通道外,还有一种特殊的带缓存通道——同步通道

buf := 100
ch1 := make(chan string, buf)
复制代码

buf是通道能够同时容纳的元素个数,即ch1的缓冲区大小,在buf满以前,通道都不会阻塞。

若是容量大于0,通道就是异步的:在缓冲满载或边控以前通讯不会阻塞,元素会按照发送的顺序被接收。

同步:ch := make(chan type, value)

  • value ==0 --> synchronous, unbuffered(阻塞)
  • value > 0 --> asynchronous, buffered(非阻塞)取决于value元素

使用通道缓冲能使程序更具备伸缩性(scalable)。

尽可能在首要位置使用无缓冲通道,只在不肯定的状况下使用缓冲。

package main

import "fmt"
import "time"

func main() {
	c := make(chan int, 50)
	go func() {
		time.Sleep(15 * 1e9)
		x := <-c
		fmt.Println("received", x)
	}()
	fmt.Println("sending", 10)
	c <- 10
	fmt.Println("send", 10)
}

复制代码

信号量模式

func compute(ch chan int) {
    ch <- someComputation()
}

func main() {
    ch := make(chan int)
    go compute(ch)
    doSomethingElaseForAWhile()
    result := <-ch
}
复制代码

协程经过在通道ch中放置一个值来处理结束信号。main线程等待<-ch直到从中获取到值。

咱们能够用它来处理切片排序:

done := make(chan bool)

doSort := func(s []int) {
    sort(s)
    done <- true
}
i := pivot(s)
go doSort(s[:i])
go doSort(s[i:])
<-done
<-done
复制代码

带缓冲通道实现信号量

信号量时实现互斥锁的经常使用同步机制,限制对资源的访问,解决读写问题。

  • 带缓冲通道的容量要和同步的资源容量相同
  • 通道的长度(当前存放的元素个数)与当前资源被使用的数量相同
  • 容量减去通道的长度等于未处理的资源个数
//建立一个长度可变但容量为0的通道
type Empty interface {}
type semaphore chan Empty
复制代码

初始化信号量

sem = make(semaphore, N)
复制代码

对信号量进行操做,创建互斥锁

func (s semaphore) P (n int) {
    e := new(Empty)
    for i := 0; i < n; i++ {
        s <- e
    }
}

func (a semaphore) V (n int) {
    for i := 0; i < n; i++ {
        <- s
    }
}

/* mutexes */
func (s semaphore) Lock() {
	s.P(1)
}

func (s semaphore) Unlock(){
	s.V(1)
}

/* signal-wait */
func (s semaphore) Wait(n int) {
	s.P(n)
}

func (s semaphore) Signal() {
	s.V(1)
}
复制代码

通道工厂模式

不将通道做为参数传递,而是在函数内生成一个通道,并返回。

package main

import (
	"fmt"
	"time"
)

func main() {
	stream := pump()
	go suck(stream)
	time.Sleep(1e9)
}

func pump() chan int {
	ch := make(chan int)
	go func() {
		for i := 0; ; i++ {
			ch <- i
		}
	}()
	return ch
}

func suck(ch chan int) {
	for {
		fmt.Println(<-ch)
	}
}
复制代码

通道使用for循环

for循环能够从ch中持续获取值,直到通道关闭。(这意味着必须有另外一个协程写入ch,而且在写入完成后关闭)

for v := range ch {
    fmt.Println("The value is", v)
}
复制代码
package main

import (
	"fmt"
	"time"
)

func main() {
	suck(pump())
	time.Sleep(1e9)
}

func pump() chan int {
	ch := make(chan int)
	go func() {
		for i := 0; ; i++ {
			ch <- i
		}
	}()
	return ch
}

func suck(ch chan int) {
	go func() {
		for v := range ch {
			fmt.Println(v)
		}
	}()
}
复制代码

通道的方向

通道能够表示它只发送或者只接受:

var send_only chan<- int    // channel can only send data
var recv_only <-chan int    // channel can only receive data
复制代码

只接收的通道(<-chan T)没法关闭,由于关闭通道是发送者用来表示再也不给通道发送值,因此对只接收通道是没有意义的。

管道和选择器模式

借鉴一个经典的例子筛法求素数来学习这一内容。

这个算法的主要思想是,引入筛法(一种时间复杂度为O(x * ln(lnx))的算法),对一个给定返回的正整数从大到小排序,而后从中筛选掉全部的非素数,那么剩下的数中最小的就是素数,再去掉该数的倍数,以此类推。

假设一个范围为1~30的正整数集,已经从大到小排序。

第一遍筛掉非素数1,而后剩余数中最小的是2。

因为2是一个素数,将其取出,而后去掉全部2的倍数,那么剩下的数为:

3 5 7 9 11 13 15 17 19 21 23 25 27 29

剩下的数中3最小,且为素数,取出并去除全部3的倍数,循环直至全部数都筛完。

代码以下:

// 通常写法
package main

import (
	"fmt"
)

func generate(ch chan int) {
	for i := 2; i < 100; i++ {
		ch <- i
	}
}

func filter(in, out chan int, prime int) {
	for {
		i := <-in
		if i%prime != 0 {
			out <- i
		}
	}
}

func main() {
	ch := make(chan int)
	go generate(ch)
	for {
		prime := <-ch
		fmt.Print(prime, " ")
		ch1 := make(chan int)
		go filter(ch, ch1, prime)
		ch = ch1
	}
}
复制代码
// 习惯写法
package main

import (
	"fmt"
)

func generate() chan int {
	ch := make(chan int)
	go func() {
		for i := 2; ; i++ {
			ch <- i
		}
	}()
	return ch
}

func filter(in chan int, prime int) chan int {
	out := make(chan int)
	go func() {
		for {
			if i := <-in; i%prime != 0 {
				out <- i
			}
		}
	}()
	return out
}

func sieve() chan int {
	out := make(chan int)
	go func() {
		ch := generate()
		for {
			prime := <-ch
			ch = filter(ch, prime)
			out <- prime
		}
	}()
	return out
}

func main() {
	primes := sieve()
	for {
		fmt.Println(<-primes)
	}
}
复制代码
相关文章
相关标签/搜索