操做系统思考 第十章 条件变量

第十章 条件变量

做者:Allen B. Downeyjavascript

原文:Chapter 10 Condition variableshtml

译者:飞龙java

协议:CC BY-NC-SA 4.0nginx

像上一章所展现的那样,许多简单的同步问题均可以用互斥体解决。这一章中我会介绍一个更大的挑战,著名的“生产者-消费者”问题,以及一个用于解决它的新工具,条件变量。git

10.1 工做队列

在一些多线程的程序中,线程被组织用于执行不一样的任务。一般它们使用队列来相互通讯,其中一些线程叫作“生产者”,向队列中放入数据,另外一些线程叫作“消费者”,从队列取出数据。程序员

例如,在GUI应用中,可能有一个运行GUI的线程响应用户事件,而其它线程负责处理用户的请求。这里,GUI线程可能将数据放入队列中,而“后台”线程从队列中取出请求并执行。github

为了支持这种组织,咱们须要一个“线程安全”的队列实现,也就是说每一个线程均可以同时访问队列。咱们至少须要处理一个特殊状况,队列是空的,以及若是队列的大小有限制,队列是满的。web

我会从一个非线程安全的简单队列开始,以后咱们会观察其中的错误并修复它。这个示例的代码在本书仓库的queue目录中。queue.c文件包含了一个环形缓冲区的基本实现。你能够在环形缓冲区的维基百科查询更多信息。数组

下面是结构体的定义:安全

typedef struct {
  int *array;
  int length;
  int next_in;
  int next_out;
} Queue;

array是包含队列元素的数组。在这个例子中,元素都是整数,可是一般它们都是一些结构体,包含用户事件、工做项目以及其它。

length是数组的长度,next_in是数组的下标,用于索引下个元素应该添加到哪里;与之类似, next_out是应该被移除的下个元素的下标。

make_queue为这个结构体分配空间,而且初始化全部字段:

Queue *make_queue(int length)
{
  Queue *queue = (Queue *) malloc(sizeof(Queue));
  queue->length = length;
  queue->array = (int *) malloc(length * sizeof(int));
  queue->next_in = 0;
  queue->next_out = 0;
  return queue;
}

next_out的初始值须要一些解释。因为队列一开始为空,没有可移除的下一个元素,因此next_out是无效的。next_out==next_in是个特殊状况,它表示队列为空,因此咱们能够编写:

int queue_empty(Queue *queue) {
  return (queue->next_in == queue->next_out);
}

如今咱们可使用queue_push向队列里面添加元素:

void queue_push(Queue *queue, int item) {
  if (queue_full(queue)) {
    perror_exit("queue is full");
  }
  
  queue->array[queue->next_in] = item;
  queue->next_in = queue_incr(queue, queue->next_in);
}

若是队列满了,queue_push打印出错误信息并退出,我以后会解释queue_full

若是队列没有满,queue_push插入新元素,以后使用queue_incr增长next_in

int queue_incr(Queue *queue, int i) {
  return (i+1) % queue->length;
}

当索引i到达队列末尾时,它会转换为0。因而这样就很微妙了。若是咱们持续向队列添加元素,最后next_in会遇上next_out。可是若是next_in == next_out咱们会错误地认为队列是空的。

为了不这种状况,咱们定义另外一种特殊状况来表示队列是满的:

int queue_full(Queue *queue) {
  return (queue_incr(queue, queue->next_in) == queue->next_out);
}

若是next_in增长后与next_out重合,那么咱们若是添加新的元素,就会使队列看起来是空的。因此咱们在“末尾”留出一个元素(要记住队列的末尾可能位于任何地方,不必定是数组末尾)。

如今咱们能够编写queue_pop,它移除并返回队列的下一个元素:

int queue_pop(Queue *queue) {
  if (queue_empty(queue)) {
    perror_exit("queue is empty");
  }
  
  int item = queue->array[queue->next_out];
  queue->next_out = queue_incr(queue, queue->next_out);
  return item;
}

若是你尝试从空队列中弹出元素,queue_pop会打印错误信息并退出。

10.2 生产者和消费者

如今让咱们建立一些访问这个队列的线程。下面是生产者的代码:

void *producer_entry(void *arg)
{
  int i;
  Shared *shared = (Shared *) arg;

  for (i=0; i<QUEUE_LENGTH-1; i++) {
    printf("adding item %d\n", i);
    queue_push(shared->queue, i);
  }
  pthread_exit(NULL);
}

下面是消费者的代码:

void *consumer_entry(void *arg)
{
  int i;
  int item;
  Shared *shared = (Shared *) arg;

  for (i=0; i<QUEUE_LENGTH-1; i++) {
    item = queue_pop(shared->queue);
    printf("consuming item %d\n", item);
  }
  pthread_exit(NULL);
}

下面是用于启动线程并等待它们的主线程代码:

int i;
pthread_t child[NUM_CHILDREN];

Shared *shared = make_shared();

child[0] = make_thread(producer_entry, shared);
child[1] = make_thread(consumer_entry, shared);

for (i=0; i<NUM_CHILDREN; i++) {
    join_thread(child[i]);
}

