Go并发编程

概述

简而言之,所谓并发编程是指在一台处理器上“同时”处理多个任务。linux

随着硬件的发展,并发程序变得愈来愈重要。Web服务器会一次处理成千上万的请求。平板电脑和手机app在渲染用户画面同时还会在后台执行各类计算任务和网络请求。即便是传统的批处理问题--读取数据,计算,写输出--如今也会用并发来隐藏掉I/O的操做延迟以充分利用现代计算机设备的多个核心。计算机的性能每一年都在以非线性的速度增加。算法

宏观的并发是指在一段时间内,有多个程序在同时运行。编程

并发在微观上,是指在同一时刻只能有一条指令执行,但多个程序指令被快速的轮换执行,使得在宏观上具备多个进程同时执行的效果,但在微观上并非同时执行的,只是把时间分红若干段,使多个程序快速交替的执行。安全

并行和并发

并行(parallel):指在同一时刻,有多条指令在多个处理器上同时执行。bash

并发(concurrency):指在同一时刻只能有一条指令执行,但多个进程指令被快速的轮换执行,使得在宏观上具备多个进程同时执行的效果,但在微观上并非同时执行的,只是把时间分红若干段,经过cpu时间片轮转使多个进程快速交替的执行。服务器

大师曾以咖啡机的例子来解释并行和并发的区别。网络

  • 并行是两个队列同时使用两台咖啡机 (真正的多任务)
  • 并发是两个队列交替使用一台咖啡机 ( 假 的多任务)

下面的两张图是国人作的一些不错的比较并行与并发区别的图。数据结构

常见并发编程技术

进程并发

程序和进程

程序,是指编译好的二进制文件,在磁盘上,不占用系统资源(cpu、内存、打开的文件、设备、锁....)多线程

进程,是一个抽象的概念,与操做系统原理联系紧密。进程是活跃的程序,占用系统资源。在内存中执行。(程序运行起来,产生一个进程)闭包

咱们能够把程序比做剧本(纸),把进程比做一场戏(舞台、演员、灯光、道具...)

同一个剧本能够在多个舞台同时上演。一样,同一个程序也能够加载为不一样的进程(彼此之间互不影响)

如:同时开两个终端。各自都有一个bash但彼此ID不一样。

进程状态

进程基本的状态有5种。分别为初始态,就绪态,运行态,挂起态与终止态。其中初始态为进程准备阶段,常与就绪态结合来看。

进程并发

在使用进程 实现并发时会出现什么问题呢?

  1. 系统开销比较大,占用资源比较多,开启进程数量比较少。
  2. 在unix/linux系统下,还会产生“孤儿进程”和“僵尸进程”。

咱们知道在操做系统中,能够产生不少的进程。在unix/linux系统中,正常状况下,子进程是经过父进程fork建立的,子进程再建立新的进程。

而且父进程永远没法预测子进程 到底何时结束。 当一个 进程完成它的工做终止以后,它的父进程须要调用系统调用取得子进程的终止状态。

孤儿进程

父进程先于子进程结束,则子进程成为孤儿进程,子进程的父进程成为init进程,称为init进程领养孤儿进程。

僵尸进程

进程终止,父进程还没有回收,子进程残留资源(PCB)存放于内核中,变成僵尸(Zombie)进程。

Windows下的进程和Linux下的进程是不同的,它比较懒惰,历来不执行任何东西,只是为线程提供执行环境。而后由线程负责执行包含在进程的地址空间中的代码。当建立一个进程的时候,操做系统会自动建立这个进程的第一个线程,称为主线程。

线程并发

什么是线程

LWP:light weight process 轻量级的进程,本质还是进程 (Linux下)

进程:独立地址空间,拥有PCB

线程:有独立的PCB,但没有独立的地址空间(共享)

区别:在因而否共享地址空间。独居(进程);合租(线程)。

  • 线程:最小的执行单位
  • 进程:最小分配资源单位,可当作是只有一个线程的进程。

Windows系统下,能够直接忽略进程的概念,只谈线程。由于线程是最小的执行单位,是被系统独立调度和分派的基本单位。而进程只是给线程提供执行环境。

线程同步

同步即协同步调,按预约的前后次序运行。

线程同步,指一个线程发出某一功能调用时,在没有获得结果以前,该调用不返回。同时其它线程为保证数据一致性,不能调用该功能。

举例1: 银行存款 5000。柜台,折:取3000;提款机,卡:取 3000。剩余:2000

举例2: 内存中100字节,线程T1欲填入全1, 线程T2欲填入全0。但若是T1执行了50个字节失去cpu,T2执行,会将T1写过的内容覆盖。当T1再次得到cpu继续 从失去cpu的位置向后写入1,当执行结束,内存中的100字节,既不是全1,也不是全0。

产生的现象叫作“与时间有关的错误”(time related)。为了不这种数据混乱,线程须要同步。

“同步”的目的,是为了不数据混乱,解决与时间有关的错误。实际上,不只线程间须要同步,进程间、信号间等等都须要同步机制。

所以,全部“多个控制流,共同操做一个共享资源”的状况,都须要同步。

锁的应用

互斥量mutex

Linux中提供一把互斥锁mutex(也称之为互斥量)。

每一个线程在对资源操做前都尝试先加锁,成功加锁才能操做,操做结束解锁。

