Golang的channel使用以及并发同步技巧

在学习《The Go Programming Language》第八章并发单元的时候仍是遭遇了很多问题,和值得总结思考和记录的地方。git

作一个相似于unix du命令的工具。可是阉割了一些功能,这里应该只实现-c(统计total大小) 和-h(以human比较容易辨识的显示出来)的功能。github

 

首先咱们须要构造一个 可以返回FileInfo信息数组的函数,咱们把它取名为dirEntries:数组

func dirEntries(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
        return nil
    }
    return entries
}

传入一个路径字符串,而后使用ioutil.ReadDir解析这个路径下面的全部文件以及文件夹生成一个FileInfo的profile。并发

Fileinfo interface下面包含了:函数

type FileInfo interface {
    Name() string       // base name of the file
    Size() int64        // length in bytes for regular files; system-dependent for others
    Mode() FileMode     // file mode bits
    ModTime() time.Time // modification time
    IsDir() bool        // abbreviation for Mode().IsDir()
    Sys() interface{}   // underlying data source (can return nil)
}

多种方法,能够直接调用,其做用就是后面注释写的同样。工具

 

有了可以获取文件夹下面文件和文件夹的函数以后,咱们须要一个调用方用来walk指定的目录:oop

// 入参是一个文件目录,一个INT64的只接收的单向channel
func walkDir(dir string, fileSizes chan<- int64) {
    for _, entry := range dirEntries(dir) {
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

这里咱们定义一个目录,而后需求传入一个单向接收channel用于在多goroutine中计算总共的文件大小。学习

使用range方法来遍历咱们上面写的dirEntries的返回文件或文件夹,若是是文件夹则继续迭代。优化

若是不是则将文件大小存入放入fileSizes channel中。spa

 

搞定上面两个函数,咱们来写主函数部分:

func main() {
    root := ""
    flag.StringVar(&root, "-p", ".", "input dir.")
    flag.Parse()

    fileSizes := make(chan int64)
    // 起一个goroutine去walk目录
    go func() {
        walkDir(root, fileSizes)
        // Walk完毕以后要关闭该channel下面使用range读取数据的时候才会有尽头
        close(fileSizes)
    }()

    var nfiles, nbytes int64
    for size := range fileSizes {
        nfiles++
        nbytes += size
    }
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

这里注意一点,由于起goroutine的walk函数,和下面同时在range遍历是在同步进行,若是下面range速度太快读到管道里面没有值了会阻塞住等待有数据继续进来以后读取,而不是会跳出。只有当close(fileSizes)这句执行到,显示关闭掉channel以后,才会跳出range循环而且这时已经读取完了全部的数据。这里有点像,close channel的时候给range发送了一个中止信号同样,感受这个利用起来会比较有用? 后续可能会再研究一下。

 

让咱们继续来优化咱们的程序,添加一个-v参数,打印出扫描文件的进度,当咱们要扫描整个盘的时候,可能会花费大量的时间,咱们须要知道进度如何了。

其实这个需求只须要很小的改动,让咱们来从新改写一下main函数,用select多路复用来完成这个事情。

func main() {
    root := ""
    verbose := false
    tick := make(<-chan time.Time)
    var nfiles, nbytes int64

    flag.StringVar(&root, "p", ".", "input dir.")
    flag.BoolVar(&verbose, "v", false, "add verbose if you want")
    flag.Parse()

    if verbose {
        tick = time.Tick(500 * time.Millisecond)
    }

    fileSizes := make(chan int64)
    // 起一个goroutine去walk目录
    go func() {
        walkDir(root, fileSizes)
        // Walk完毕以后要关闭该channel下面使用range读取数据的时候才会有尽头
        close(fileSizes)
    }()

loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop
            }
            nfiles++
            nbytes += size
        case <-tick:
            fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
        }
    }
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

上面其实都差很少,这里我直接从loop那里开始说吧,遇到这个loop的时候我其实还蛮疑惑的,由于我在go语言保留关键字里面并无看到他的身影,可是这里他的确是个关键字,和里面的break连用 里面break后面跟上的loop 能够直接跳出到最外层loop包裹的循环,而不是break默认的只跳出一层循环。明白了这个道理以后,这个就不难理解了,当咱们还在遍历文件的时候,select 会持续读取文件大小赋值给size,而且返回true给ok。若是咱们开启了verbose,每隔500毫秒tick会收到来自time.Tick的消息。咱们都知道select会在都准备好的状况下随机pick一个执行,因此这里也或快或慢的被打印进度(前提是同时收到信号,可是实际上这个发生速度可能在nm级别,凭感觉很难感受到谁先)。当最后都执行完毕后filesSizes channel会被上面的携程函数close(),当close以后,在读取完剩余数据后,fileSizes会返回给ok nil。就能够跳出循环。

 

看到这里可能会以为有点绕,因此要尽量的多理解一下,固然咱们可让这个du程序更快。能够注意到咱们并无在walkdir里面开启goroutines进行并发处理。下面我将尝试开启goroutine处理它们,而且用channel给他们加个锁控制一下goroutine的数量,在此以前咱们先来看看如今完成了的代码:

package main

import (
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "flag"
    "time"
)

// 入参是一个文件目录,一个INT64的只接收的单向channel
func walkDir(dir string, fileSizes chan<- int64) {
    for _, entry := range dirEntries(dir) {
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

func dirEntries(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
        return nil
    }
    return entries
}

func main() {
    t1 := time.Now()
    root := ""
    verbose := false
    tick := make(<-chan time.Time)
    var nfiles, nbytes int64

    flag.StringVar(&root, "p", ".", "input dir.")
    flag.BoolVar(&verbose, "v", false, "add verbose if you want")
    flag.Parse()

    if verbose {
        tick = time.Tick(500 * time.Millisecond)
    }

    fileSizes := make(chan int64)
    // 起一个goroutine去walk目录
    go func() {
        walkDir(root, fileSizes)
        // Walk完毕以后要关闭该channel下面使用range读取数据的时候才会有尽头
        close(fileSizes)
    }()

loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop
            }
            nfiles++
            nbytes += size
        case <-tick:
            fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
        }
    }
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
    fmt.Println(time.Since(t1))
}

