最近在看《Programming in Go》, 其中关于并发编程写得很不错, 受益非浅, 其中有一些例子是须要多思考才能想明白的, 因此我打算记录下来, 强化一下思路html
《Programming in Go》在 Chapter 7. Concurrent Programming
里面一共用3个例子来说述并发编程的3个模式, 第一个是 filter
, 筛选出后缀名和文件大小文件列表, 还算简单就不说, 而后第二个是升级版, 正则版 filter
, 不一样的是他是根据正则搜索出文件的文本而且列出来. 这个例子我起初看是有点蒙的, 这样写是没错, 可是为何要这样写, 他的设计思路是什么, 和其余方法相比他有什么优点, 这些都不清楚, 因而决定好好分析一下. 实际上这个例子实现的功能并不复杂, 因此个人文章其实是在讨论怎么产生出和做者类似的思路.golang
若是不考虑用 goroutine 的话, 思路其实很简单:编程
1. 列出文件列表, 编译正则. 2. 遍历文件, 打开并遍历每行, 若是正则能匹配, 记录下来. 3. 列出来.
若是用 goroutine , 就会有如下思路:数组
1. 在获得文件路径数组以后, 分发任务给N个核. 2. 每一个核负责打开文件, 将符合条件的那行文本写入到 `channel` 3. 主线程等待并接收`channel`的结果. 显示出来, 完毕
** 而后下文才是重点 **缓存
在go中, channel 是不会自动关闭的, 因此须要在咱们使用完以后手动去关闭, 并且若是使用for语法来遍历channel每次获得的数据, 若是channel没有关闭的话会陷入死循环. 在 goroutine 中会形成 deadlock安全
for job := range jobs { fmt.Println(job) }
若是没close, 会触发dead lock. 由于for...range...会自动阻塞直到读取到数据或者channel关闭, 没close的话就会致使整个channel处于睡眠状态. channel关闭后, 就不容许写入(缓冲的数据还在, 还能够读取), 因此, channel 关闭的时机很重要.数据结构
我所知道任务分发方法有两种:并发
第一种是固定分配, 若是说我想计算1+2+3+...+100, 而后分红4份, 也就是
1+2+..+25
,...
,...
,86+87+...+100
, 而后再将结果累加起来.测试还有一种是抢占式的, 这里须要使用一个队列, 将全部任务写入队列, 而后开N个goroutine, 每一个goroutine从队列读取任务(要确保线程安全), 处理, 完成后再继续读取任务. 再也不是固定分配, 本身那份作完了就休息了, 因此看来第二种要好一点.spa
采用第二种方式的话, 对应go的作法, 那就是使用一个channel, 命名为 jobs
, 将全部的任务写入进去, 写入完毕以后关闭这个 channel, 固然, 由于是N核, 系统能同时处理的任务咱们设置为N个(也就是咱们使用了N个goroutine), 那么声明 jobs
是缓冲区长度为N的 channel.
Buffered channel
和普通的 channel 的差异是他能够同时容纳多个单位数据, 当缓存的数据单位数量等于 channel 容量的时候, 再执行写入将会阻塞, 不然都是及时处理的.
当咱们将数据处理后, 就须要将结果收集起来. 须要注意的是, 这些操做不是在主 goroutine 执行, 因此咱们须要经过 channel 传递给主 goroutine . 因此只须要在外部声明一个名为 results
的 channel . 而后在主 goroutine 经过 for
来显示, 这时候就会发现一个问题, 这个 results
关闭的时机问题. 正确的关闭时机是写入全部的 Result
以后. 可是别忘了咱们同时开了多个 goroutine , 因此 results
应该在 执行任务的 goroutine 完成信号累计到N个
这个时机关闭. 因此咱们再引入一个名叫 done
的 channel 来解决. 每一个 goroutine 发送完 result 后会写入一次done, 而后咱们就能够遍历 done , 遍历以后说明所有完成了, 再执行显示.
Result 的数据结构
type Result struct { filename string lino int line string }
书中的 cgrep1
就是这样的
func awaitCompletion(done <-chan struct{}, results chan Result) { for i := 0; i < workers; i++ { <-done } close(results) }
可是这样有可能形成死锁, 由于书中 results
缓冲区长度限定为最大1000个, 也就是超过1000个 result 的时候再打算写入 result 会等待取出 result 后才执行, done 也不会写入, 而 awaitCompletion
是等到全部 goroutine 都完成了才会取出 results
, 并且当 result
很是大的时候由于内存的缘故也是不可能一次性取出的. 因此就须要在读取 results
的同时读取 done
, 当读取 done
次数大于 N 后关闭 results
, 因此, 由于要在多个 channel 中同时读取, 因此须要使用 select
.
下面是书中的 cgrep3
, 改进版:
func waitAndProcessResults(timeout int64, done <-chan struct{}, results <-chan Result) { finish := time.After(time.Duration(timeout)) for working := workers; working > 0; { select { // Blocking case result := <-results: fmt.Printf("%s:%d:%s\n", result.filename, result.lino, result.line) case <-finish: fmt.Println("timed out") return // Time's up so finish with what results there were case <-done: working-- } } for { select { // Nonblocking case result := <-results: fmt.Printf("%s:%d:%s\n", result.filename, result.lino, result.line) case <-finish: fmt.Println("timed out") return // Time's up so finish with what results there were default: return } } }
看到这里, 我就有个疑问, 为何在所有完成以后(done都接收到N个了), 还要再遍历出 results
, 直到读取不到才算读取完成呢(我反应一贯比较慢^_^)? 因而我作了个实验, 去掉了后面再次循环的部分, 发现有时会遗漏掉数据(我用4个测试文件...), 证实这段代码是有用的!!!
个人想法是, 他是在处理完 result, 而后写入 results
, 写完了才发送 done
, 也就是在收到全部的 done 以后, 全部的数据应该是已经处理完成的. 为了验证这个想法, 我写了一下代码:
for working := workers; working > 0; { select { // Blocking case result := <-results: // received result case <-done: working-- if working <= 0 { println(len(results)) } } }
而后看到输出的数是大于0的, 也就是说在接收到所有 done 以后, results
还有数据在缓冲区中, 而后在看看发送result
的代码, 忽然就明白了
func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) { for job := range jobs { job.Do(lineRx) } done <- struct{}{} }
我把写入和读取想固然认为一块儿发生了, 由于有缓冲区的缘故, doJobs在发送进 results
的缓冲区以后就马上发送 done
了, 可是写入的数据有没有被处理, 是不知道的, 因此在接收到全部 done
以后, results
缓冲区还有数据, 须要再循环一遍.
附个人代码一份:
package main import ( "bufio" "fmt" "log" "os" "regexp" "runtime" ) type Job struct { filename string results chan<- Result } type Result struct { filename string line string lino int } var worker = runtime.NumCPU() func main() { // config cpu number runtime.GOMAXPROCS(worker) files := os.Args[2:] regex, err := regexp.Compile(os.Args[1]) if err != nil { log.Fatal(err) return } // 任务列表, 并发数目为CPU个数 jobs := make(chan Job, worker) // 结果 results := make(chan Result, minimum(1000, len(files))) defer close(results) // 标记完成 dones := make(chan int, worker) defer close(dones) go addJob(files, jobs, results) for i := 0; i < worker; i++ { go doJob(jobs, regex, dones) } awaitForCloseResult(dones, results) } func addJob(files []string, jobs chan<- Job, results chan<- Result) { for _, filename := range files { jobs <- Job{filename, results} } close(jobs) } func doJob(jobs <-chan Job, regex *regexp.Regexp, dones chan int) { for job := range jobs { job.Do(regex) } dones <- 1 } func awaitForCloseResult(dones <-chan int, results chan Result) { working := 0 MyForLable: for { select { case result := <-results: println(result) case <-dones: working++ if working >= worker { if rlen := len(results); rlen > 0 { println("----------------------------------") println("left:", rlen) println("----------------------------------") for i := 1; i <= rlen; i++ { println(<-results) } } break MyForLable } } } } func (j *Job) Do(re *regexp.Regexp) { f, err := os.Open(j.filename) if err != nil { println(err) return } defer f.Close() b := bufio.NewReader(f) lino := 0 for { line, _, err := b.ReadLine() if re.Match(line) { j.results <- Result{j.filename, string(line), lino} } if err != nil { break } lino += 1 } } func minimum(a, b int) int { if a > b { return b } return a } func println(o ...interface{}) { fmt.Println(o...) }
转自:http://chenye.org/goroutine-note.html