资源仍是共享的,线程间也仍是竞争的,

但经过“锁”就将资源的访问变成互斥操做,然后与时间有关的错误也不会再产生了。

但,应注意:同一时刻,只能有一个线程持有该锁。

当A线程对某个全局变量加锁访问,B在访问前尝试加锁,拿不到锁,B阻塞。C线程不去加锁,而直接访问该全局变量,依然可以访问,但会出现数据混乱。

因此,互斥锁实质上是操做系统提供的一把“建议锁”(又称“协同锁”),建议程序中有多线程访问共享资源的时候使用该机制。但,并无强制限定。

所以,即便有了mutex,若是有线程不按规则来访问数据,依然会形成数据混乱。

读写锁

与互斥量相似,但读写锁容许更高的并行性。其特性为:写独占,读共享。

读写锁状态:

特别强调:读写锁只有一把,但其具有两种状态:

  1. 读模式下加锁状态 (读锁)
  2. 写模式下加锁状态 (写锁)

读写锁特性:

  1. 读写锁是“写模式加锁”时, 解锁前,全部对该锁加锁的线程都会被阻塞。
  2. 读写锁是“读模式加锁”时, 若是线程以读模式对其加锁会成功;若是线程以写模式加锁会阻塞。
  3. 读写锁是“读模式加锁”时, 既有试图以写模式加锁的线程,也有试图以读模式加锁的线程。那么读写锁会阻塞随后的读模式锁请求。优先知足写模式锁。读锁、写锁并行阻塞,写锁优先级高

读写锁也叫共享-独占锁。当读写锁以读模式锁住时,它是以共享模式锁住的;当它以写模式锁住时,它是以独占模式锁住的。写独占、读共享。

读写锁很是适合于对数据结构读的次数远大于写的状况。

协程并发

协程:coroutine。也叫轻量级线程。

与传统的系统级线程和进程相比,协程最大的优点在于“轻量级”。能够轻松建立上万个而不会致使系统资源衰竭。而线程和进程一般很难超过1万个。这也是协程别称“轻量级线程”的缘由。

一个线程中能够有任意多个协程,但某一时刻只能有一个协程在运行,多个协程分享该线程分配到的计算机资源

多数语言在语法层面并不直接支持协程,而是经过库的方式支持,但用库的方式支持的功能也并不完整,好比仅仅提供协程的建立、销毁与切换等能力。若是在这样的轻量级线程中调用一个同步 IO 操做,好比网络通讯、本地文件读写,都会阻塞其余的并发执行轻量级线程,从而没法真正达到轻量级线程自己指望达到的目标。

在协程中,调用一个任务就像调用一个函数同样,消耗的系统资源最少!但能达到进程、线程并发相同的效果。

在一次并发任务中,进程、线程、协程都可以实现。从系统资源消耗的角度出发来看,进程至关多,线程次之,协程最少。

Go并发

Go 在语言级别支持协程,叫goroutine。Go 语言标准库提供的全部系统调用操做(包括全部同步IO操做),都会出让CPU给其余goroutine。这让轻量级线程的切换管理不依赖于系统的线程和进程,也不须要依赖于CPU的核心数量。

有人把Go比做21世纪的C语言。第一是由于Go语言设计简单,第二,21世纪最重要的就是并行程序设计,而Go从语言层面就支持并行。同时,并发程序的内存管理有时候是很是复杂的,而Go语言提供了自动垃圾回收机制。

Go语言为并发编程而内置的上层API基于顺序通讯进程模型CSP(communicating sequential processes)。这就意味着显式锁都是能够避免的,由于Go经过相对安全的通道发送和接受数据以实现同步,这大大地简化了并发程序的编写。

Go语言中的并发程序主要使用两种手段来实现。goroutine和channel。

goroutine

什么是goroutine

goroutine是Go并行设计的核心。goroutine说到底其实就是协程,它比线程更小,十几个goroutine可能体如今底层就是五六个线程,Go语言内部帮你实现了这些goroutine之间的内存共享。执行goroutine只需极少的栈内存(大概是4~5KB),固然会根据相应的数据伸缩。也正由于如此,可同时运行成千上万个并发任务。goroutine比thread更易用、更高效、更轻便。

通常状况下,一个普通计算机跑几十个线程就有点负载过大了,可是一样的机器却能够轻松地让成百上千个goroutine进行资源竞争。

goroutine的建立

只需在函数调⽤语句前添加 go 关键字,就可建立并发执⾏单元。开发⼈员无需了解任何执⾏细节,调度器会自动将其安排到合适的系统线程上执行。

在并发编程中,咱们一般想将一个过程切分红几块,而后让每一个goroutine各自负责一块工做,当一个程序启动时,主函数在一个单独的goroutine中运行,咱们叫它main goroutine。新的goroutine会用go语句来建立。而go语言的并发设计,让咱们很轻松就能够达成这一目的。

示例代码:

package main

import (
    "fmt"
    "time"
)

func newTask() {
    i := 0
    for {
        i++
        fmt.Printf("new goroutine: i = %d\n", i)
        time.Sleep(time.Second) //延时1秒
    }
}

func main() {

    //建立一个goroutine,启动另一个任务
    go newTask()

    //循环打印

    for i := 0; i < 5; i++ {
        fmt.Printf("main goroutine: i = %d\n", i)
        time.Sleep(time.Second) //延时1秒
        i++
    }
}

程序运行结果:

goroutine特性

主goroutine退出后,其它的工做的子goroutine也会自动退出:

能够看出,因为主goroutine(main函数)执行太快了,因此致使newTask还没执行,程序就退出了

package main

import (
    "fmt"
    "time"
)

func newTask() {
    i := 0
    for {
        i++
        fmt.Printf("new goroutine: i = %d\n", i)
        time.Sleep(time.Second) //延时1秒
    }
}

func main() {

    //建立一个goroutine,启动另一个任务
    go newTask()

    fmt.Println("hello world")
}

程序运行结果:

经过运行结果(运行了三次)能够看出来,其中有一次newTask获得了执行,可是也只输出了一次程序就退出了。另外两次newTask彻底彻底没有执行就退出程序了。

runtime包

Gosched

runtime.Gosched() 用于让出CPU时间片,让出当前goroutine的执行权限,调度器安排其余等待的任务运行,并在下次再得到cpu时间轮片的时候,从该出让cpu的位置恢复执行。

有点像跑接力赛,A跑了一会碰到代码runtime.Gosched() 就把接力棒交给B了,A歇着了,B继续跑。

示例代码:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    
    //建立一个goroutine
    go func(s string) {
        for i := 0; i < 2; i++ {
            fmt.Println(s)
        }
    }("world")
    
    for i := 0; i < 2; i++ {
        runtime.Gosched() //import "runtime"包
        fmt.Println("hello")
    }
    /*
    屏蔽runtime.Gosched()运行结果以下:
    hello
    hello
    
    没有runtime.Gosched()运行结果以下:
    world
    world
    hello
    hello
     */
}

以上程序的执行过程以下:

主协程进入main()函数,进行代码的执行。当执行到go func()匿名函数时,建立一个新的协程,开始执行匿名函数中的代码,主协程继续向下执行,执行到runtime.Gosched( )时会暂停向下执行,直到其它协程执行完后,再回到该位置,主协程继续向下执行。

Goexit

调用 runtime.Goexit() 将当即终止当前 goroutine 执⾏,调度器确保全部已注册 defer延迟调用被执行。

示例代码:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    go func() {
        defer fmt.Println("A.defer")
        func() {
            defer fmt.Println("B.defer")
            runtime.Goexit()//终止当前 goroutine, import "runtime"
            fmt.Println("B") //不会执行
        }()
        fmt.Println("A") //不会执行
    }()
    //死循环,目的不让主goroutine结束
    for {}

}

程序运行结果:

GOMAXPROCS

调用 runtime.GOMAXPROCS() 用来设置能够并行计算的CPU核数的最大值,并返回以前的值。(默认是跑满整个CPU)

示例代码:

package main

import (
    "fmt"
    "runtime"
)

func main() {

    //n := runtime.GOMAXPROCS(1) //第一次 测试
    //打印结果: 111111111111111111111111110000000000000000000000000....

    n := runtime.GOMAXPROCS(2) //第二次 测试
    //打印结果: 1111111111111111111111110000000000000011111110000100000000111100001111
    fmt.Println(n)
    for {
        go fmt.Print(0)
        fmt.Print(1)
    }
}

在第一次执行runtime.GOMAXPROCS(1) 时,最多同时只能有一个goroutine被执行。因此会打印不少1。过了一段时间后,GO调度器会将其置为休眠,并唤醒另外一个goroutine,这时候就开始打印不少0了,在打印的时候,goroutine是被调度到操做系统线程上的。

在第二次执行runtime.GOMAXPROCS(2) 时, 咱们使用了两个CPU,因此两个goroutine能够一块儿被执行,以一样的频率交替打印0和1。

channel

channel是Go语言中的一个核心类型,能够把它当作管道。并发核心单元经过它就能够发送或者接收数据进行通信,这在必定程度上又进一步下降了编程的难度。

channel是一个数据类型,主要用来解决协程的同步问题以及协程之间数据共享(数据传递)的问题。

goroutine运行在相同的地址空间,所以访问共享内存必须作好同步。goroutine 奉行经过通讯来共享内存,而不是共享内存来通讯

引⽤类型 channel可用于多个 goroutine 通信。其内部实现了同步,确保并发安全。

定义channel变量

和map相似,channel也一个对应make建立的底层数据结构的引用

当咱们复制一个channel或用于函数参数传递时,咱们只是拷贝了一个channel引用,所以调用者和被调用者将引用同一个channel对象。和其它的引用类型同样,channel的零值也是nil。

定义一个channel时,也须要定义发送到channel的值的类型。channel可使用内置的make()函数来建立:

chan是建立channel所需使用的关键字。Type 表明指定channel收发数据的类型。

make(chan Type) //等价于make(chan Type, 0)
make(chan Type, capacity)

当咱们复制一个channel或用于函数参数传递时,咱们只是拷贝了一个channel引用,所以调用者和被调用者将引用同一个channel对象。和其它的引用类型同样,channel的零值也是nil。

当 参数capacity= 0 时,channel 是无缓冲阻塞读写的;当capacity > 0 时,channel 有缓冲、是非阻塞的,直到写满 capacity个元素才阻塞写入。

