并发的关键概念

并发

并发是什么?

在计算机科学中,并发的定义是指:在一个程序的运行过程当中,程序的不一样部分能够以乱序或者部分有序的方式执行,可是最终程序的输出结果与顺序执行一致。linux

定义中有两个关键点golang

  1. 乱序或者部分有序
  2. 结果与顺序执行一致

假设程序 XA,B 两部分组成,B 依赖 A,顺序执行状况下,先执行 A 而后执行 B,输出结果为 y,耗时:T_A + T_B数据库

进一步研究发现,A 能够分红 A_1,A_2 两部分,且 A_2 依赖于另外一个任务 C,也就是执行完A_1以后,须要等待 C 也执行完才能继续往下走,令等待 C 完成的时间为 T_C,那么就有 T_A + T_B= T_{A1} + T_C + T_{A2} + T_B,这是顺序执行状况下的耗时。编程

再进一步研究发现,B 仅仅依赖于 A_1,为了提升效率,咱们能够这样作,执行完 A_1 以后,为了不CPU等待空闲,直接调度任务 B,等任务 B 完成以后,假设任务 C 也完成了,那么切换到任务 A 执行完 A_2 部分。数据结构

这是一个简单的并发case,能够看到多线程

  1. 程序的执行顺序变成部分有序,从A_1 -> A_2 -> B 变成 A_1 -> B -> A_2
  2. 任务 BA_2 的前置依赖均被知足,保证了最终输出结果依然是 y

为何要并发?

那为何要费这么多事来实现并发呢,老老实实顺序执行很差吗?换句话说,经过并发咱们得到了什么。并发

效率是关键。在上面的例子中,采用并发的执行方式,T_C 被节省下来。编程语言

并发带来的另外一个明显好处是多任务,就算是在一个单核CPU(single processor)机器上,也能同时运行多个应用,这是由于多个应用能够分时复用CPU,这是多个应用之间的并发。实际上,单核CPU同一时刻只能运行一个应用(这就是为何我把上文的“同时”二字加粗的缘由),可是从用户的视角来看好像有多个CPU同样,应用之间的并发虚拟化出了多个CPU的效果。函数

没有并发的世界是可怕的,想想你只有把所有的工做作完才能去玩游戏,但是工做哪有作完的时候呢?我只好在工做和游戏之间来回切换,切换是有代价的,全情投入工做以前,要先把游戏里的心思先收回来,回忆起上一段工做的内容,这跟线程切换几乎如出一辙。把我类比成CPU,实际上,我从事的各类活动是在分时复用“我”这个资源的。工具

并发有什么问题?

通常咱们遇到线程并发和协程并发的状况比较多,这里的线程和协程就是并发单位。为了具体的说明问题,咱们拿线程的并发举例。

第一个关键概念是临界区(critical section)。临界区指的是一段代码,这段代码会访问共享资源,这个共享资源多是一个简单变量,也多是一个更加复杂的数据结构。

第二个关键概念是竞争条件(race condition)。竞争条件是指,在多线程程序中,多个线程有可能几乎同时访问临界区,而且尝试更新共享资源,这可能致使意想不到的结果。好比说,两个线程同时对共享变量x执行自增操做,结果多是+1,也多是+2。也就是程序的运行结果是不肯定的,这样的程序叫作非肯定程序(indeterminate program),这是第三个关键概念。

程序运行的非肯定性,这是并发要解决的本质问题,至于并发程序难于编写、难于理解、编写不当还会出现死锁,这些都是属于技术层面的问题(因此并发的定义中没有提到效率)。

怎么实现并发?

为了保证并发程序的肯定性,咱们须要使用一些工具,这些工具叫同步原语(mutual exclusion primitives),具体来讲有:

  1. 互斥变量(mutex)
  2. 条件变量(condition variable,也叫作monitor)
  3. 信号量(semaphore)

互斥变量提供一种加锁机制。在访问临界区的以前,调用互斥变量的lock函数,可以保证每次只有一个线程进入到临界区,固然,离开临界区以后作的第一件事就是调用互斥变量提供的unlock函数,释放共享资源,保证其余线程或者当前线程下一次可以再次进入临界区。多线程环境下,共享变量x的自增操做可使用互斥变量来保证正确性。

