goroutine 并发之搜索文件内容

golang并发编程 - 例子解析

February 26, 2013

最近在看《Programming in Go》, 其中关于并发编程写得很不错, 受益非浅, 其中有一些例子是须要多思考才能想明白的, 因此我打算记录下来, 强化一下思路html

《Programming in Go》在 Chapter 7. Concurrent Programming 里面一共用3个例子来说述并发编程的3个模式, 第一个是 filter , 筛选出后缀名和文件大小文件列表, 还算简单就不说, 而后第二个是升级版, 正则版 filter , 不一样的是他是根据正则搜索出文件的文本而且列出来. 这个例子我起初看是有点蒙的, 这样写是没错, 可是为何要这样写, 他的设计思路是什么, 和其余方法相比他有什么优点, 这些都不清楚, 因而决定好好分析一下. 实际上这个例子实现的功能并不复杂, 因此个人文章其实是在讨论怎么产生出和做者类似的思路.golang

若是不考虑用 goroutine 的话, 思路其实很简单:编程

1. 列出文件列表, 编译正则.
2. 遍历文件, 打开并遍历每行, 若是正则能匹配, 记录下来.
3. 列出来.

若是用 goroutine , 就会有如下思路:数组

1. 在获得文件路径数组以后, 分发任务给N个核.
2. 每一个核负责打开文件,  将符合条件的那行文本写入到 `channel`
3. 主线程等待并接收`channel`的结果. 显示出来, 完毕

** 而后下文才是重点 **缓存

1. channel关闭的时机

在go中, channel 是不会自动关闭的, 因此须要在咱们使用完以后手动去关闭, 并且若是使用for语法来遍历channel每次获得的数据, 若是channel没有关闭的话会陷入死循环. 在 goroutine 中会形成 deadlock安全

for job := range jobs { fmt.Println(job) } 

若是没close, 会触发dead lock. 由于for...range...会自动阻塞直到读取到数据或者channel关闭, 没close的话就会致使整个channel处于睡眠状态. channel关闭后, 就不容许写入(缓冲的数据还在, 还能够读取), 因此, channel 关闭的时机很重要.数据结构

2. 分发任务

我所知道任务分发方法有两种:并发

第一种是固定分配, 若是说我想计算1+2+3+...+100, 而后分红4份, 也就是 1+2+..+25......86+87+...+100, 而后再将结果累加起来.测试

还有一种是抢占式的, 这里须要使用一个队列, 将全部任务写入队列, 而后开N个goroutine, 每一个goroutine从队列读取任务(要确保线程安全), 处理, 完成后再继续读取任务. 再也不是固定分配, 本身那份作完了就休息了, 因此看来第二种要好一点.spa

采用第二种方式的话, 对应go的作法, 那就是使用一个channel, 命名为 jobs, 将全部的任务写入进去, 写入完毕以后关闭这个 channel, 固然, 由于是N核, 系统能同时处理的任务咱们设置为N个(也就是咱们使用了N个goroutine), 那么声明 jobs 是缓冲区长度为N的 channel.

Buffered channel 和普通的 channel 的差异是他能够同时容纳多个单位数据, 当缓存的数据单位数量等于 channel 容量的时候, 再执行写入将会阻塞, 不然都是及时处理的.

3. 结果集

当咱们将数据处理后, 就须要将结果收集起来. 须要注意的是, 这些操做不是在主 goroutine 执行, 因此咱们须要经过 channel 传递给主 goroutine . 因此只须要在外部声明一个名为 results 的 channel . 而后在主 goroutine 经过 for 来显示, 这时候就会发现一个问题, 这个 results 关闭的时机问题. 正确的关闭时机是写入全部的 Result 以后. 可是别忘了咱们同时开了多个 goroutine , 因此 results 应该在 执行任务的 goroutine 完成信号累计到N个 这个时机关闭. 因此咱们再引入一个名叫 done 的 channel 来解决. 每一个 goroutine 发送完 result 后会写入一次done, 而后咱们就能够遍历 done , 遍历以后说明所有完成了, 再执行显示.

Result 的数据结构

type Result struct {
    filename string
    lino int
    line string
}

 

书中的 cgrep1 就是这样的

func awaitCompletion(done <-chan struct{}, results chan Result) {
    for i := 0; i < workers; i++ {
        <-done
    }
    close(results)
}

 

可是这样有可能形成死锁, 由于书中 results 缓冲区长度限定为最大1000个, 也就是超过1000个 result 的时候再打算写入 result 会等待取出 result 后才执行, done 也不会写入, 而 awaitCompletion 是等到全部 goroutine 都完成了才会取出 results, 并且当 result 很是大的时候由于内存的缘故也是不可能一次性取出的. 因此就须要在读取 results 的同时读取 done, 当读取 done 次数大于 N 后关闭 results, 因此, 由于要在多个 channel 中同时读取, 因此须要使用 select.

下面是书中的 cgrep3 , 改进版:

func waitAndProcessResults(timeout int64, done <-chan struct{}, results <-chan Result) {
    finish := time.After(time.Duration(timeout))
    for working := workers; working > 0; {
        select { // Blocking
            case result := <-results:
                fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
                result.line)
            case <-finish:
                fmt.Println("timed out")
                return // Time's up so finish with what results there were
            case <-done: 
                working--
        } 
    }
    for {
        select { // Nonblocking
            case result := <-results:
                fmt.Printf("%s:%d:%s\n", result.filename, result.lino,
                result.line)
            case <-finish:
                fmt.Println("timed out")
                return // Time's up so finish with what results there were
            default: 
                return
        } 
    }
}

 

看到这里, 我就有个疑问, 为何在所有完成以后(done都接收到N个了), 还要再遍历出 results, 直到读取不到才算读取完成呢(我反应一贯比较慢^_^)? 因而我作了个实验, 去掉了后面再次循环的部分, 发现有时会遗漏掉数据(我用4个测试文件...), 证实这段代码是有用的!!!

个人想法是, 他是在处理完 result, 而后写入 results, 写完了才发送 done, 也就是在收到全部的 done 以后, 全部的数据应该是已经处理完成的. 为了验证这个想法, 我写了一下代码:

for working := workers; working > 0; {
    select { // Blocking
        case result := <-results:
            // received result
        case <-done: 
            working--
            if working <= 0 {
                println(len(results))
            }
    } 
}

 

而后看到输出的数是大于0的, 也就是说在接收到所有 done 以后, results 还有数据在缓冲区中, 而后在看看发送result 的代码, 忽然就明白了

func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) {
    for job := range jobs {
        job.Do(lineRx)
    }
    done <- struct{}{}
}

 

我把写入和读取想固然认为一块儿发生了, 由于有缓冲区的缘故, doJobs在发送进 results 的缓冲区以后就马上发送 done 了, 可是写入的数据有没有被处理, 是不知道的, 因此在接收到全部 done 以后, results 缓冲区还有数据, 须要再循环一遍.


附个人代码一份:

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "regexp"
    "runtime"
)

type Job struct {
    filename string
    results  chan<- Result
}

type Result struct {
    filename string
    line     string
    lino     int
}

var worker = runtime.NumCPU()

func main() {
    // config cpu number
    runtime.GOMAXPROCS(worker)
    files := os.Args[2:]
    regex, err := regexp.Compile(os.Args[1])
    if err != nil {
        log.Fatal(err)
        return
    }

    // 任务列表, 并发数目为CPU个数
    jobs := make(chan Job, worker)
    // 结果
    results := make(chan Result, minimum(1000, len(files)))
    defer close(results)
    // 标记完成
    dones := make(chan int, worker)
    defer close(dones)

    go addJob(files, jobs, results)
    for i := 0; i < worker; i++ {
        go doJob(jobs, regex, dones)
    }
    awaitForCloseResult(dones, results)
}

func addJob(files []string, jobs chan<- Job, results chan<- Result) {
    for _, filename := range files {
        jobs <- Job{filename, results}
    }
    close(jobs)
}

func doJob(jobs <-chan Job, regex *regexp.Regexp, dones chan int) {
    for job := range jobs {
        job.Do(regex)
    }

    dones <- 1
}

func awaitForCloseResult(dones <-chan int, results chan Result) {
    working := 0
MyForLable:
    for {
        select {
        case result := <-results:
            println(result)
        case <-dones:
            working++
            if working >= worker {
                if rlen := len(results); rlen > 0 {
                    println("----------------------------------")
                    println("left:", rlen)
                    println("----------------------------------")
                    for i := 1; i <= rlen; i++ {
                        println(<-results)
                    }
                }
                break MyForLable
            }
        }
    }
}

func (j *Job) Do(re *regexp.Regexp) {
    f, err := os.Open(j.filename)
    if err != nil {
        println(err)
        return
    }
    defer f.Close()

    b := bufio.NewReader(f)
    lino := 0
    for {
        line, _, err := b.ReadLine()
        if re.Match(line) {
            j.results <- Result{j.filename, string(line), lino}
        }

        if err != nil {
            break
        }
        lino += 1
    }
}

func minimum(a, b int) int {
    if a > b {
        return b
    }
    return a
}

func println(o ...interface{}) {
    fmt.Println(o...)
}

转自:http://chenye.org/goroutine-note.html

相关文章
相关标签/搜索