channel很是像生活中的管道,一边能够存放东西,另外一边能够取出东西。channel经过操做符 <- 来接收和发送数据,发送和接收数据语法:

channel <- value   //发送value到channel
<- channel         //取出channel里的一个值并丢弃
x := <-channel     //从channel中接收数据,并赋值给x
x, ok := <-channel //功能同上,同时检查通道是否已关闭或者是否为空

默认状况下,无缓冲的channel接收和发送数据都是阻塞的,除非另外一端已经准备好,这样就使得goroutine同步变的更加的简单,而不须要显式的lock。

示例代码:

package main

import "fmt"

func main() {
    c := make(chan int)
    go func() {
        defer fmt.Println("子协程结束")
        fmt.Println("子协程正在运行.....")
        c <- 666 //666发送到c
    }()

    num := <-c //从c中接收数据,并赋值给num
    fmt.Println("num = ", num)
    fmt.Println("main协程结束")
}

程序运行结果:

无缓冲的channel

无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。

这种类型的通道要求发送goroutine和接收goroutine同时准备好,才能完成发送和接收操做。不然,通道会致使先执行发送或接收操做的 goroutine 阻塞等待。

这种对通道进行发送和接收的交互行为自己就是同步的。其中任意一个操做都没法离开另外一个操做单独存在。

阻塞:因为某种缘由数据没有到达,当前协程(线程)持续处于等待状态,直到条件知足,才接触阻塞。

同步:在两个或多个协程(线程)间,保持数据内容一致性的机制。

下图展现两个 goroutine 如何利用无缓冲的通道来共享一个值:

  • 在第 1 步,两个 goroutine 都到达通道,但哪一个都没有开始执行发送或者接收。
  • 在第 2 步,左侧的 goroutine 将它的手伸进了通道,这模拟了向通道发送数据的行为。这时,这个 goroutine 会在通道中被锁住,直到交换完成。
  • 在第 3 步,右侧的 goroutine 将它的手放入通道,这模拟了从通道里接收数据。这个 goroutine 同样也会在通道中被锁住,直到交换完成。
  • 在第 4 步和第 5 步,进行交换,并最终,在第 6 步,两个 goroutine 都将它们的手从通道里拿出来,这模拟了被锁住的 goroutine 获得释放。两个 goroutine 如今均可以去作别的事情了。

无缓冲的channel建立格式:

make(chan Type)  //等价于make(chan Type, 0)

若是没有指定缓冲区容量,那么该通道就是同步的,所以会阻塞到发送者准备好发送和接收者准备好接收。

示例代码:

package main

import (
    "fmt"
    "time"
)

func main() {
    c := make(chan int, 0) //建立无缓冲的通道c

    //内置函数len返回未被读取的缓冲元素数量,cap返回缓冲区大小
    fmt.Printf("len(c) = %d, cap(c) = %d\n", len(c), cap(c))

    go func() {
        defer fmt.Println("子协程结束")
        for i := 0; i < 3; i++ {
            c <- i
            fmt.Printf("子协程正在运行[%d]: len(c) = %d, cap(c) = %d\n", i, len(c), cap(c))
        }
    }()

    time.Sleep(time.Second * 2) //延时2s

    for i := 0; i < 3; i++ {
        num := <-c //从c中接收数据,并复制给num
        fmt.Printf("num = %d\n", num)
    }
    fmt.Println("main协程结束")
}

有缓冲的channel

有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个数据值的通道。

这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动做的条件也不一样。

只有通道中没有能够接收的值时,接收动做才会阻塞。

只有通道没有可用缓冲区容纳被发送的值时,发送动做才会阻塞。

这致使有缓冲的通道和无缓冲的通道之间的一个很大的不一样:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。

示例以下:

  • 在第 1 步,右侧的 goroutine 正在从通道接收一个值。
  • 在第 2 步,右侧的这个 goroutine独立完成了接收值的动做,而左侧的 goroutine 正在发送一个新值到通道里。
  • 在第 3 步,左侧的goroutine 还在向通道发送新值,而右侧的 goroutine 正在从通道接收另一个值。这个步骤里的两个操做既不是同步的,也不会互相阻塞。
  • 最后,在第 4 步,全部的发送和接收都完成,而通道里还有几个值,也有一些空间能够存更多的值。

有缓冲的channel建立格式:

make(chan Type, capacity)

若是给定了一个缓冲区容量,通道就是异步的。只要缓冲区有未使用空间用于发送数据,或还包含能够接收的数据,那么其通讯就会无阻塞地进行。

示例代码:

package main

import (
    "fmt"
    "time"
)

func main() {
    c := make(chan int, 3) //建立无缓冲的通道c

    //内置函数len返回未被读取的缓冲元素数量,cap返回缓冲区大小
    fmt.Printf("len(c) = %d, cap(c) = %d\n", len(c), cap(c))

    go func() {
        defer fmt.Println("子协程结束")
        for i := 0; i < 3; i++ {
            c <- i
            fmt.Printf("子协程正在运行[%d]: len(c) = %d, cap(c) = %d\n", i, len(c), cap(c))
        }
    }()

    time.Sleep(time.Second * 2) //延时2s

    for i := 0; i < 3; i++ {
        num := <-c //从c中接收数据,并复制给num
        fmt.Printf("num = %d\n", num)
    }
    fmt.Println("main协程结束")
}

