goroutine的退出

goroutine的退出git

有时候咱们须要通知goroutine中止它正在干的事情,好比一个正在执行计算的web服务,然而它的客户端已经断开了和服务端的链接。web

Go语言并无提供在一个goroutine中终止另外一个goroutine的方法,因为这样会致使goroutine之间的共享变量落在未定义的状态上。并发

若是咱们想要退出两个或者任意多个goroutine怎么办呢?app

假设有一个abort channel,全部的goroutine订阅这个channel,能够向这个channel发送发送和goroutine数目同样多的事件来退出它们。若是这些goroutine中已经有一些本身退出了,那么会致使咱们的channel里的事件数比goroutine还多,这样致使咱们的发送直接被阻塞。另外一方面,若是这些goroutine又生成了其它的goroutine,咱们的channel里的数目又太少了,因此有些goroutine可能会没法接收到退出消息。通常状况下咱们是很难知道在某一个时刻具体有多少个goroutine在运行着的。另外,当一个goroutine从abort channel中接收到一个值的时候,他会消费掉这个值,这样其它的goroutine就无法看到这条信息。为了可以达到咱们退出goroutine的目的,咱们须要更靠谱的策略,来经过一个channel把消息广播出去,这样goroutine们可以看到这条事件消息,而且在事件完成以后,能够知道这件事已经发生过了。oop

回忆一下咱们关闭了一个channel而且被消费掉了全部已发送的值,操做channel以后的代码能够当即被执行,而且会产生零值。咱们能够将这个机制扩展一下,来做为咱们的广播机制:不要向channel发送值,而是用关闭一个channel来进行广播。ui

简单的来讲广播机制的原理就是经过关闭channel,这样对该channel的读取操做都不会阻塞,而是会获得一个零值。经过关闭channel来广播消息事件。code

简单的用代码表示以下,token

package main

import (
	"fmt"
	"os"
	"time"
)

var done = make(chan struct{})

func cancelled() bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}

func main() {

	// Cancel traversal when input is detected.
	go func() {
		os.Stdin.Read(make([]byte, 1)) // read a single byte
		close(done)
	}()

	for {
		if cancelled() {
			fmt.Println("cancell")
			return
		} else {
			fmt.Println("press enter to cancell")
		}

		time.Sleep(1000 * time.Millisecond)
	}
}

运行结果,事件

➜  close git:(master) ✗ go run close.go
press enter to cancell
press enter to cancell
press enter to cancell
press enter to cancell

cancell

那么咱们能够利用这个广播机制来关闭全部的goroutine。input

首先来看一段代码,

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"time"
)

var done = make(chan struct{})

func cancelled() bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}

// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
	for _, entry := range dirents(dir) {
		if entry.IsDir() {
			subdir := filepath.Join(dir, entry.Name())
			walkDir(subdir, fileSizes)
		} else {
			fileSizes <- entry.Size()
		}
	}
}

// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "du1: %v\n", err)
		return nil
	}
	return entries
}

func main() {

	// Cancel traversal when input is detected.
	go func() {
		os.Stdin.Read(make([]byte, 1)) // read a single byte
		close(done)
	}()
	var roots = []string{"/Users/xinxingegeya"}

	// Traverse the file tree.
	fileSizes := make(chan int64)
	go func() {
		for _, root := range roots {
			walkDir(root, fileSizes)
		}
		close(fileSizes)
	}()

	// Print the results.

	tick := time.Tick(1 * time.Second)
	var nfiles, nbytes int64

loop:
	for {
		select {
		case size, ok := <-fileSizes:
			if !ok {
				break loop // fileSizes was closed
			}
			nfiles++
			nbytes += size
		case <-tick:
			printDiskUsage(nfiles, nbytes)
		}
	}
	printDiskUsage(nfiles, nbytes) // final totals
}

func printDiskUsage(nfiles, nbytes int64) {
	fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

这段代码,会遍历指定目录,计算出该目录下文件的数目和所用空间的大小。这段代码还不是并发的执行,咱们下面改为并发的执行,而且经过上面所说的广播机制来中断全部运行中的goroutine退出计算任务。

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"
	"time"
)

var done = make(chan struct{})

// sema is a counting semaphore for limiting concurrency in dirents.
var sema = make(chan struct{}, 20)

func cancelled() bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}

func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
	defer n.Done()
	if cancelled() {
		return
	}
	for _, entry := range dirents(dir) {
		if entry.IsDir() {
			n.Add(1)
			subdir := filepath.Join(dir, entry.Name())
			go walkDir(subdir, n, fileSizes)
		} else {
			fileSizes <- entry.Size()
		}
	}
}

// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
	select {
	case sema <- struct{}{}: // acquire token
	case <-done:
		return nil // cancelled
	}
	defer func() { <-sema }() // release token
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		fmt.Fprintf(os.Stderr, "du1: %v\n", err)
		return nil
	}
	return entries
}

func main() {

	// Cancel traversal when input is detected.
	go func() {
		os.Stdin.Read(make([]byte, 1)) // read a single byte
		close(done)
	}()
	var roots = []string{"/Users/xinxingegeya"}

	// Traverse the file tree.
	fileSizes := make(chan int64)

	var n sync.WaitGroup
	for _, root := range roots {
		n.Add(1)
		go walkDir(root, &n, fileSizes)
	}
	go func() {
		n.Wait()
		close(fileSizes)
	}()

	// Print the results.
	tick := time.Tick(1 * time.Second)
	var nfiles, nbytes int64

loop:
	for {
		select {
		case <-done:
			// Drain fileSizes to allow existing goroutines to finish.
			for range fileSizes {
				// Do nothing.
			}
			return
		case size, ok := <-fileSizes:
			if !ok {
				break loop // fileSizes was closed
			}
			nfiles++
			nbytes += size
		case <-tick:
			printDiskUsage(nfiles, nbytes)
		}
	}
	printDiskUsage(nfiles, nbytes) // final totals
}

func printDiskUsage(nfiles, nbytes int64) {
	fmt.Printf("%d files  %.1f GB\n", nfiles, float64(nbytes)/1e9)
}

运行结果,

➜  interrupt git:(master) ✗ go run interrupt.go
du1: open /Users/xinxingegeya/Library/Saved Application State/com.bitrock.appinstaller.savedState: permission denied
11553 files  32.6 GB
27593 files  44.1 GB
27929 files  44.2 GB
56182 files  46.3 GB
70592 files  48.2 GB
85680 files  49.9 GB
97835 files  49.9 GB
110396 files  49.9 GB
119635 files  49.9 GB

当按下enter键后,程序会退出。

===========END===========

相关文章
相关标签/搜索