在计算机科学中,并发的定义是指:在一个程序的运行过程当中,程序的不一样部分能够以乱序或者部分有序的方式执行,可是最终程序的输出结果与顺序执行一致。linux
定义中有两个关键点golang
假设程序 由
两部分组成,
依赖
,顺序执行状况下,先执行
而后执行
,输出结果为
,耗时:
。数据库
进一步研究发现, 能够分红
两部分,且
依赖于另外一个任务
,也就是执行完
以后,须要等待
也执行完才能继续往下走,令等待
完成的时间为
,那么就有
,这是顺序执行状况下的耗时。编程
再进一步研究发现, 仅仅依赖于
,为了提升效率,咱们能够这样作,执行完
以后,为了不CPU等待空闲,直接调度任务
,等任务
完成以后,假设任务
也完成了,那么切换到任务
执行完
部分。数据结构
这是一个简单的并发case,能够看到多线程
那为何要费这么多事来实现并发呢,老老实实顺序执行很差吗?换句话说,经过并发咱们得到了什么。并发
效率是关键。在上面的例子中,采用并发的执行方式, 被节省下来。编程语言
并发带来的另外一个明显好处是多任务,就算是在一个单核CPU(single processor)机器上,也能同时运行多个应用,这是由于多个应用能够分时复用CPU,这是多个应用之间的并发。实际上,单核CPU同一时刻只能运行一个应用(这就是为何我把上文的“同时”二字加粗的缘由),可是从用户的视角来看好像有多个CPU同样,应用之间的并发虚拟化出了多个CPU的效果。函数
没有并发的世界是可怕的,想想你只有把所有的工做作完才能去玩游戏,但是工做哪有作完的时候呢?我只好在工做和游戏之间来回切换,切换是有代价的,全情投入工做以前,要先把游戏里的心思先收回来,回忆起上一段工做的内容,这跟线程切换几乎如出一辙。把我类比成CPU,实际上,我从事的各类活动是在分时复用“我”这个资源的。工具
通常咱们遇到线程并发和协程并发的状况比较多,这里的线程和协程就是并发单位。为了具体的说明问题,咱们拿线程的并发举例。
第一个关键概念是临界区(critical section)。临界区指的是一段代码,这段代码会访问共享资源,这个共享资源多是一个简单变量,也多是一个更加复杂的数据结构。
第二个关键概念是竞争条件(race condition)。竞争条件是指,在多线程程序中,多个线程有可能几乎同时访问临界区,而且尝试更新共享资源,这可能致使意想不到的结果。好比说,两个线程同时对共享变量x执行自增操做,结果多是+1,也多是+2。也就是程序的运行结果是不肯定的,这样的程序叫作非肯定程序(indeterminate program),这是第三个关键概念。
程序运行的非肯定性,这是并发要解决的本质问题,至于并发程序难于编写、难于理解、编写不当还会出现死锁,这些都是属于技术层面的问题(因此并发的定义中没有提到效率)。
为了保证并发程序的肯定性,咱们须要使用一些工具,这些工具叫同步原语(mutual exclusion primitives),具体来讲有:
互斥变量提供一种加锁机制。在访问临界区的以前,调用互斥变量的lock函数,可以保证每次只有一个线程进入到临界区,固然,离开临界区以后作的第一件事就是调用互斥变量提供的unlock函数,释放共享资源,保证其余线程或者当前线程下一次可以再次进入临界区。多线程环境下,共享变量x的自增操做可使用互斥变量来保证正确性。
互斥变量提供一种互斥访问的机制,条件变量提供的则是同步机制。想让任务B在任务A以后执行,只须要使用互斥变量m,在调度任务B以前调用m的wait函数,在执行任务A以后调用m的signal函数。m的做用是,无论调度顺序怎么样,在signal执行以前,wait会一直等待。
信号量最先由Dijkstra提出,目的也是为了防止竞争条件的出现,可是其原始语义与条件变量和信号量不同,而且咱们会看到,互斥变量和条件变量都是信号量的一种特殊形式。
每一个信号量都有一个counter,表明当前可用资源数,信号量还提供两个操做,sem_wait:当counter-1大于0的时候返回成功,且执行counter减1,当counter-1小于0的时候阻塞;sem_post,执行counter加1操做,且若是当前有线程正在等待,随机唤醒其中一个线程。
counter值只能取0或者1的的信号量称之为布尔信号量(binary semaphore),counter初始值为1的布尔信号量功能至关于互斥变量,counter初始值为0的布尔信号量至关于条件变量。
下面用具体的case说明为何binary semaphore能够实现互斥变量、条件变量的功能。
golang中的sync.Mutex就是互斥变量,如上所述,互斥变量能够解决多线程共享变量自增的正确性。
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup, m *sync.Mutex) {
m.Lock()
x = x + 1
m.Unlock()
wg.Done()
}
func main() {
var w sync.WaitGroup
var m sync.Mutex
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, &m) // 這裡必定要用 address
}
w.Wait()
fmt.Println("final value of x", x)
}
复制代码
互斥变量做用等同于容量为1的信号量,因此上面的case能够改写成:
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup, m chan int) {
m <- 1 // 信号量代替mutex
x = x + 1
<- m
wg.Done()
}
func main() {
var w sync.WaitGroup
m := make(chan int, 1)
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, m) // 這裡必定要用 address
}
w.Wait()
fmt.Println("final value of x", x)
}
复制代码
GOLANG中的条件变量就是unbuffered channel。实际上,channel就是golang中的信号量实现,buffered channel的capacity就是信号量中的counter,不防统一称之为容量。
事实上,unbuffered channel的capacity等于0,前面说过,容量为0的信号量做用等同于条件变量。
下面的case我想在程序退出(也就是main goroutine结束)以前在屏幕上输出hello world,为了实现这点,我使用了done这个类型为chan bool的channel变量。
package main
import (
"fmt"
"time"
)
func hello(done chan bool) {
fmt.Println("hello world")
time.Sleep(4 * time.Second)
done <- true
}
func main() {
done := make(chan bool) // done的做用等同于条件变量
fmt.Println("Main going to call hello go goroutine")
go hello(done)
<- done // 管道读操做一直block,直到 hello goroutine执行并往管道中写数据,注释掉此行,main goroutine会一直执行到结束,hello goroutine不会被调度
fmt.Println("Main received data")
}
复制代码
C语言中同步原语的实现体如今pthread
(POSIX Threads,POSIX是个标准,pthread是按照POSIX关于线程的标准实现的线程库)这个库中。
另外,pthread
还提供pthread_join
函数,其语义与GOLANG中的waitgroup一致。
GOLANG中的SELECT语义pthread
库没有直接提供,可是POSIX标准里面定义了select和pselect这两个功能差很少的函数来实现这个语义,linux中这两个函数都是做为系统调用实现,不一样的是select和pselect监听的都是文件描述符(poll epoll select的区别与联系)。
下面用C和GOLANG两种语言实现多producer,多consumer的生产者消费者队列。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <semaphore.h>
#define MAX 4
int buffer[MAX];
int fill = 0;
int use = 0;
int count = 0;
int loops = 100;
void put(int value) {
buffer[fill] = value;
fill = (fill + 1) % MAX;
count++;
}
int get() {
int tmp = buffer[use];
use = (use + 1) % MAX;
count--;
return tmp;
}
sem_t empty;
sem_t full;
sem_t mutex;
void *producer(void *arg) {
int i;
for(i = 0; i < loops; i++)
{
sem_wait(&empty);
sem_wait(&mutex);
put(i);
sem_post(&mutex);
sem_post(&full);
}
return 0;
}
void *consumer(void *arg) {
int i, tmp = 0;
for(i = 0; i < loops; i++)
{
sem_wait(&full);
sem_wait(&mutex);
tmp = get();
sem_post(&mutex);
sem_post(&empty);
printf("current number : %d\n", tmp);
}
return 0;
}
int main() {
sem_init(&empty, 0, MAX);
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1);
pthread_t p, c, p1, p2, c1, c2;
pthread_create(&p, NULL, producer, NULL);
pthread_create(&p1, NULL, producer, NULL);
pthread_create(&p2, NULL, producer, NULL);
pthread_create(&c, NULL, consumer, NULL);
pthread_create(&c1, NULL, consumer, NULL);
pthread_create(&c2, NULL, consumer, NULL);
pthread_join(p, NULL);
pthread_join(p1, NULL);
pthread_join(p2, NULL);
pthread_join(c, NULL);
pthread_join(c1, NULL);
pthread_join(c2, NULL);
return 0;
}
复制代码
package main
import (
"fmt"
)
var MSG_BUFFER = 4
var COSUMER_CNT = 3
var NUM_CNT = 100
var msgs = make(chan int, MSG_BUFFER)
// 多个消费者,用buffered channel控制消费者所有执行完以后推出main goroutine
var done = make(chan int, COSUMER_CNT)
func produce() {
for i := 0; i < NUM_CNT; i++ {
msgs <- i
}
}
func consume() {
for i := 0; i < NUM_CNT; i++ {
msg := <-msgs
fmt.Println(msg)
}
done <- 1
}
func main () {
for i:= 0; i < COSUMER_CNT; i++ {
go produce()
go consume()
}
for i:= 0; i < COSUMER_CNT; i++ {
<- done
}
}
复制代码
互斥变量其实是一个锁,条件变量和信号量都是基于锁实现的,有必要说说锁的原理,下篇内容包括: