《Go 语言并发之道》读后感 - 第四章

《Go 语言并发之道》读后感-第四章

约束

约束能够减轻开发者的认知负担以便写出有更小临界区的并发代码。确保某一信息再并发过程当中仅能被其中之一的进程进行访问。程序中一般存在两种可能的约束:特定约束和词法约束。程序员

特定约束

经过公约实现约束,不管是由语言社区、你所在的团队,仍是你的代码库设置。在 Go 语言官方默认安装 gofmt 去格式化你的代码,争取让你们都写同样的代码golang

词法约束

设计使用词法做用域仅公开用于多个并发进程的正确数据和并发原语,这使得作错事是不可能的,例如:Go 中 goroutine 和 channel ,而不是使用 Thread 包(不管是官方,第三方)。在 Go 的世界里操做系统线程不用程序员管理,须要并发 go 就能够了。缓存

for-select 循环

在 Go 语言中你常常看到 for-select 循环。它的结构相似这样的安全

for{	// 无限循环或者用 range 语句循环
    select {
        // 使用 channel 的任务
    }
}

向 channel 发送数据

for _,v := range []string{"jisdf","jisdf","ier"}{
    select {
    case <- done:
        return
    case stringChan <- v:
        // 作些什么
    }
}

循环等待中止

// 第一种保持 select 语句尽量短: 
// 若是完成的 channel 未关闭,咱们将退出 select 语句并继续执行 for 循环
for {
    select {
    case <- done:
        return
    default:    
    }
    // 非抢占业务
}

// 第二种将工做嵌入 select 的 default 中
// 若是完成的 channel 还没有关闭,则执行 default 内容的任务
for {
    select {
    case <- done:
        return
    default:
        // 非抢占业务
    }
}

防止 goroutine 泄露

线程安全,是每个程序员常常讨论的话题。 在 Go 中对应的是 goroutine 协程,虽然 goroutine 开销很是小,很是廉价,可是过多的 goroutine 未获得释放或终止,也是会消耗资源的。goroutine 有如下几种方式被终止:多线程

  • 当它完成了它的工做。
  • 由于不可恢复的错误,它不能继续工做。
  • 当它被告知须要终止工做。

前两种方式很是简单明了,而且隐含在你的程序中。那么咱们如何来取消工做?Go 程序在运行时默认会有一个主 goroutine (main goroutine),他会将一些没有工做的 goroutine 设置为自旋,这会致使内存利用率的降低。思考下,既然 main goroutine 可以将其余 goroutine 设置自旋,那么它能不能通知其余 goroutine 中止或退出呢?Of sure ,首先咱们须要一个 channel 辅助 main goroutine,它能够包含多种指令,例如超时、异常、特定条件等 。它一般被命名为 done,而且只读。举个例子:并发

doWork := func(done <- chan int ,s <-chan string) <-chan s{
    terminated := make(chan int)
    go func () {
        // 当前函数 return 后打印一条信息用于验证,for {} 死循环是否被终止
        defer fmt.Println("doWork exited")
        defer close(termainted)
        for {
            select {
            case l := <- s:
                fmt.Println(l)
            case <- done: // 因为 select 会相对均匀的挑选 case ,当 done 被读取,则 return 跳出整个并发
                return
            }
        }
    }()
    return terminated
}

// 建立控制并发的 channel done
done := make(chan int)
terminated := doWork(done, "a")

// 启动一个 goroutine 在 1s 后关闭 done channel
go func() {
    time.Sleep(1 * time.Second)
    fmt.Println("取消工做的 goroutine")
    close(done)
}()

// main goroutine 中读出 termainated 中的数据,验证咱们是否成功通知工做的 goroutine 终止工做
<- terminated 
fmt.Println("Done")

当一个 goroutine 阻塞了向channel 进行写入的请求,咱们能够这样作:框架

newRandstream := func(done <-chan interface{}) <- chan int{
    randStream := make(chan int)
    go func(){
        defer fmt.Println("newRanstream 关闭了")
        defer close(randStream)
        for{
            select {
            case randStream <- rand.int():
            case <-done:
                return 
            }
        }
    }()
    return
}

done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("遍历三次")
for i := 1; i<=3;i++{
    fmt.Println("%d: %d\n",i,<-randStream)
}

close(done)
// 模拟正在进行的工做,暂停 1s
time.Sleap(1 * time.Second)

or-channel

以上部分咱们了解到单一条件下如何取消 goroutine 防止泄露。若是咱们有多种条件触发取消 goroutine ,咱们要怎么办呢?让我来了解下 or-channel,建立一个复合 done channel 来处理这种复杂状况。函数

咱们以使用更多的 goroutine 为代价,实现了简洁性。f(x)=x/2 ,其中 x 是 goroutine 的数量,但你要记住 Go 语言种的一个优势就是可以快速建立,调度和运行 goroutine ,而且该语言积极鼓励使用 goroutine 来正确建模问题。没必要担忧在这里建立的 goroutine 的数量多是一个不成熟的优化。此外,若是在编译时你不知道你正在使用多少个 done channel ,则将会没有其余方式能够合并 done channel。post

错误处理

说到错误处理,也许不少程序程序员以为 Go 语言错误处理简直太糟糕了。漫天的 if err != nil{} ,try catch 捕捉并打印错误多么好。我要说首先咱们须要注意 Go 的并发模式,与其余语言有着很大的区别。Go 项目开发者但愿咱们将错误视为一等公民,合并入咱们定义的消息体内,channel 中的数据被读出的时候咱们进行判断,程序并发过程当中是否出现错误。这避免了多进程多线程模型下,try catch 丢失一些报错,在故障回顾的时候很是麻烦。性能

// 建议的消息体
type MyMessage struct{
    Data string
    Err error
}

让错误成为一等公民合并进你的结构体中,代码也许会更易懂

type MyMessage struct{
    N int
    Err error
}
func myfuncation(n string) MyMessage{
    var mm MyMessage
    mm.N,mm.Err = anotherFunc(n)
    return mm
}
func anotherFunc(n string) (int,error){
    i,err := strconv.Atoi(n)
    if err !=nil{
        return i,err
    }
    return i,nil
}
func main(){
    mymsg := myfuncation("Concurrency In GO")
    if mymsg.Err != nil{
        // 这里能够换成其余的 log 框架,部分 log 框架会自动识别 error 来源。例如:func (m *MyMessage) myfuncation() 这样的函数就会被抓到错误来自于哪里。
        fmt.Println(mymsg.Err)
    }
}

pipeline

我曾经在祖传代码中见到一个约 2000 行的函数。我但愿看见这篇文章的你,不要这么作。咱们已经了解了数据如何在两个或多个 goroutine 之间经过 channel 传递,那我咱们把这样的程序用多个 channel组合在一块儿,其中的每一次读出,或写入channel 都是这一环上的一个 stage(步),这就是 pipeline。Go 语言的并发模式,让咱们很方便,快捷,安全的在一个进程中实现了流式处理。咱们来看一个官方 pipeline 的例子:

package main

import (
	"fmt"
	"sync"
	"time"
)

func gen(nums ...int) <-chan int {
	genOut := make(chan int)
	go func() {
		for _, n := range nums {
			genOut <- n
		}
		fmt.Println("Input gen Channel number =>", len(genOut))
		close(genOut)
	}()
	return genOut
}

func sq(done <-chan struct{}, in <-chan int) <-chan int {
	sqOut := make(chan int)
	go func() {
		// 这个 close(sqOut) 必定要先写,执行的时候优先压入栈,待函数执行完成关闭 sqOut channel
		defer close(sqOut)
		for n := range in {
            // 利用 select {} 均衡调度 channel 
			select {
			case sqOut <- n * n:
				fmt.Printf("=> %v <= write into sqOut channel \n", n*n)
			case <-done:
				return
			}
		}
		//fmt.Printf("Wait close the chan => %v\n", len(sqOut))
	}()
	return sqOut
}

// merge Fan-In 函数合并多个结果
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	mergeOut := make(chan int, 1)

	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			select {
			case mergeOut <- n:
			case <-done:
				return
			}
		}
	}

	wg.Add(len(cs))

	for _, c := range cs {
		go output(c)
	}

	go func() {
		wg.Wait()
		close(mergeOut)
	}()
	return mergeOut

}

// pfnumber 计算算数平方数
func pfnumber() {
    // 定义 don channel 用于终止 pipeline
	don := make(chan struct{}, 3)
	don <- struct{}{}
	don <- struct{}{}
	close(don)
    // 传入 don 通知发送方中止发送
	for n := range sq(don, sq(don, gen(3, 4, 2))) {
		fmt.Println("Last result ", n)
	}
	fmt.Println("============================================")
}

func fanInOut() {
	don := make(chan struct{}, 3)
	in := gen(2, 3)
	c1 := sq(don, in)
	c2 := sq(don, in)

	for n := range merge(don, c1, c2) {
		fmt.Println(n)
	}

	don <- struct{}{}
	don <- struct{}{}
	don <- struct{}{}
	fmt.Println("Finish channel len => ", len(don))
	<-don
	close(don)

}

func f1(i chan int) {
	fmt.Println(<-i)
}

func runf1() {
	out := make(chan int)
	go f1(out)
	time.Sleep(2 * time.Second)
	out <- 2
	time.Sleep(2 * time.Second)
}

func main() {
	//runf1()

	pfnumber()
    // FanIn and FanOut
	//fanInOut()

}