最后,下面是包含队列的共享结构:

typedef struct {
  Queue *queue;
} Shared;

Shared *make_shared()
{
  Shared *shared = check_malloc(sizeof(Shared));
  shared->queue = make_queue(QUEUE_LENGTH);
  return shared;
}

到目前为止咱们所写的代码是一个好的开始,可是有以下几种问题:

  • 队列的访问不是线程安全的。不一样的线程能同时访问arraynext_innext_out,而且会使队列处于损坏的、“不一致”的状态。

  • 若是消费者首先被调度,它会发现队列为空,打印错误信息并退出。咱们应该阻塞住消费者,直到队列非空。与之类似,咱们应该在队列满了的状况下阻塞住生产者。

在下一节中,咱们会使用互斥体解决这一个问题。以后的章节中咱们会使用条件变量解决第二个问题。

10.3 互斥体

咱们可使用互斥体使队列线程安全。这个版本的代码在queue_mutex.c中。

首先咱们向队列结构中添加一个互斥体指针:

typedef struct {
  int *array;
  int length;
  int next_in;
  int next_out;
  Mutex *mutex;          //-- this line is new
} Queue;

以后在make_queue中初始化互斥体:

Queue *make_queue(int length)
{
  Queue *queue = (Queue *) malloc(sizeof(Queue));
  queue->length = length;
  queue->array = (int *) malloc(length * sizeof(int));
  queue->next_in = 0;
  queue->next_out = 0;
  queue->mutex = make_mutex();   //-- new
  return queue;
}

接下来向queue_push添加同步代码:

void queue_push(Queue *queue, int item) {
  mutex_lock(queue->mutex);   //-- new
  if (queue_full(queue)) {
    mutex_unlock(queue->mutex);   //-- new
    perror_exit("queue is full");
  }
  
  queue->array[queue->next_in] = item;
  queue->next_in = queue_incr(queue, queue->next_in);
  mutex_unlock(queue->mutex);   //-- new
}

在检查队列是否已满以前,咱们须要锁住互斥体。若是队列是满的,咱们须要在退出以前解锁互斥体。不然线程应该保持互斥体锁住,使其它线程不能前进。

queue_pop的同步代码与之类似:

int queue_pop(Queue *queue) {
  mutex_lock(queue->mutex);
  if (queue_empty(queue)) {
    mutex_unlock(queue->mutex);
    perror_exit("queue is empty");
  }
  
  int item = queue->array[queue->next_out];
  queue->next_out = queue_incr(queue, queue->next_out);
  mutex_unlock(queue->mutex);
  return item;
}

要注意其它队列函数,queue_fullqueue_emptyqueue_incr都不须要锁住互斥体。任何调用这些函数的线程都须要首先锁住互斥体。这些要求是这些函数的接口文档的一部分。

使用这些额外的代码,队列就线程安全了。若是你运行它,你不会看到任何的同步错误。可是彷佛消费者会在某个时间上退出,由于队列是空的。或者生产者会因为队列是知足而退出。

下一步就是添加条件变量。

10.4 条件变量

条件变量是条件相关的数据结构。它容许线程在某些条件变为真以前被阻塞。例如,thread_push可能但愿检查队列是否已满,若是是这样,就在队列未满以前阻塞。因此咱们感兴趣的“条件”就是“队列未满”。

与之类似,thread_pop但愿等待“队列非空”的条件。

下面是咱们向代码添加这些功能的方式。首先咱们向队列结构中添加两个条件变量:

typedef struct {
  int *array;
  int length;
  int next_in;
  int next_out;
  Mutex *mutex;
  Cond *nonempty;   //-- new
  Cond *nonfull;    //-- new
} Queue;

以后在make_queue中初始化它们:

Queue *make_queue(int length)
{
  Queue *queue = (Queue *) malloc(sizeof(Queue));
  queue->length = length;
  queue->array = (int *) malloc(length * sizeof(int));
  queue->next_in = 0;
  queue->next_out = 0;
  queue->mutex = make_mutex();
  queue->nonempty = make_cond();   //-- new
  queue->nonfull = make_cond();    //-- new
  return queue;
}

如今在queue_pop中,若是咱们发现队列为空,咱们不要退出,而是使用条件变量来阻塞:

int queue_pop(Queue *queue) {
  mutex_lock(queue->mutex);
  while (queue_empty(queue)) {
    cond_wait(queue->nonempty, queue->mutex);  //-- new
  }
  
  int item = queue->array[queue->next_out];
  queue->next_out = queue_incr(queue, queue->next_out);
  mutex_unlock(queue->mutex);
  cond_signal(queue->nonfull);   //-- new
  return item;
}

cond_wait有点复杂,因此让咱们慢慢来。第一个参数是条件变量。这里咱们须要等待的条件是“队列非空”。第二个变量是保护队列的互斥体。在你调用cond_wait以前,你须要先锁住互斥体,不然它不会生效。

当锁住互斥体的线程调用cond_wait时,它首先解锁互斥体,以后阻塞。这很是重要。若是cond_wait不在阻塞以前解锁互斥体,其它线程就不能访问队列,不能添加任何物品,队列会永远为空。

因此当消费者阻塞在nonempty的时候,生产者也能够运行。让咱们来观察生产者运行queue_push时会发生什么:

void queue_push(Queue *queue, int item) {
  mutex_lock(queue->mutex);
  while (queue_full(queue)) {
    cond_wait(queue->nonfull, queue->mutex);    //-- new
  }
  
  queue->array[queue->next_in] = item;
  queue->next_in = queue_incr(queue, queue->next_in);
  mutex_unlock(queue->mutex);
  cond_signal(queue->nonempty);  //-- new
}

让咱们假设队列如今未满,因而生产者并不会调用cond_wait也不会阻塞。它会向队列添加新的元素并解锁互斥体。可是在退出以前,它作了额外的一件事:它向nonempty条件变量发送信号。

向条件变量发送更新好表示条件为真,或者至少它可能为真。若是没有任何线程在等待条件变量,信号就不起做用。

若是有线程在等待条件变量,它们所有会从cond_wait解除阻塞而且恢复执行。可是在被唤醒的进程从cond_wait返回以前,它须要等待并再次锁住互斥体。

如今咱们回到queue_pop来观察当线程从cond_wait返回时会发生什么。它会循环到while语句的开头,并再次检查条件。我会在以后解释其缘由,可是如今让咱们假设条件为真,也就是说队列非空。

当线程从while循环退出以后,咱们知道了两件事情:(1)条件为真,因此队列中至少有一个物品,(2)互斥体是锁住的,因此访问队列是安全的。

在移除物品以后,queue_pop解锁了互斥体,发送了队列未满的信号,以后退出。

在下一节我会向你展现个人Cond的工做缘由,可是首先我想回答两个常见问题:

  • 为何cond_waitwhile循环中,而不是if语句中?也就是说,为何在从cond_wait返回以后要再次检查条件?

    须要再次检查条件的首要缘由就是信号拦截的可能性。假设线程A在等待`nonempty`,线程B向队列添加元素,以后向`nonempty`发送信号。线程A被唤醒而且尝试锁住互斥体,可是在轮到它以前,邪恶的线程C插进来了,锁住了互斥体,从队列中弹出物品而且解锁了互斥体。如今队列再次为空,可是线程A没有被阻塞。线程A会锁住互斥体而且从`cond_wait`返回。若是线程A再也不次检查条件,它会尝试从空队列中弹出元素,可能会产生错误。
    
    > 译者注:有些条件变量的实现能够每次只唤醒一个线程,好比Java对象的`notify`方法。这种状况就可使用`if`。
  • 当人们了解条件变量时,另外一个问题是“条件变量怎么知道它关联了哪一个条件?”

    这一问题能够理解,由于在`Cond`结构和有关条件之间没有明显的关联。在它的使用方式中,关联是隐性的。
    
    下面是一种理解它的办法:当你调用`cond_wait`时,`Cond`所关联的条件为假;当你调用`cond_signal`时它为真。固然,可能有一些条件第一种状况下为真,第二种状况下为假。正确的状况只在程序员的脑子中,因此它应该在文档中有详细的解释。

10.5 条件变量的实现

我在上一节中使用的条件变量是pthread_cond_t类型的包装,它定义在POSIX线程API中。这很是相似于Mutex,它是pthread_mutex_t的包装。两个包装都定义在utils.cutils.h中。

下面是类型定义:

typedef pthread_cond_t Cond;

make_cond分配空间,初始化条件变量,以后返回指针:

Cond *make_cond()
{
  Cond *cond = check_malloc(sizeof(Cond)); 
  int n = pthread_cond_init(cond, NULL);
  if (n != 0) perror_exit("make_cond failed");
 
  return cond;
}

下面是cond_waitcond_signal的包装:

 
 
 
 
<button href="javascript:void(0);" _xhe_href="javascript:void(0);" class="copyCode btn btn-xs" data-clipboard-text="" void="" cond_wait(cond="" *cond,="" mutex="" *mutex)"="" data-toggle="tooltip" data-placement="bottom" title="" style="color: rgb(255, 255, 255); font-style: inherit; font-variant: inherit; font-stretch: inherit; font-size: 12px; line-height: 1.5; font-family: inherit; margin: 0px 0px 0px 5px; overflow: visible; cursor: pointer; vertical-align: middle; border: 1px solid transparent; white-space: nowrap; padding-right: 5px; padding-left: 5px; border-radius: 3px; -webkit-user-select: none; box-shadow: rgba(0, 0, 0, 0.0980392) 0px 1px 2px; background-image: none; background-color: rgba(0, 0, 0, 0.74902);">复制
void cond_wait(Cond *cond, Mutex *mutex) { int n = pthread_cond_wait(cond, mutex); if (n != 0) perror_exit("cond_wait failed"); } void cond_signal(Cond *cond) { int n = pthread_cond_signal(cond); if (n != 0) perror_exit("cond_signal failed"); }

到这里就应该没有什么意外的东西了。

相关文章
相关标签/搜索