互斥变量提供一种互斥访问的机制,条件变量提供的则是同步机制。想让任务B在任务A以后执行,只须要使用互斥变量m,在调度任务B以前调用m的wait函数,在执行任务A以后调用m的signal函数。m的做用是,无论调度顺序怎么样,在signal执行以前,wait会一直等待。

信号量最先由Dijkstra提出,目的也是为了防止竞争条件的出现,可是其原始语义与条件变量和信号量不同,而且咱们会看到,互斥变量和条件变量都是信号量的一种特殊形式。

每一个信号量都有一个counter,表明当前可用资源数,信号量还提供两个操做,sem_wait:当counter-1大于0的时候返回成功,且执行counter减1,当counter-1小于0的时候阻塞;sem_post,执行counter加1操做,且若是当前有线程正在等待,随机唤醒其中一个线程。

counter值只能取0或者1的的信号量称之为布尔信号量(binary semaphore),counter初始值为1的布尔信号量功能至关于互斥变量,counter初始值为0的布尔信号量至关于条件变量。

GOLANG中的互斥变量、条件变量、信号量

下面用具体的case说明为何binary semaphore能够实现互斥变量、条件变量的功能。

GOLANG中的互斥变量

golang中的sync.Mutex就是互斥变量,如上所述,互斥变量能够解决多线程共享变量自增的正确性。

package main

import (
  "fmt"
  "sync"
)

var x = 0

func increment(wg *sync.WaitGroup, m *sync.Mutex) {
  m.Lock()
  x = x + 1
  m.Unlock()
  wg.Done()
}
func main() {
  var w sync.WaitGroup
  var m sync.Mutex
  for i := 0; i < 1000; i++ {
      w.Add(1)
      go increment(&w, &m) // 這裡必定要用 address
  }
  w.Wait()
  fmt.Println("final value of x", x)
}

复制代码

互斥变量做用等同于容量为1的信号量,因此上面的case能够改写成:

package main

import (
  "fmt"
  "sync"
)

var x = 0

func increment(wg *sync.WaitGroup, m chan int) {
  m <- 1 // 信号量代替mutex
  x = x + 1
  <- m
  wg.Done()
}
func main() {
  var w sync.WaitGroup
  m := make(chan int, 1)
  for i := 0; i < 1000; i++ {
      w.Add(1)
      go increment(&w, m) // 這裡必定要用 address
  }
  w.Wait()
  fmt.Println("final value of x", x)
}

复制代码

GOLANG中的条件变量

GOLANG中的条件变量就是unbuffered channel。实际上,channel就是golang中的信号量实现,buffered channel的capacity就是信号量中的counter,不防统一称之为容量。

事实上,unbuffered channel的capacity等于0,前面说过,容量为0的信号量做用等同于条件变量。

下面的case我想在程序退出(也就是main goroutine结束)以前在屏幕上输出hello world,为了实现这点,我使用了done这个类型为chan bool的channel变量。

package main

import (
    "fmt"
    "time"
)

func hello(done chan bool) {
    fmt.Println("hello world")
    time.Sleep(4 * time.Second)
    done <- true
}
func main() {
    done := make(chan bool) // done的做用等同于条件变量
    fmt.Println("Main going to call hello go goroutine")
    go hello(done)
    <- done // 管道读操做一直block,直到 hello goroutine执行并往管道中写数据,注释掉此行,main goroutine会一直执行到结束,hello goroutine不会被调度
    fmt.Println("Main received data")
}

复制代码

C语言中的互斥变量、条件变量、信号量

C语言中同步原语的实现体如今pthread(POSIX Threads,POSIX是个标准,pthread是按照POSIX关于线程的标准实现的线程库)这个库中。

  • pthread_mutex_t:互斥变量类型,加锁:pthread_mutex_lock,释放锁:pthread_mutex_unlock
  • pthread_cond_t:条件变量类型,等待:pthread_cond_wait,唤醒:pthread_cond_signal
  • sem_t:信号量类型,wait:sem_wait,post:sem_post

另外,pthread还提供pthread_join函数,其语义与GOLANG中的waitgroup一致。