关闭channel

若是发送者知道,没有更多的值须要发送到channel的话,那么让接收者也能及时知道没有多余的值可接收将是有用的,由于接收者能够中止没必要要的接收等待。这能够经过内置的close函数来关闭channel实现。

示例代码:

 

注意:

  • channel不像文件同样须要常常去关闭,只有当你确实没有任何发送数据了,或者你想显式的结束range循环之类的,才去关闭channel;
  • 关闭channel后,没法向channel 再发送数据(引起 panic 错误后致使接收当即返回channel类型的零值);
  • 关闭channel后,能够继续从channel接收数据;
  • 对于nil channel,不管收发都会被阻塞。

可使用 range 来迭代不断操做channel:

package main

import "fmt"

func main() {

    c := make(chan int)

    go func() {
        for i := 0; i < 5; i++ {
            c <- i
        }
        //把close(c)注释掉,程序会死锁
        //close(c)
    }()

    for {
        //ok为true说明channel没有关闭,为false说明channel已经关闭
        if data, ok := <-c; ok {
            fmt.Println(data)
        } else {
            break
        }
    }
    fmt.Println("main协程结束")
}

单向channel

有的时候咱们会将通道做为参数在多个任务函数间传递,不少时候咱们在不一样的任务函数中使用通道都会对其进行限制,好比限制通道在函数中只能发送或只能接收。

Go语言中提供了单向通道来处理这种状况。例如,咱们把上面的例子改造以下:

package main

import (
    "fmt"
)

func counter(in chan<- int) {
    for i := 0; i < 100; i++ {
        in <- i
    }
    close(in)
}

func squarer(in chan<- int, out <-chan int) {
    for i := range out {
        in <- i * i
    }
    close(in)
}

func printer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}

其中

  • chan<- int是一个只能发送的通道,能够发送可是不能接收;
  • <-chan int是一个只能接收的通道,能够接收可是不能发送。

在函数传参及任何赋值操做中将双向通道转换为单向通道是能够的,但反过来是不能够的。

channel总结

定时器

time.Timer

Timer是一个定时器,表明将来的一个单一事件,你能够告诉timer你须要等待多长事件,它提供一个channel,在未来的那个时间那个channel提供了一个时间值。

它提供一个channel,在定时时间到达以前,没有数据写入timer.C会一直阻塞。直到定时时间到,向channel写入值,阻塞解除,能够从中读取数据。

示例代码:

package main

import (
    "fmt"
    "time"
)

func main() {
    //建立定时器,2秒后,定时器就会向本身的C字段发送一个time.Time类型的元素值
    timer1 := time.NewTimer(time.Second * 2)
    t1 := time.Now() //当前时间
    fmt.Printf("t1: %v\n", t1)

    t2 := <-timer1.C
    fmt.Printf("t2: %v\n", t2)

    //若是只是想单纯的等待的话,可使用time.Sleep来实现
    timer2 := time.NewTimer(time.Second * 2)
    <-timer2.C
    fmt.Println("2s后")

    time.Sleep(time.Second * 2)
    fmt.Println("再一次2s后")

    <-time.After(time.Second * 2)
    fmt.Println("再再一次2s后")

    timer3 := time.NewTimer(time.Second)
    go func() {
        <-timer3.C
        fmt.Println("Timer 3 expired")
    }()

    stop := timer3.Stop() //中止定时器
    if stop {
        fmt.Println("Timer 3 stopped")
    }

    fmt.Println("before")
    timer4 := time.NewTimer(time.Second * 5) //原来设置5秒
    timer4.Reset(time.Second * 1)            //从新设置时间
    <-timer4.C
    fmt.Println("after")
}

定时器的经常使用操做:

  1. 实现延迟功能 (1) <-time.After(time.Second * 2) //定时2s,阻塞2s, 2s后产生一个事件,往channel写内容
    fmt.Println("时间到") (2) time.Sleep(time.Second * 2)
    fmt.Println("时间到")

    (3) 延时2s后打印一句话
    timer := time.NewTimer(time.Second * 2)
    <-timer.C
    fmt.Println("时间到")

  2. 定时器中止

    timer := time.NewTimer(time.Second * 3)
    go func() {
      <-timer.C
      fmt.Println("子协程能够打印了,由于定时器的时间到")
    }()
    
    timer.Stop() //中止定时器
    for {
    }
  3. 定时器重置

    timer := time.NewTimer(3 * time.Second)
    ok := timer.Reset(1 * time.Second) //从新设置为1s
    fmt.Println("ok = ", ok)
    <-timer.C
    fmt.Println("时间到")

time.Ticker

Ticker是一个定时触发的计时器,它会以一个间隔(interval)往channel发送一个事件(当前时间),而channel的接受者能够以固定的时间间隔从channel中读取事件。

示例代码:

package main

import (
    "fmt"
    "time"
)

func main() {

    //建立定时器,每隔1s后,定时器就会给channel发送一个事件(当前时间)
    ticker := time.NewTicker(1 * time.Second)

    go func() {
        i := 0
        for { //循环
            <-ticker.C
            i++
            fmt.Println("i =", i)

            if i == 5 {
                ticker.Stop() //中止定时器
            }
        }
    }()

    //死循环,特意不让main goroutine结束
    for {

    }
}

select

select做用

Go里面提供了一个关键字select,经过select能够监听channel上的数据流动。

select的用法与switch语言很是相似,由select开始一个新的选择块,每一个选择条件由case语句来描述。

与switch语句相比, select有比较多的限制,其中最大的一条限制就是每一个case语句里必须是一个IO操做,大体的结构以下:

select {
    case <-chan1:
    // 若是chan1成功读到数据,则进行该case处理语句
    case chan2 <- 1:
    // 若是成功向chan2写入数据,则进行该case处理语句
    default:
    // 若是上面都没有成功,则进入default处理流程
}

在一个select语句中,Go语言会按顺序从头到尾评估每个发送和接收的语句。

若是其中的任意一语句能够继续执行(即没有被阻塞),那么就从那些能够执行的语句中任意选择一条来使用。

若是没有任意一条语句能够执行(即全部的通道都被阻塞),那么有两种可能的状况:

  • 若是给出了default语句,那么就会执行default语句,同时程序的执行会从select语句后的语句中恢复。
  • 若是没有default语句,那么select语句将被阻塞,直到至少有一个通讯能够进行下去。

示例代码:

package main

import "fmt"

func fibonacci(c, quit chan int) {
    x, y := 1, 1
    for {
        select {
        case c <-x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func main() {

    c := make(chan int)
    quit := make(chan int)

    go func() {
        for i := 0; i < 6; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

运行结果以下:

示例2:

package main

import "fmt"

func main() {
    var ch = make(chan int, 1)
    for i := 0; i < 10; i++ {
        select {
        case x := <-ch:
            fmt.Println(x)
        case ch <- i:
        }
    }
}

使用select语句能提升代码的可读性。

  • 可处理一个或多个channel的发送/接收操做。
  • 若是多个case同时知足,select会随机选择一个。
  • 对于没有caseselect{}会一直等待,可用于阻塞main函数。

超时

有时候会出现goroutine阻塞的状况,那么咱们如何避免整个程序进入阻塞的状况呢?咱们能够利用select来设置超时,经过以下的方式实现:

package main

import (
    "fmt"
    "time"
)

func main() {

    c := make(chan int)
    o := make(chan bool)

    go func() {
        for {
            select {
            case v := <-c:
                fmt.Println(v)
            case <-time.After(5 * time.Second):
                fmt.Println("timeout")
                o <- true
                break
            }
        }
    }()
    //c <- 666 //注释掉,引起timeout
    <-o
}

前面咱们为了解决协程同步的问题咱们使用了channel,可是GO也提供了传统的同步工具。

它们都在GO的标准库代码包sync和sync/atomic中。

什么是锁呢?就是某个协程(线程)在访问某个资源时先锁住,防止其它协程的访问,等访问完毕解锁后其余协程再来加锁进行访问。这和咱们生活中加锁使用公共资源类似,例如:公共卫生间。

死锁

死锁是指两个或两个以上的进程在执行过程当中,因为竞争资源或者因为彼此通讯而形成的一种阻塞的现象,若无外力做用,它们都将没法推动下去。此时称系统处于死锁状态或系统产生了死锁

示例代码:

package main

import "fmt"

func main() {
    ch := make(chan int)
    ch <- 1 // I'm blocked because there is no channel read yet.
    fmt.Println("send")
    go func() {
        <-ch // I will never be called for the main routine is blocked!
        fmt.Println("received")
    }()
    fmt.Println("over")
}

互斥锁

每一个资源都对应于一个可称为 "互斥锁" 的标记,这个标记用来保证在任意时刻,只能有一个协程(线程)访问该资源。其它的协程只能等待。

互斥锁是传统并发编程对共享资源进行访问控制的主要手段,它由标准库sync中的Mutex结构体类型表示。sync.Mutex类型只有两个公开的指针方法,Lock和Unlock。Lock锁定当前的共享资源,Unlock进行解锁。

在使用互斥锁时,必定要注意:对资源操做完成后,必定要解锁,不然会出现流程执行异常,死锁等问题。

有时候在Go代码中可能会存在多个goroutine同时操做一个资源(临界区),这种状况会发生竞态问题(数据竞态)。类比现实生活中的例子有十字路口被各个方向的的汽车竞争;还有火车上的卫生间被车箱里的人竞争。

举个例子:

package main

import (
    "fmt"
    "sync"
)

var (
    x int64
    wg sync.WaitGroup
)

func add() {
    defer wg.Done()
    for i := 0; i < 5000; i++ {
        x = x + 1
    }
}

func main() {

    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

上面的代码中咱们开启了两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,致使最后的结果与期待的不符。

互斥锁是一种经常使用的控制共享资源访问的方法,它可以保证同时只有一个goroutine能够访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。 使用互斥锁来修复上面代码的问题:

package main

import (
    "fmt"
    "sync"
)

var (
    x int64
    wg sync.WaitGroup
    lock sync.Mutex
)

func add() {
    defer wg.Done()
    for i := 0; i < 5000; i++ {
        lock.Lock() //加锁
        x = x + 1
        lock.Unlock() //解锁
    }
}

func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

使用互斥锁可以保证同一时间有且只有一个goroutine进入临界区,其余的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才能够获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。

读写锁

互斥锁的本质是当一个goroutine访问的时候,其余goroutine都不能访问。这样在资源同步,避免竞争的同时也下降了程序的并发性能。程序由原来的并行执行变成了串行执行。

其实,当咱们对一个不会变化的数据只作“读”操做的话,是不存在资源竞争的问题的。由于数据是不变的,无论怎么读取,多少goroutine同时读取,都是能够的。

因此问题不是出在“读”上,主要是修改,也就是“写”。修改的数据要同步,这样其余goroutine才能够感知到。因此真正的互斥应该是读取和修改、修改和修改之间,读和读是没有互斥操做的必要的。

所以,衍生出另一种锁,叫作读写锁

读写锁可让多个读操做并发,同时读取,可是对于写操做是彻底互斥的。也就是说,当一个goroutine进行写操做的时候,其余goroutine既不能进行读操做,也不能进行写操做。

GO中的读写锁由结构体类型sync.RWMutex表示。此类型的方法集合中包含两对方法:

一组是对写操做的锁定和解锁,简称“写锁定”和“写解锁”:

func (*RWMutex)Lock()
func (*RWMutex)Unlock()

另外一组表示对读操做的锁定和解锁,简称为“读锁定”与“读解锁”:

func (*RWMutex)RLock()
func (*RWMutex)RUlock()

读写锁基本示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    x int64
    wg sync.WaitGroup
    rwlock sync.RWMutex
)

func write() {
    defer wg.Done()
    rwlock.Lock() //加写锁
    x = x + 1
    time.Sleep(time.Millisecond * 10) //假设写操做耗时10毫秒
    rwlock.Unlock() //解写锁
}

func read() {
    defer wg.Done()
    rwlock.RLock() //加读锁
    time.Sleep(time.Millisecond) //假设读操做耗时一毫秒
    rwlock.RUnlock() //解读锁
}

func main() {

    var start = time.Now()

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go write()
    }

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go read()
    }
    wg.Wait()

    var end = time.Now()
    fmt.Println(end.Sub(start))
}

咱们在read里使用读锁,也就是RLock和RUnlock,写锁的方法名和咱们平时使用的同样,是Lock和Unlock。这样,咱们就使用了读写锁,能够并发地读,可是同时只能有一个写,而且写的时候不能进行读操做。

须要注意的是读写锁很是适合读多写少的场景,若是读和写的操做差异不大,读写锁的优点就发挥不出来。

总结:读写锁控制下的多个写操做之间都是互斥的,而且写操做与读操做之间也都是互斥的。可是,多个读操做之间不存在互斥关系。

从互斥锁和读写锁的源码能够看出,它们是同源的。读写锁的内部用互斥锁来实现写锁定操做之间的互斥。能够把读写锁看做是互斥锁的一种扩展。

sync.WaitGroup

在代码中生硬的使用time.Sleep确定是不合适的,Go语言中可使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有如下几个方法:

方法名 功能
(wg *WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值能够增长和减小。例如当咱们启动了N 个并发任务时,就将计数器值增长N。每一个任务完成时经过调用Done()方法将计数器减1。经过调用Wait()来等待并发任务执行完,当计数器值为0时,表示全部并发任务已经完成。

咱们利用sync.WaitGroup将上面的代码优化一下:

package main

import (
    "fmt"
    "sync"
)

func sayHello(wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Hello")
}
func main() {

    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go sayHello(&wg)
    }
    fmt.Println("main goroutine done!")
    wg.Wait()
}

须要注意sync.WaitGroup是一个结构体,传递的时候要传递指针。

sync.Once

说在前面的话:这是一个进阶知识点。

在编程的不少场景下咱们须要确保某些操做在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。

Go语言中的sync包中提供了一个针对只执行一次场景的解决方案–sync.Once

sync.Once只有一个Do方法,其签名以下:

func (o *Once) Do(f func()) {}

备注:若是要执行的函数f须要传递参数就须要搭配闭包来使用。

加载配置文件示例

延迟一个开销很大的初始化操做到真正用到它的时候再执行是一个很好的实践。由于预先初始化一个变量(好比在init函数中完成初始化)会增长程序的启动耗时,并且有可能实际执行过程当中这个变量没有用上,那么这个初始化操做就不是必需要作的。咱们来看一个例子:

var icons map[string]image.Image

func loadIcons() {
    icons = map[string]image.Image{
        "left":  loadIcon("left.png"),
        "up":    loadIcon("up.png"),
        "right": loadIcon("right.png"),
        "down":  loadIcon("down.png"),
    }
}

// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {
    if icons == nil {
        loadIcons()
    }
    return icons[name]
}

多个goroutine并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每一个goroutine都知足串行一致的基础上自由地重排访问内存的顺序。loadIcons函数可能会被重排为如下结果:

func loadIcons() {
    icons = make(map[string]image.Image)
    icons["left"] = loadIcon("left.png")
    icons["up"] = loadIcon("up.png")
    icons["right"] = loadIcon("right.png")
    icons["down"] = loadIcon("down.png")
}

在这种状况下就会出现即便判断了icons不是nil也不意味着变量初始化完成了。考虑到这种状况,咱们能想到的办法就是添加互斥锁,保证初始化icons的时候不会被其余的goroutine操做,可是这样作又会引起性能问题。

使用sync.Once改造的示例代码以下:

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
    icons = map[string]image.Image{
        "left":  loadIcon("left.png"),
        "up":    loadIcon("up.png"),
        "right": loadIcon("right.png"),
        "down":  loadIcon("down.png"),
    }
}

// Icon 是并发安全的
func Icon(name string) image.Image {
    loadIconsOnce.Do(loadIcons)
    return icons[name]
}

并发安全的单利模式

下面是借助sync.Once实现的并发安全的单例模式:

package main

import "sync"

type singleton struct{}

var (
    instance *singleton
    once sync.Once
)

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操做的时候是并发安全的而且初始化操做也不会被执行屡次。

sync.Map

Go语言中内置的map不是并发安全的。请看下面的示例:

package main

import (
    "fmt"
    "strconv"
    "sync"
)

var m = make(map[string]int)

func get(key string) int {
    return m[key]
}

func set(key string, value int) {
    m[key] = value
}

func main() {

    var wg = sync.WaitGroup{}
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            key := strconv.Itoa(n)
            set(key, n)
            fmt.Printf("k: %v, v: %v\n", key, get(key))
        }(i)
    }
    wg.Wait()
}

