看着身边优秀的小伙伴们早就开始写博客,本身深感落后,还好迟作总比不作好,勉励本身见贤思齐。趁着年前最后一个周末,阳光正好,写下第一篇博客,为2019年开个头,以期完成今年为本身立下的flags。git
从PHPer转Gopher,很大一个缘由就是业务对性能和并发的持续需求,另外一个主要缘由就是Go语言原生的并发特性,能够在提供同等高可用的能力下,使用更少的机器资源,节约可观的成本。所以本文就结合本身在学习Go并发的实战demo中,把遇到的一些坑点写下来,共享进步。github
a) Channel
- 分为无缓冲、有缓冲通道;docker
b) WaitGroup
- sync包提供的goroutine间的同步机制;并发
c) Context
- 在调用链不一样goroutine间传递和共享数据;函数
本文demo中主要用到了前两种,基本使用请查看官方文档。性能
需求:实现一个EDM的高效邮件发送:须要支持多个国家(能够当作是多个任务),须要记录每条任务发送的状态(当前成功、失败条数),须要支持可暂停(stop)、从新发送(run)操做。学习
分析:从需求能够看出,在邮件发送中能够经过并发实现多个国家(多个任务)并发、单个任务分批次并发实现快速、高效EDM需求。ui
3.1 main.gospa
package main import ( "bufio" "fmt" "io" "log" "os" "strconv" "sync" "time" ) var ( batchLength = 20 wg sync.WaitGroup finish = make(chan bool) ) func main() { startTime := time.Now().UnixNano() for i := 1; i <= 3; i++ { filename := "./task/edm" + strconv.Itoa(i) + ".txt" start := 60 go RunTask(filename, start, batchLength) } // main 阻塞等待goroutine执行完成 fmt.Println(<-finish) fmt.Println("finished all tasks.") endTime := time.Now().UnixNano() fmt.Println("Total cost(ms):", (endTime-startTime)/1e6) } // 单任务 func RunTask(filename string, start, length int) (retErr error) { for { readLine, err := ReadLines(filename, start, length) if err == io.EOF { fmt.Println("Read EOF:", filename) retErr = err break } if err != nil { fmt.Println(err) retErr = err break } fmt.Println("current line:", readLine) start += length // 等待一批完成才进入下一批 //wg.Wait() } wg.Wait() finish <- true return retErr }
注意上面wg.Wait()
的位置(下面有讨论),在finish channel
以前,目的是为了等待子goroutine
运行完,再经过一个无缓冲通道finish
通知main goroutine
,而后main
运行结束。指针
func ReadLines()读取指定行数据:
// 读取指定行数据 func ReadLines(filename string, start, length int) (line int, retErr error) { fmt.Println("current file:", filename) fileObj, err := os.Open(filename) if err != nil { panic(err) } defer fileObj.Close() // 跳过开始行以前的行-ReadString方式 startLine := 1 endLine := start + length reader := bufio.NewReader(fileObj) for { line, err := reader.ReadString(byte('\n')) if err == io.EOF { fmt.Println("Read EOF:", filename) retErr = err break } if err != nil { log.Fatal(err) retErr = err break } if startLine > start && startLine <= endLine { wg.Add(1) // go并发执行 go SendEmail(line) if startLine == endLine { break } } startLine++ } return startLine, retErr } // 模拟邮件发送 func SendEmail(email string) error { defer wg.Done() time.Sleep(time.Second * 1) fmt.Println(email) return nil }
运行上面main.go
,3个任务在1s内并发完成全部邮件(./task/edm1.txt
中一行表示一个邮箱)发送。
true finished all tasks. Total cost(ms): 1001
那么问题来了:没有实现分批每次并发batchLength = 20
,由于若是不分批发送,只要其中某个任务或某一封邮件出错了,那下次从新run的时候,会不知道哪些用户已经发送过了,出现重复发送。而分批发送即便中途出错了,下一次从新run可从上次出错的end行开始,最可能是[start - end]
一个batchLength
发送失败,能够接受。
因而,将倒数第5行wg.Wait()
注释掉,倒数第8行注释打开,以下:
// 单任务 func RunTask(filename string, start, length int) (retErr error) { for { readLine, err := ReadLines(filename, start, length) if err == io.EOF { fmt.Println("Read EOF:", filename) retErr = err break } if err != nil { fmt.Println(err) retErr = err break } fmt.Println("current line:", readLine) start += length // 等待一批完成才进入下一批 wg.Wait() } //wg.Wait() finish <- true return retErr }
运行就报错:
panic: sync: WaitGroup is reused before previous Wait has returned
提示WaitGroup
在goroutine
之间重用了,虽然是全局变量,看起来是使用不当。怎么调整呢?
3.2 main.go
package main import ( "bufio" "fmt" "io" "log" "os" "strconv" "sync" "time" ) var ( batchLength = 10 outerWg sync.WaitGroup ) func main() { startTime := time.Now().UnixNano() for i := 1; i <= 3; i++ { filename := "./task/edm" + strconv.Itoa(i) + ".txt" start := 60 outerWg.Add(1) go RunTask(filename, start, batchLength) } // main 阻塞等待goroutine执行完成 outerWg.Wait() fmt.Println("finished all tasks.") endTime := time.Now().UnixNano() fmt.Println("Total cost(ms):", (endTime-startTime)/1e6) } // 单任务 func RunTask(filename string, start, length int) (retErr error) { for { isFinish := make(chan bool) readLine, err := ReadLines(filename, start, length, isFinish) if err == io.EOF { fmt.Println("Read EOF:", filename) retErr = err break } if err != nil { fmt.Println(err) retErr = err break } // 等待一批完成才进入下一批 fmt.Println("current line:", readLine) start += length <-isFinish // 关闭channel,释放资源 close(isFinish) } outerWg.Done() return retErr }
从上面能够看出:调整的思路是外层用WaitGroup
控制,里层用channel
控制,执行又报错 : (
fatal error: all goroutines are asleep - deadlock! goroutine 1 [semacquire]: sync.runtime_Semacquire(0x55fe7c) /usr/local/go/src/runtime/sema.go:56 +0x39 sync.(*WaitGroup).Wait(0x55fe70) /usr/local/go/src/sync/waitgroup.go:131 +0x72 main.main() /home/work/data/www/docker_env/www/go/src/WWW/edm/main.go:31 +0x1ab goroutine 5 [chan send]: main.ReadLines(0xc42001c0c0, 0xf, 0x3c, 0xa, 0xc42008e000, 0x0, 0x0, 0x0)
仔细检查,发现上面代码中定义的isFinish
是一个无缓冲channel
,在发邮件SendMail()
子协程没有完成时,读取一个无数据的无缓冲通道将阻塞当前goroutine
,其余goroutine
也是同样的都被阻塞,这样就出现了all goroutines are asleep - deadlock!
因而将上面代码改成有缓冲继续尝试:
isFinish := make(chan bool, 1) // 读取指定行数据 func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) { fmt.Println("current file:", filename) // 控制每一批发完再下一批 var wg sync.WaitGroup fileObj, err := os.Open(filename) if err != nil { panic(err) } defer fileObj.Close() // 跳过开始行以前的行-ReadString方式 startLine := 1 endLine := start + length reader := bufio.NewReader(fileObj) for { line, err := reader.ReadString(byte('\n')) if err == io.EOF { fmt.Println("Read EOF:", filename) retErr = err break } if err != nil { log.Fatal(err) retErr = err break } if startLine > start && startLine <= endLine { wg.Add(1) // go并发执行 go SendEmail(line, wg) if startLine == endLine { isFinish <- true break } } startLine++ } wg.Wait() return startLine, retErr } // 模拟邮件发送 func SendEmail(email string, wg sync.WaitGroup) error { defer wg.Done() time.Sleep(time.Second * 1) fmt.Println(email) return nil }
运行,又报错了 : (
fatal error: all goroutines are asleep - deadlock! goroutine 1 [semacquire]: sync.runtime_Semacquire(0x55fe7c) /usr/local/go/src/runtime/sema.go:56 +0x39 sync.(*WaitGroup).Wait(0x55fe70)
此次提示有点不同,看起来是里层的WaitGroup
致使了死锁,继续检查发现里层wg
是值传递,应该使用指针传引用。
// go并发执行 go SendEmail(line, wg)
最后修改代码以下:
// 读取指定行数据 func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) { fmt.Println("current file:", filename) // 控制每一批发完再下一批 var wg sync.WaitGroup fileObj, err := os.Open(filename) if err != nil { panic(err) } defer fileObj.Close() // 跳过开始行以前的行-ReadString方式 startLine := 1 endLine := start + length reader := bufio.NewReader(fileObj) for { line, err := reader.ReadString(byte('\n')) if err == io.EOF { fmt.Println("Read EOF:", filename) retErr = err break } if err != nil { log.Fatal(err) retErr = err break } if startLine > start && startLine <= endLine { wg.Add(1) // go并发执行 go SendEmail(line, &wg) if startLine == endLine { isFinish <- true break } } startLine++ } wg.Wait() return startLine, retErr } // 模拟邮件发送 func SendEmail(email string, wg *sync.WaitGroup) error { defer wg.Done() time.Sleep(time.Second * 1) fmt.Println(email) return nil }
赶忙运行一下,此次终于成功啦 : )
current line: 100 current file: ./task/edm2.txt Read EOF: ./task/edm2.txt Read EOF: ./task/edm2.txt finished all tasks. Total cost(ms): 4003
每一个任务模拟的是100行,从第60行开始运行,四个任务并发执行,每一个任务分批内再次并发,而且控制了每一批次完成后再进行下一批,因此总运行时间约4s,符合指望值。完整源码请阅读原文或移步GitHub:https://github.com/astraw99/edm
本文经过两层嵌套Go 并发,模拟实现了高性能并发EDM,具体的一些出错行控制、任务中断与再次执行将在下次继续讨论,主要逻辑已跑通,几个坑点小结以下:
a) WaitGroup 通常用于main 主协程等待所有子协程退出后,再优雅退出主协程;嵌套使用时注意wg.Wait()放的位置;
b) 合理使用channel,无缓冲chan将阻塞当前goroutine,有缓冲chan在cap未满的状况下不会阻塞当前goroutine,使用完记得释放chan资源;
c) 注意函数间传值或传引用(本质上仍是传值,传的指针的指针内存值)的合理使用;
后记:第一篇博客写到这里差很少算完成了,一不当心一个下午就过去了,写的逻辑、可读性可能不太好请见谅,欢迎留言批评指正。感谢您的阅读。