[快速入门]Go语言的CSP并发模型

Go语言的并发模型

Go语言实现了一下两种并发形式:编程

第一种是你们广泛认知的:多线程共享内存。其实就许多主流编程语言中的多线程开发。bash

另一种是Go语言特有的,也是Go语言推荐的:CSP(communicating sequential processes)并发模型。该方式是Go语言最大的两个亮点goroutine和chan,两者合体的典型应用。多线程

CSP 是 Communicating Sequential Process 的简称,中文能够叫作通讯顺序进程,是一种并发编程模型,是一个很强大的并发数据模型,是上个世纪七十年代提出的,用于描述两个独立的并发实体经过共享的通信 channel(管道)进行通讯的并发模型。并发

Go语言其实只用到了 CSP 的很小一部分,即理论中的 Process/Channel(对应到语言中的 goroutine/channel):这两个并发原语之间没有从属关系, Process 能够订阅任意个 Channel,Channel 也并不关心是哪一个 Process 在利用它进行通讯;Process 围绕 Channel 进行读写,造成一套有序阻塞和可预测的并发模型。异步

相信你们必定见过一句话:编程语言

Do not communicate by sharing memory; instead, share memory by communicating.函数

不要经过共享内存来通讯,而要经过通讯来实现内存共享。单元测试

这就是 Go 的并发哲学,它依赖 CSP 模型,基于 channel 实现。测试

channel

channel的建立

channel 字面意义是 “通道”,相似于 Linux 中的管道。声明 channel 的语法以下:ui

chan T          // 能够接收和发送类型为 T 的数据
chan<- float64  // 只能够用来发送 float64 类型的数据
<-chan int      // 只能够用来接收 int 类型的数据
复制代码

使用make初始化Channel,而且能够设置容量:

make(chan int, 100)
复制代码

由于 channel 是一个引用类型,因此在它被初始化以前,它的值是 nil,channel 使用 make 函数进行初始化。能够向它传递一个 int 值,表明 channel 缓冲区的大小(容量),构造出来的是一个缓冲型的 channel;不传或传 0 的,构造的就是一个非缓冲型的 channel。

Channel 分为两种:带缓冲、不带缓冲。对不带缓冲的 channel 进行的操做实际上能够看做 “同步模式”,带缓冲的则称为 “异步模式”。

同步模式下,发送方和接收方要同步就绪,只有在二者都 ready 的状况下,数据才能在二者间传输。不然,任意一方先行进行发送或接收操做,都会被挂起,等待另外一方的出现才能被唤醒。

异步模式下,在缓冲槽可用的状况下(有剩余容量),发送和接收操做均可以顺利进行。不然,操做的一方(如写入)一样会被挂起,直到出现相反操做(如接收)才会被唤醒。

代码示例

//这里定义两个函数,下面分别验证同步模式执行以及异步模式执行的效果
func service() {
	time.Sleep(time.Millisecond * 30)
	return "Done"
}
func otherTask() {
	fmt.Println("this is other task B")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("Task B is done")
}
复制代码

同步模式执行

func AsyncService() chan string { 
	//阻塞模式,即A将信息放进channel直到有人读取,不然将一直阻塞	
	retCh := make(chan string) 
	go func () {
		ret := service()
		fmt.Println("service return result")
		retCh <- ret 
		fmt.Println("service exited")
	}()
	return retCh
}

//单元测试
func TestAsynService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh)
	time.Sleep(time.Second * 1)
}
复制代码

单测结果运行以下,能够看出等到当othertask执行完开始从chan中取数据时协程才继续向下执行,在这以前一直处于挂起状态

this is other task B
service return result
Task B is done
Done
service exited
复制代码

异步模式执行

func AsyncService() chan string { 
	retCh := make(chan string,1) //buffer模式,非阻塞 丢进channel就继续向下执行
	go func () {
		ret := service()
		fmt.Println("service return result")
		retCh <- ret 
		fmt.Println("service exited")
	}()
	return retCh
}

func TestAsynService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh)
	time.Sleep(time.Second * 1)
}
复制代码

执行结果以下,能够明显的看到这种模式下并无等待从chan中获取消息,直接向下继续运行

this is other task
service return result
service exited
Task B is done
Done
复制代码

channel的使用

1.send操做

c := make(chan int)
c <- 3
复制代码

注意,往一个已经被close的channel中继续发送数据会致使run-time panic

2.recive操做

c := make(chan int)
c <- 3
i := <-c
fmt.Println(i) //3
复制代码

从一个nil channel中接收数据会一直被block,直到有数据能够接收;从一个被close的channel中接收数据不会被阻塞,而是当即返回,会返回元素类型的零值(zero value)以及一个表明当前channel状态的bool值。能够经过这个特性判断channel是否关闭

if x, ok := <-ch;ok {    //ok 为bool值,true标识正常接收,false表示通道关闭
    ...
}else{
    ...
} 
复制代码

3.close操做

c := make(chan int)
close(c)
复制代码

全部的channel接受者都会在channel关闭时,马上从阻塞等待中返回且上述ok值为false(若是有值可取依旧会正常取值)。这个广播机制常被利用,进行向多个订阅者同时发送信号

代码示例

//数据生产者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}
		close(ch)	//channel关闭

		wg.Done()
	}()

}

//数据接受者
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for {
			if data, ok := <-ch; ok {	//channel关闭后,ok值将变为false
				fmt.Println(data)
			} else {
				break
			}
		}
		wg.Done()
	}()

}

func TestCloseChannel(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan int)
	wg.Add(1)
	dataProducer(ch, &wg)
	wg.Add(1)
	dataReceiver(ch, &wg)
	wg.Wait()
复制代码

与switch-case搭配实现选路

select-case语句配合channel能够实现多路选择以及超时控制功能,每一个case后面跟一个阻塞事件,当有事件收到响应后则结束等待,若是均没有响应则执行default

//多渠道选择
//原理以下,采用select-case语句 每一个case后面跟一个阻塞事件,当有事件收到响应后则结束等待,若是均没有响应则执行default
func TestSwitch(t *testing.T){
	select{
		case ret1 := <-retCH1:
			t.Logf("case 1 return")
		case ret2 := <-retCH2:
			t.Logf("case 2 return")
		default:
			t.Logf("no one return")
	}
}

//超时控制
func TestTimeOut(t *testing.T){
	select {
	case ret := <- retCH1:
		t.Logf("case 1 return")
	case <-time.After(time.Second*1):
		t.Logf("time out")
	}
}
复制代码
相关文章
相关标签/搜索