GOLANG中的SELECT语义pthread库没有直接提供,可是POSIX标准里面定义了select和pselect这两个功能差很少的函数来实现这个语义,linux中这两个函数都是做为系统调用实现,不一样的是select和pselect监听的都是文件描述符(poll epoll select的区别与联系)。

生产者消费者问题

下面用C和GOLANG两种语言实现多producer,多consumer的生产者消费者队列。

  1. C语言版
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <semaphore.h>

#define MAX 4

int buffer[MAX];
int fill = 0;
int use = 0;
int count = 0;

int loops = 100;

void put(int value) {
    buffer[fill] = value;
    fill = (fill + 1) % MAX;
    count++;
}

int get() {
    int tmp = buffer[use];
    use = (use + 1) % MAX;
    count--;

    return tmp;
}

sem_t empty;
sem_t full;
sem_t mutex;

void *producer(void *arg) {
    int i;
    for(i = 0; i < loops; i++)
    {
        sem_wait(&empty);
        sem_wait(&mutex);
        put(i);
        sem_post(&mutex);
        sem_post(&full);
    }

    return 0;
}

void *consumer(void *arg) {
    int i, tmp = 0;
    for(i = 0; i < loops; i++)
    {
        sem_wait(&full);
        sem_wait(&mutex);
        tmp = get();
        sem_post(&mutex);
        sem_post(&empty);
        printf("current number : %d\n", tmp);
    }

    return 0;
}


int main() {
    sem_init(&empty, 0, MAX);
    sem_init(&full, 0, 0);
    sem_init(&mutex, 0, 1);

    pthread_t p, c, p1, p2, c1, c2;

    pthread_create(&p, NULL, producer, NULL);
    pthread_create(&p1, NULL, producer, NULL);
    pthread_create(&p2, NULL, producer, NULL);

    pthread_create(&c, NULL, consumer, NULL);
    pthread_create(&c1, NULL, consumer, NULL);
    pthread_create(&c2, NULL, consumer, NULL);

    pthread_join(p, NULL);
    pthread_join(p1, NULL);
    pthread_join(p2, NULL);
    pthread_join(c, NULL);
    pthread_join(c1, NULL);
    pthread_join(c2, NULL);

    return 0;
}
复制代码
  1. GOLANG版
package main

import (
  "fmt"
)
var MSG_BUFFER = 4
var COSUMER_CNT = 3
var NUM_CNT = 100

var msgs = make(chan int, MSG_BUFFER)
// 多个消费者,用buffered channel控制消费者所有执行完以后推出main goroutine
var done = make(chan int, COSUMER_CNT)

func produce() {
    for i := 0; i < NUM_CNT; i++ {
        msgs <- i
    }
}

func consume() {
    for i := 0; i < NUM_CNT; i++ {
        msg := <-msgs
        fmt.Println(msg)
    }
    done <- 1
}

func main () {
    for i:= 0; i < COSUMER_CNT; i++ {
        go produce()
        go consume()
    }

    for i:= 0; i < COSUMER_CNT; i++ {
        <- done
    }
}
复制代码

总结以及下篇展望

互斥变量其实是一个锁,条件变量和信号量都是基于锁实现的,有必要说说锁的原理,下篇内容包括:

  • cpu层面的原子操做 TAS case(汇编代码)
  • 几种锁的实现方式(硬件中断锁、自旋锁、队列锁)
  • 评估不一样锁实现方式的关键:效率和公平
  • 几种编程语言中的锁:GOLANG、JAVA
  • 数据库中的锁

参考

  • 本文大部分知识经过阅读Operating Systems: Three Easy Pieces这本操做系统教材习得,这是我读过的最好的关于操做系统的教材。本书最大的特色是将操做系统仅仅分红三个部分来说述:虚拟化、并行、持久化,虽然只有三个部分,操做系统的核心部分却都覆盖到了。本书做者的语言功底很强,读起来朗朗上口。另外,本书的脉络清楚,整本书都在回答一个问题:操做系统做为第一个也是最重要的一个软件,是如何使得计算机系统易于用户使用的。
相关文章
相关标签/搜索