观察上面代码能够看出咱们并不能直接在这个代码的基础上直接给walkDir加上goroutine,这样会致使channel直接被关闭,而后啥也没跑就结束了。

咱们须要让主goroutine等待其余goroutine都完成以后再结束,因此主goroutine须要在这里阻塞住,等到获得能够结束的信号以后再结束。

 

咱们可使用sync.WaitGroup 来对仍旧活跃的walkDir调用进行计数。等到数量为0的时候就算咱们能够结束了。

sync.WaitGroup提供了三个方法:

  Add:添加或减小goroutine的数量。

  Done:至关于Add(-1)。

  Wait:阻塞住等待WaitGroup数量变成0.

 

明白这个道理以后咱们改写了一下代码,让它使用sync.WaitGroup来支持同步,最后当全部goroutine都结束以后,关闭channel完成任务。

package main

import (
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "flag"
    "time"
    "sync"
)

// 入参是一个文件目录,一个INT64的只接收的单向channel
func walkDir(dir string, fileSizes chan<- int64, n *sync.WaitGroup) {
    defer n.Done()
    for _, entry := range dirEntries(dir) {
        if entry.IsDir() {
            n.Add(1)
            subdir := filepath.Join(dir, entry.Name())
            go walkDir(subdir, fileSizes, n)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

func dirEntries(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
        return nil
    }
    return entries
}

func main() {
    t1 := time.Now()
    root := ""
    verbose := false
    tick := make(<-chan time.Time)
    fileSizes := make(chan int64)

    var n sync.WaitGroup
    var nfiles, nbytes int64

    flag.StringVar(&root, "p", ".", "input dir.")
    flag.BoolVar(&verbose, "v", false, "add verbose if you want")
    flag.Parse()

    if verbose {
        tick = time.Tick(500 * time.Millisecond)
    }

    n.Add(1)
    go walkDir(root, fileSizes, &n)

    go func() {
        n.Wait()
        close(fileSizes)
    }()


loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop
            }
            nfiles++
            nbytes += size
        case <-tick:
            fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
        }
    }
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
    fmt.Println(time.Since(t1))
}

随便跑跑。。感受快得飞起,然而跑不了几秒就会报错,这个程序最大的问题就是咱们彻底没有办法以后它会本身打开多少个goroutine,感受会爆炸。因此咱们要限制这种夸张的写法,使用channel来作一个并发协程池,把同时开启的goroutine的数量控制一下。

 

最后上一下完整代码,注意defer关键字,只接收函数,因此我会在释放锁的时候使用匿名函数:

package main

import (
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "flag"
    "time"
    "sync"
)

var token = make(chan int, 100)


// 入参是一个文件目录,一个INT64的只接收的单向channel
func walkDir(dir string, fileSizes chan<- int64, n *sync.WaitGroup) {
    defer n.Done()
    for _, entry := range dirEntries(dir) {
        if entry.IsDir() {
            n.Add(1)
            subdir := filepath.Join(dir, entry.Name())
            go walkDir(subdir, fileSizes, n)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

func dirEntries(dir string) []os.FileInfo {
    token <- 1
    defer func() {<-token}()
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du: %v\n", err)
        return nil
    }
    return entries
}

func main() {
    var nfiles, nbytes int64
    var n sync.WaitGroup

    root := ""
    verbose := false
    t1 := time.Now()
    fileSizes := make(chan int64)
    tick := make(<-chan time.Time)

    flag.StringVar(&root, "p", ".", "input dir.")
    flag.BoolVar(&verbose, "v", false, "add verbose if you want")
    flag.Parse()

    if verbose {
        tick = time.Tick(500 * time.Millisecond)
    }

    n.Add(1)
    go walkDir(root, fileSizes, &n)

    go func() {
        n.Wait()
        close(fileSizes)
    }()


loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop
            }
            nfiles++
            nbytes += size
        case <-tick:
            fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
        }
    }
    fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
    fmt.Println(time.Since(t1))
}

 

 

Reference:

https://github.com/gopl-zh/gopl-zh.github.com  The Go Programming Language

相关文章
相关标签/搜索