上面的代码开启少许几个goroutine的时候可能没什么问题,当并发多了以后执行上面的代码就会报fatal error: concurrent map writes错误。

像这种场景下就须要为map加锁来保证并发的安全性了,Go语言的sync包中提供了一个开箱即用的并发安全版map–sync.Map。开箱即用表示不用像内置的map同样使用make函数初始化就能直接使用。同时sync.Map内置了诸如StoreLoadLoadOrStoreDeleteRange等操做方法。

package main

import (
    "fmt"
    "strconv"
    "sync"
)

func main() {
    var m = sync.Map{}
    var wg = sync.WaitGroup{}
    for i := 0; i < 40; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            key := strconv.Itoa(n)
            m.Store(key, n)
            value, _ := m.Load(key)
            fmt.Printf("k: %v, v: %v\n", key, value)
        }(i)
    }
    wg.Wait()
}

原子操做

代码中的加锁操做由于涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型咱们还可使用原子操做来保证并发安全,由于原子操做是Go语言提供的方法它在用户态就能够完成,所以性能比加锁操做更好。Go语言中原子操做由内置的标准库sync/atomic提供。

atomic包

方法 解释
func LoadInt32(addr int32) (val int32) func
LoadInt64(addr
int64) (val int64)
func LoadUint32(addr uint32) (val uint32)
func LoadUint64(addr
uint64) (val uint64)
func LoadUintptr(addr uintptr) (val uintptr)
func LoadPointer(addr
unsafe.Pointer) (val unsafe.Pointer)
读取操做
func StoreInt32(addr int32, val int32)
func StoreInt64(addr
int64, val int64)
func StoreUint32(addr uint32, val uint32)
func StoreUint64(addr
uint64, val uint64)
func StoreUintptr(addr uintptr, val uintptr)
func StorePointer(addr
unsafe.Pointer, val unsafe.Pointer)
写入操做
func AddInt32(addr int32, delta int32) (new int32)
func AddInt64(addr
int64, delta int64) (new int64)
func AddUint32(addr uint32, delta uint32) (new uint32)
func AddUint64(addr
uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
修改操做
func SwapInt32(addr int32, new int32) (old int32)
func SwapInt64(addr
int64, new int64) (old int64)
func SwapUint32(addr uint32, new uint32) (old uint32)
func SwapUint64(addr
uint64, new uint64) (old uint64)
func SwapUintptr(addr uintptr, new uintptr) (old uintptr)
func SwapPointer(addr
unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
交换操做
func CompareAndSwapInt32(addr int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr
int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr
uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr
unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
比较并交换操做

示例

咱们经过一个示例来比较下互斥锁和原子操做的性能。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type Counter interface {
    Inc()
    Load() int64
}

//普通版
type NormalCounter struct {
    x int64
}

func (normal *NormalCounter) Inc() {
    normal.x++
}

func (normal *NormalCounter) Load() int64 {
    return normal.x
}


//互斥锁版
type MutexCounter struct {
    x int64
    lock sync.Mutex
}

func (m *MutexCounter) Inc () {
    m.lock.Lock()
    m.x++
    m.lock.Unlock()
}

func (m *MutexCounter) Load() int64 {
    m.lock.Lock()
    defer m.lock.Unlock()
    return m.x
}

//原子操做版
type AtomicCounter struct {
    x int64
}

func (a *AtomicCounter) Inc() {
    atomic.AddInt64(&a.x, 1)
}

func (a *AtomicCounter) Load() int64 {
    return atomic.LoadInt64(&a.x)
}

func test(c Counter) {
    var wg sync.WaitGroup
    var start = time.Now()

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Inc()
        }()
    }

    wg.Wait()
    var end = time.Now()
    fmt.Printf("执行用时: %v, 结果为: %v\n", end.Sub(start), c.Load())
}

func main() {

    var c1 = NormalCounter{} // 非并发安全
    test(&c1)

    var c2 = MutexCounter{} // 使用互斥锁实现并发安全
    test(&c2)

    var c3 = AtomicCounter{} // 并发安全且比互斥锁效率更高
    test(&c3)
}

atomic包提供了底层的原子级内存操做,对于同步算法的实现颇有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。

相关文章
相关标签/搜索