简单总结一下如何正确构建一个 pipeline:

  • 当全部的发送已完成,stage 应该关闭输出 channel
  • stage 应该持续从只读 channel 中读出数据,除非 channel 关闭或主动通知到发送方中止发送

Golang Pipeline Blog 译文

Golang Pipeline Blog

扇出、扇入

扇出模式优先的场景:

  • 它不依赖于以前的 stage 计算的值
  • 须要运行很长时间,例如:I/O 等待,远程调用,访问 REST full API等

扇入模式优先:

扇入意味着多个数据流复用或者合并成一个流。例如:上文 pipeline 中的 merge 函数,能够经过打开 fanInOut() 函数执行一下试试。

or-done-channel

在防止 goroutine 泄露,pipeline 中咱们都在函数执行过程当中嵌入了 done channel 以便终止须要中止的 goroutine。咱们能够看出他们有个统一的特色,传入 done ,jobChannel ,返回 resultChannel 。那么咱们能够把它封装起来,像这样:

orDone := func(done ,c <-chan interface{}) <- chan interface{}{
    valStream := make(chan interface{})
    go func(){
        defer close(valStream)
        for {
            select{
            case <- done:
            case v,ok := <- c:
                if ok == false{
                    return
                }
                select{
                case valStream <- v:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

tee-channel

可能须要将同一个结果发送给两个接收者,这个时候就须要用到 tee-channel 的方式。

应用场景:

  • 流量镜像
  • 操做审计
tee := func(done <- chan interface{},in <-chan interface{}
           )(_,_ <- chan interface{}) { <-chan interface{}) {
    out1 := make(chan interface{})
    out2 := make(chan interface{})
    go func(){
        defer close(out1)
        defer close(out2)
        for val := range orDone(done, in){
            var out1,out2 = out1,out2
            for i:=0;i<2; i++{
                select{
                case <- done:
                case out1 <- val:
                    out1 = nil
                case out2 <- val:
                    out2 = nil
                }
            }
        }
    }()
    return out1,out2
}

其余的应用场景

桥接 channel

在 channel 中传递 channel 。笔者学术才浅,纸上谈兵多,动手实践少,着实想不到合适的场景,但愿读者能为我补充一下。

队列

队列多是咱们第一次看见 channel 的感觉,这玩意一个队列,很是具有队列的特性。

队列在什么样的状况下能够提高总体性能

  • 若是在一个 stage 批处理请求能够节省时间。
  • 须要缓存的场景,例如:批量日志刷盘,热数据缓存等。

context 包

在前文中常常会定义 done channel 的作法,防止 goroutine 泄露,或者主动中断须要中止的 pipeline 。难道咱们每次构建 pipeline 的时候都要建立 done channel 吗?答案是否认的,Go 团队为咱们准备了 context 包,专用于干相似的工做。

type Context interface {
    // 当该 context 工做的 work 被取消时,返回超时时间
    Deadline() (deadline time.Time, ok bool)
    // done 返回中止 pipeline 的 channel 
    Done() <chan struct{}
    // error 一等公民。
    // 若是 context 被取消,超时,返回取消,超时的缘由,以 error 形式返回。 
    Err() error
    // 返回与此 context 关联的 key
    Value(key interface{}) interface{}
}

context 包有两个主要目的:

  • 提供一个能够取消你的调用意图中分支的 API.
  • 提供用于经过呼叫传输请求范围数据的数据包

在 防止 goroutine 泄露中学到,函数中的取消有三个方面,context 包能够帮你管理它:

  • goroutine 的父 goroutine 可能想要取消它。
  • 一个 goroutine 可能想要取消它的子 goroutine。
  • goroutine 中任何阻塞操做都必须是可抢占的 ,以便它能够被取消。

Context.Value(key interface{}) ,因为使用 interface{} 做为函数参数,这里咱们须要强调一下使用注意事项,及建议:

  • 虽然能够在 context.Context 中出传递 value,可是并不建议这么作,由于咱们须要保证这个值必须是安全的,能够被多个 goroutine 访问。要知道不通的 goroutine 处理逻辑多是不一样的。
  • 值传递适合在远程 API 调用时使用,请勿在进程内使用。
  • 数据应该时不可变的。
  • 使用简单类型,例如:int,float,string 等基础类型。
  • 数据应该是数据,而不是类型与方法。
  • 数据应该用于修饰操做,而不是驱动操做

结束语

第四章能够称之为全书核心章节,它将前面的部分总结概括,并造成不少的 Go 语言并发技巧讲解,能够帮助咱们写出可维护的并发代码。熟悉了这些并发模式,咱们能够将多种模式组合,以帮助咱们编写大型系统。

笔者能力优先,才疏学浅,但愿读者可以翻阅原书,深刻理解并充分运用在工做中。

相关文章
相关标签/搜索