以前咱们介绍了锁,然而锁并非并发程序设计中所需的惟一原语。在不少状况下,线程须要检查某一条件(condition)知足以后,才会继续运行。例如,父线程须要检查子线程是否执行完毕。这种等待如何实现呢?安全
注:并发程序有两大需求,一是互斥,二是等待。互斥是由于线程间存在共享数据,等待则是由于线程间存在依赖。服务器
咱们能够尝试用一个共享变量,如图所示。这种解决方案通常能工做,可是效率低下,由于主线程会自旋检查,浪费CPU时间。咱们但愿有某种方式让父线程休眠,直到等待的条件知足(即子线程完成执行)。网络
1 volatile int done = 0; 2 3 void *child(void *arg) { 4 printf("child\n"); 5 done = 1; 6 return NULL; 7 } 8 9 int main(int argc, char *argv[]) { 10 printf("parent: begin\n"); 11 pthread_t c; 12 Pthread_create(&c, NULL, child, NULL); // create child 13 while (done == 0) 14 ; // spin 15 printf("parent: end\n"); 16 return 0; 17 }
线程可使用条件变量(condition variable),来等待一个条件变成真。条件变量是一个显式队列,当某些执行状态(即条件,condition)不知足时,线程能够把本身加入队列,等待该条件。当其余线程改变了上述状态时,就能够经过在该条件上发送信号唤醒队列中的等待线程,让它们继续执行。数据结构
在POSIX库中,要声明一个条件变量,只要像这样写:pthread_cond_t c(注意:还须要适当的初始化)。条件变量有两种相关操做:wait()和signal()。线程要睡眠的时候,调用wait();当线程想唤醒等待在某个条件变量上的睡眠线程时,调用signal()。下面是一个典型示例:多线程
1 int done = 0; 2 pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; 3 pthread_cond_t c = PTHREAD_COND_INITIALIZER; 4 5 void thr_exit() { 6 Pthread_mutex_lock(&m); 7 done = 1; 8 Pthread_cond_signal(&c); 9 Pthread_mutex_unlock(&m); 10 } 11 12 void *child(void *arg) { 13 printf("child\n"); 14 thr_exit(); 15 return NULL; 16 } 17 18 void thr_join() { 19 Pthread_mutex_lock(&m); 20 while (done == 0) 21 Pthread_cond_wait(&c, &m); 22 Pthread_mutex_unlock(&m); 23 } 24 25 int main(int argc, char *argv[]) { 26 printf("parent: begin\n"); 27 pthread_t p; 28 Pthread_create(&p, NULL, child, NULL); 29 thr_join(); 30 printf("parent: end\n"); 31 return 0; 32 }
wait()调用除了条件变量外还有一个参数,它是一个互斥锁。它假定在wait()调用时,这个互斥锁是已上锁状态。wait()的职责是原子地释放锁,并让调用线程休眠。当线程被唤醒时,它必须从新获取锁,再返回调用者。这样复杂的步骤也是为了不在线程陷入休眠时,产生一些竞态条件。并发
有两种状况须要考虑。第一种状况是父线程建立出子线程,但本身继续运行,而后立刻调用thr_join()等待子线程。在这种状况下,它会先获取锁,检查子线程是否完成,而后调用wait(),让本身休眠。子线程最终得以运行,打印出“child”,并调用thr_exit()函数唤醒父线程,这段代码会在得到锁后设置状态变量done,而后向父线程发信号唤醒它。最后,父线程会运行(从wait()调用返回并持有锁),释放锁,打印出“parent:end”。函数
第二种状况是,子线程在建立后,马上运行,设置变量done为1,调用signal函数唤醒其余线程(这里没有其余线程),而后结束。父线程运行后,调用thr_join()时,发现done已是1了,就直接返回。高并发
须要注意的是,在上面的代码中,状态变量done和互斥锁c都是必需的。假如咱们不使用状态变量,代码像下面这样,会出现什么问题?oop
1 void thr_exit() { 2 Pthread_mutex_lock(&m); 3 Pthread_cond_signal(&c); 4 Pthread_mutex_unlock(&m); 5 } 6 7 void thr_join() { 8 Pthread_mutex_lock(&m); 9 Pthread_cond_wait(&c, &m); 10 Pthread_mutex_unlock(&m); 11 }
假设子线程马上运行,而且调用thr_exit()。在这种状况下,子线程发送信号,但此时却没有在条件变量上睡眠等待的线程。父线程运行时,就会调用wait并卡在那里,没有其余线程会唤醒它。经过这个例子,你应该认识到变量done的重要性,它记录了线程感兴趣的值。睡眠、唤醒和锁都离不开它。post
在下面的例子中,咱们假设线程在发信号和等待时都不加锁。又会发生什么问题?
1 void thr_exit() { 2 done = 1; 3 Pthread_cond_signal(&c); 4 } 5 6 void thr_join() { 7 if (done == 0) 8 Pthread_cond_wait(&c); 9 }
这里的问题是一个微妙的竞态条件。具体来讲,若是父进程调用thr_join(),检查完done的值为0,而后试图睡眠。但在调用wait进入睡眠以前,父进程被中断。随后子线程修改变量done为1,发出信号,此时一样没有等待线程。当父线程再次运行时,就会长眠不醒。
因此,咱们能够坚持这样一条原则:在使用条件变量时,调用signal和wait时要持有锁。
假设有一个或多个生产者线程和一个或多个消费者线程。生产者把生成的数据项放入缓冲区,消费者从缓冲区取走数据项,以某种方式消费。不少实际的系统中都会有这种场景。例如,在多线程的网络服务器中,一个生产者将HTTP请求放入工做队列,消费线程从队列中取走请求并处理。
由于有界缓冲区是共享资源,因此咱们必须经过同步机制来访问它,以避免产生竞态条件。为了更好地理解这个问题,咱们来尝试一些实际的代码。
首先须要一个共享缓冲区,让生产者放入数据,消费者取出数据。简单起见,咱们就拿一个整数来作缓冲区,两个内部函数将值放入缓冲区,从缓冲区取值。
1 int buffer; 2 int count = 0; // initially, empty 3 4 void put(int value) { 5 assert(count == 0); 6 count = 1; 7 buffer = value; 8 } 9 10 int get() { 11 assert(count == 1); 12 count = 0; 13 return buffer; 14 }
put()函数会假设缓冲区是空的,把一个值存在缓冲区,而后把count设置为1表示缓冲区满了。get()函数恰好相反,把缓冲区清空后,并返回该值。
如今咱们须要编写一些函数,用于生产和消费数据。调用生产函数的咱们称之为生产者(producer)线程,调用消费函数的咱们称之为消费者(consumer)线程。下面展现了一对非线程安全的生产者和消费者的代码,生产者将一个整数放入共享缓冲区loops次,消费者持续从该共享缓冲区中获取数据,并打印出数据项。咱们的目标就是使用条件变量将其改形成线程安全的版本。
1 void *producer(void *arg) { 2 int i; 3 int loops = (int) arg; 4 for (i = 0; i < loops; i++) { 5 put(i); 6 } 7 } 8 9 void *consumer(void *arg) { 10 int i; 11 while (1) { 12 int tmp = get(); 13 printf("%d\n", tmp); 14 } 15 }
显然,put()和get()函数之中会有临界区,由于put()更新缓冲区,get()读取缓冲区。咱们的首次尝试以下:
1 cond_t cond; 2 mutex_t mutex; 3 4 void *producer(void *arg) { 5 int i; 6 for (i = 0; i < loops; i++) { 7 Pthread_mutex_lock(&mutex); // p1 8 if (count == 1) // p2 9 Pthread_cond_wait(&cond, &mutex); // p3 10 put(i); // p4 11 Pthread_cond_signal(&cond); // p5 12 Pthread_mutex_unlock(&mutex); // p6 13 } 14 } 15 16 void *consumer(void *arg) { 17 int i; 18 for (i = 0; i < loops; i++) { 19 Pthread_mutex_lock(&mutex); // c1 20 if (count == 0) // c2 21 Pthread_cond_wait(&cond, &mutex); // c3 22 int tmp = get(); // c4 23 Pthread_cond_signal(&cond); // c5 24 Pthread_mutex_unlock(&mutex); // c6 25 printf("%d\n", tmp); 26 } 27 }
当生产者想要填充缓冲区时,它等待缓冲区变空(p1~p3)。消费者具备彻底相同的逻辑,但等待不一样的条件——变满(c1~c3)。
当只有一个生产者和一个消费者时,上图的代码可以正常运行。但若是有超过一个线程,这个方案会有两个严重的问题。
先来看第一个问题,它与等待以前的if语句有关。假设有两个消费者(Tc1和Tc2),一个生产者(Tp)。首先,一个消费者(Tc1)先开始执行,它得到锁(c1),检查缓冲区是否能够消费(c2),而后等待(c3)。
接着生产者(Tp)运行。它获取锁(p1),检查缓冲区是否满(p2),发现没满就给缓冲区加入一个数字(p4)。而后生产者发出信号,说缓冲区已满(p5)。关键的是,这让第一个消费者(Tc1)再也不睡在条件变量上,进入就绪队列。生产者继续执行,直到发现缓冲区满后睡眠(p6,p1-p3)。
这时问题发生了:另外一个消费者(Tc2)抢先执行,消费了缓冲区中的值。如今假设Tc1运行,在从wait返回以前,它获取了锁,而后返回。而后它调用了get() (p4),但缓冲区已没法消费。断言触发,代码不能像预期那样工做。
问题产生的缘由很简单:在Tc1被生产者唤醒后,但在它运行以前,因为Tc2抢先运行,缓冲区的状态改变了。发信号给线程只是唤醒它们,暗示状态发生了变化,但并不会保证在它运行以前状态一直是指望的状况。
修复这个问题很简单:把if语句改成while。当消费者Tc1被唤醒后,马上再次检查共享变量(c2)。若是缓冲区此时为空,消费者就会回去继续睡眠(c3)。生产者中相应的if也改成while(p2)。
1 cond_t cond; 2 mutex_t mutex; 3 4 void *producer(void *arg) { 5 int i; 6 for (i = 0; i < loops; i++) { 7 Pthread_mutex_lock(&mutex); // p1 8 while (count == 1) // p2 9 Pthread_cond_wait(&cond, &mutex); // p3 10 put(i); // p4 11 Pthread_cond_signal(&cond); // p5 12 Pthread_mutex_unlock(&mutex); // p6 13 } 14 } 15 16 void *consumer(void *arg) { 17 int i; 18 for (i = 0; i < loops; i++) { 19 Pthread_mutex_lock(&mutex); // c1 20 while (count == 0) // c2 21 Pthread_cond_wait(&cond, &mutex); // c3 22 int tmp = get(); // c4 23 Pthread_cond_signal(&cond); // c5 24 Pthread_mutex_unlock(&mutex); // c6 25 printf("%d\n", tmp); 26 } 27 }
咱们要记住一条关于条件变量的简单规则:老是使用while循环。
可是,这段代码仍然有一个问题,也是上文提到的两个问题之一,它和咱们只用了一个条件变量有关。
假设两个消费者(Tc1和Tc2)先运行,都睡眠了(c3)。生产者开始运行,在缓冲区放入一个值,唤醒了一个消费者(假定是Tc1),并开始睡眠。如今是一个消费者立刻要运行(Tc1),两个线程(Tc2和Tp)都等待在同一个条件变量上。
消费者Tc1醒过来并从wait()调用返回(c3),从新检查条件(c2),发现缓冲区是满的,消费了这个值(c4)。这个消费者而后在该条件上发信号(c5),唤醒一个在睡眠的线程。可是,应该唤醒哪一个线程呢?
由于消费者已经清空了缓冲区,很显然,应该唤醒生产者。可是,若是它唤醒了Tc2,问题就出现了。消费者Tc2会醒过来,发现队列为空(c2),又继续回去睡眠(c3)。生产者Tp刚才在缓冲区中放了一个值,如今在睡眠。消费者Tc1继续执行后也回去睡眠了。3个线程都在睡眠,显然是一个大问题。
咱们能够看出:信号显然须要,但必须更有指向性。消费者不该该唤醒消费者,而应该只唤醒生产者,反之亦然。
这个问题的解决方案也很简单:使用两个而不是一个条件变量,以便在系统状态改变时,能正确地发出信号唤醒哪类线程。下面展现了最终的代码。
1 cond_t empty, fill; 2 mutex_t mutex; 3 4 void *producer(void *arg) { 5 int i; 6 for (i = 0; i < loops; i++) { 7 Pthread_mutex_lock(&mutex); 8 while (count == 1) 9 Pthread_cond_wait(&empty, &mutex); 10 put(i); 11 Pthread_cond_signal(&fill); 12 Pthread_mutex_unlock(&mutex); 13 } 14 } 15 16 void *consumer(void *arg) { 17 int i; 18 for (i = 0; i < loops; i++) { 19 Pthread_mutex_lock(&mutex); 20 while (count == 0) 21 Pthread_cond_wait(&fill, &mutex); 22 int tmp = get(); 23 Pthread_cond_signal(&empty); 24 Pthread_mutex_unlock(&mutex); 25 printf("%d\n", tmp); 26 } 27 }
咱们如今有了可用的生产者/消费者方案,但不太通用,咱们最后所作的修改是为了提升并发和效率。具体来讲就是增长更多缓冲区槽位,这样在睡眠以前,生产者能够生产多个值;一样,消费者在睡眠以前能够消费多个值。
单个生产者和消费者时,这种方案由于上下文切换少,提升了效率。多个生产者和消费者时,它能够支持并发生产和消费。和现有方案相比,改动也很小。
第一处修改是缓冲区结构自己,以及对应的put()和get()方法:
1 int buffer[MAX]; 2 int fill = 0; 3 int use = 0; 4 int count = 0; 5 6 void put(int value) { 7 buffer[fill] = value; 8 fill = (fill + 1) % MAX; 9 count++; 10 } 11 12 int get() { 13 int tmp = buffer[use]; 14 use = (use + 1) % MAX; 15 count--; 16 return tmp; 17 }
下面展现了最终的代码逻辑。至此,咱们解决了生产者/消费者问题。
1 cond_t empty, fill; 2 mutex_t mutex; 3 4 void *producer(void *arg) { 5 int i; 6 for (i = 0; i < loops; i++) { 7 Pthread_mutex_lock(&mutex); // p1 8 while (count == MAX) // p2 9 Pthread_cond_wait(&empty, &mutex); // p3 10 put(i); // p4 11 Pthread_cond_signal(&fill); // p5 12 Pthread_mutex_unlock(&mutex); // p6 13 } 14 } 15 16 void *consumer(void *arg) { 17 int i; 18 for (i = 0; i < loops; i++) { 19 Pthread_mutex_lock(&mutex); // c1 20 while (count == 0) // c2 21 Pthread_cond_wait(&fill, &mutex); // c3 22 int tmp = get(); // c4 23 Pthread_cond_signal(&empty); // c5 24 Pthread_mutex_unlock(&mutex); // c6 25 printf("%d\n", tmp); 26 } 27 }
如今再来看条件变量的一个例子。这段代码是一个简单的多线程内存分配库中的问题片断:
1 // how many bytes of the heap are free? 2 int bytesLeft = MAX_HEAP_SIZE; 3 4 // need lock and condition too 5 cond_t c; 6 mutex_t m; 7 8 void *allocate(int size) { 9 Pthread_mutex_lock(&m); 10 while (bytesLeft < size) 11 Pthread_cond_wait(&c, &m); 12 void *ptr = ...; // get mem from heap 13 bytesLeft -= size; 14 Pthread_mutex_unlock(&m); 15 return ptr; 16 } 17 18 void free(void *ptr, int size) { 19 Pthread_mutex_lock(&m); 20 bytesLeft += size; 21 Pthread_cond_signal(&c); // whom to signal?? 22 Pthread_mutex_unlock(&m); 23 }
从代码中能够看出,当线程调用进入内存分配代码时,它可能会由于内存不足而等待。相应的,线程释放内存时,会发信号说有更多内存空闲。可是,代码中有一个问题:应该唤醒哪一个等待线程(可能有多个线程)?
解决方案也很直接:用pthread_cond_broadcast()代替上述代码中的pthread_cond_signal(),唤醒全部的等待线程。这样作,确保了全部应该唤醒的线程都被唤醒。固然,不利的一面是可能会影响性能,由于没必要要地唤醒了其余许多不应被唤醒的线程。这些线程被唤醒后,从新检查条件,立刻再次睡眠。
这种条件变量叫做覆盖条件(covering condition),由于它能覆盖全部须要唤醒线程的场景(保守策略)。通常来讲,若是你发现程序只有改为广播信号时才能工做,多是程序有缺陷。但在某些情景下,就像上述内存分配的例子中,广播多是最直接有效的方案。
信号量是Dijkstra及其同事发明的,做为与同步有关的全部工做的惟一原语,可使用信号量做为锁和条件变量。
信号量是有一个整数值的对象,能够用两个函数来操做它。在POSIX标准中,是sem_wait()和sem_post()。由于信号量的初始值可以决定其行为,因此首先要初始化信号量,才能调用其余函数与之交互。
#include <semaphore.h> sem_t s; sem_init(&s, 0, 1);
其中申明了一个信号量s,经过第三个参数,将它的值初始化为1。sem_init()的第二个参数,在咱们的全部例子中都被设置为0,表示信号量是在同一进程的多个线程共享的。信号量初始化以后,咱们能够调用sem_wait()或sem_post()与之交互。
sem_wait()对信号量的值进行原子减一操做,当信号量的值大于等于1时马上返回,不然会将调用线程放入信号量关联的队列中等待被唤醒。sem_post()对信号量的值进行原子加一操做,它不用等待某些条件知足,直接增长信号量的值,若是有等待线程,就唤醒其中一个。当信号量的值为负数时,这个值就是等待线程的个数。
信号量的第一种用法是咱们已经熟悉的:用信号量做为锁。在下面的代码片断里,咱们直接把临界区用一对sem_wait()/sem_post()环绕。为了使这段代码正常工做,信号量m的初始值X是相当重要的。X应该是多少呢?
sem_t m; sem_init(&m, 0, X); // initialize semaphore to X; what should X be? sem_wait(&m); // critical section here sem_post(&m);
回顾sem_wait()和sem_post()函数的定义,咱们发现初值应该是1。
咱们假设有两个线程的场景。第一个线程(线程1)调用了sem_wait(),它把信号量的值减为0。由于值是0,线程1从函数返回并进入临界区。若是没有其余线程尝试获取锁,当它调用sem_post()时,会将信号量重置为1(由于没有等待线程,不会唤醒其余线程)。
若是线程1持有锁,另外一个线程(线程2)调用sem_wait()尝试进入临界区。这种状况下,线程2把信号量减为−1,而后等待。线程1再次运行,它最终调用sem_post(),将信号量的值增长到0,唤醒等待的线程,而后线程2就能够获取锁。线程2执行结束时,再次增长信号量的值,将它恢复为1。
由于锁只有两个状态(持有和没持有),因此这种用法有时也叫做二值信号量(binary semaphore)。
下面是一个简单的例子。假设一个线程建立另外一个线程,而且等待它结束,那么信号量的初始值X应该是多少?
1 sem_t s; 2 3 void * 4 child(void *arg) { 5 printf("child\n"); 6 sem_post(&s); // signal here: child is done 7 return NULL; 8 } 9 10 int 11 main(int argc, char *argv[]) { 12 sem_init(&s, 0, X); // what should X be? 13 printf("parent: begin\n"); 14 pthread_t c; 15 Pthread_create(c, NULL, child, NULL); 16 sem_wait(&s); // wait here for child 17 printf("parent: end\n"); 18 return 0; 19 }
有两种状况须要考虑。第一种,父线程建立了子线程,可是子线程并无运行。这种状况下,父线程调用sem_wait()会先于子线程调用sem_post()。咱们但愿父线程等待子线程运行,惟一的办法是让信号量的值不大于0。所以,初值值为0。父线程运行,将信号量减为−1,而后睡眠等待;子线程运行的时候,调用sem_post(),信号量增长为0,唤醒父线程,父线程而后从sem_wait()返回,完成该程序。
第二种状况是子线程在父线程调用sem_wait()以前就运行结束。在这种状况下,子线程会先调用sem_post(),将信号量从0增长到1。而后当父线程有机会运行时,会调用sem_wait(),发现信号量的值为1。因而父线程将信号量从1减为0,没有等待,直接从sem_wait()返回,也达到了预期效果。
在这里,咱们讨论如何使用信号量来解决上面提到的生产者/消费者,也即有界缓冲区问题。封装的put()和get()函数以下:
1 int buffer[MAX]; 2 int fill = 0; 3 int use = 0; 4 5 void put(int value) { 6 buffer[fill] = value; // line f1 7 fill = (fill + 1) % MAX; // line f2 8 } 9 10 int get() { 11 int tmp = buffer[use]; // line g1 12 use = (use + 1) % MAX; // line g2 13 return tmp; 14 }
咱们用两个信号量empty和full分别表示缓冲区空或者满,下面是咱们尝试解决生产者/消费者问题的代码。
1 sem_t empty; 2 sem_t full; 3 4 void *producer(void *arg) { 5 int i; 6 for (i = 0; i < loops; i++) { 7 sem_wait(&empty); // line P1 8 put(i); // line P2 9 sem_post(&full); // line P3 10 } 11 } 12 13 void *consumer(void *arg) { 14 int i, tmp = 0; 15 while (tmp != -1) { 16 sem_wait(&full); // line C1 17 tmp = get(); // line C2 18 sem_post(&empty); // line C3 19 printf("%d\n", tmp); 20 } 21 } 22 23 int main(int argc, char *argv[]) { 24 // ... 25 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with... 26 sem_init(&full, 0, 0); // ... and 0 are full 27 // ... 28 }
咱们先假设MAX=1,验证程序是否有效。假设有两个线程,一个生产者和一个消费者。咱们来看在一个CPU上的具体场景。消费者先运行,执行到C1行,调用sem_wait(&full)。由于full初始值为0,wait调用会将full减为−1,致使消费者睡眠,等待另外一个线程调用sem_post(&full),符合预期。
假设生产者而后运行。执行到P1行,调用sem_wait(&empty)。生产者将继续执行,由于empty被初始化为MAX(在这里是1)。所以,empty被减为0,生产者向缓冲区中加入数据,而后执行P3行,调用sem_post(&full),把full从−1变成0,唤醒消费者。
在这种状况下,可能会有两种状况。若是生产者继续执行,再次循环到P1行,因为empty值为0,它会阻塞。若是生产者被中断,而消费者开始执行,调用sem_wait(&full),发现缓冲区确实满了,消费它。这两种状况都是符合预期的。
能够继续推导,在MAX=1时,即使有多个生产者和消费者的状况下,本示例代码仍然正常运行。
咱们如今假设MAX大于1,同时假定有多个生产者,多个消费者。那么就有问题了:竞态条件。假设两个生产者(Pa和Pb)几乎同时调用put()。当Pa先运行,在f1行先加入第一条数据(fill=0),假设Pa在将fill计数器更新为1以前被中断,Pb开始运行,也在f1行给缓冲区的0位置加入一条数据,这意味着那里的数据被覆盖,这也就意味着生产者的数据丢失。
能够看到,向缓冲区加入元素和增长缓冲区的索引是临界区,须要当心保护起来。因此,咱们使用二值信号量做为锁来进行互斥。下面是对应的代码。
1 sem_t empty; 2 sem_t full; 3 sem_t mutex; 4 5 void *producer(void *arg) { 6 int i; 7 for (i = 0; i < loops; i++) { 8 sem_wait(&mutex); // line p0 (NEW LINE) 9 sem_wait(&empty); // line p1 10 put(i); // line p2 11 sem_post(&full); // line p3 12 sem_post(&mutex); // line p4 (NEW LINE) 13 } 14 } 15 16 void *consumer(void *arg) { 17 int i; 18 for (i = 0; i < loops; i++) { 19 sem_wait(&mutex); // line c0 (NEW LINE) 20 sem_wait(&full); // line c1 21 int tmp = get(); // line c2 22 sem_post(&empty); // line c3 23 sem_post(&mutex); // line c4 (NEW LINE) 24 printf("%d\n", tmp); 25 } 26 } 27 28 int main(int argc, char *argv[]) { 29 // ... 30 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with... 31 sem_init(&full, 0, 0); // ... and 0 are full 32 sem_init(&mutex, 0, 1); // mutex=1 because it is a lock (NEW LINE) 33 // ... 34 }
如今咱们给整个put()/get()部分都增长了锁,就是注释中有NEW LINE的几行。这彷佛是正确的思路,但仍然有问题——死锁。
假设有两个线程,一个生产者和一个消费者。消费者首先运行,得到锁,而后对full信号量执行sem_wait()。由于尚未数据,因此消费者阻塞,让出CPU。可是,问题来了,此时消费者仍然持有锁。而后生产者运行,它首先对二值互斥信号量调用sem_wait()。锁已经被消费者持有,所以生产者也被卡住。
这里出现了一个循环等待。消费者持有互斥量,等待在full信号量上。生产者能够发送full信号,却在等待互斥量。所以,生产者和消费者互相等待对方——典型的死锁。
要解决这个问题,只需减小锁的做用域,下面是最终的可行方案。能够看到,咱们把获取和释放互斥量的操做调整为紧挨着临界区,把full、empty的唤醒和等待操做调整到锁外面。就获得了简单而有效的有界缓冲区,多线程程序的经常使用模式。
1 sem_t empty; 2 sem_t full; 3 sem_t mutex; 4 5 void *producer(void *arg) { 6 int i; 7 for (i = 0; i < loops; i++) { 8 sem_wait(&empty); // line p1 9 sem_wait(&mutex); // line p1.5 (MOVED MUTEX HERE...) 10 put(i); // line p2 11 sem_post(&mutex); // line p2.5 (... AND HERE) 12 sem_post(&full); // line p3 13 } 14 } 15 16 void *consumer(void *arg) { 17 int i; 18 for (i = 0; i < loops; i++) { 19 sem_wait(&full); // line c1 20 sem_wait(&mutex); // line c1.5 (MOVED MUTEX HERE...) 21 int tmp = get(); // line c2 22 sem_post(&mutex); // line c2.5 (... AND HERE) 23 sem_post(&empty); // line c3 24 printf("%d\n", tmp); 25 } 26 } 27 28 int main(int argc, char *argv[]) { 29 // ... 30 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with... 31 sem_init(&full, 0, 0); // ... and 0 are full 32 sem_init(&mutex, 0, 1); // mutex=1 because it is a lock 33 // ... 34 }
另外一个经典问题源于对更加灵活的锁定原语的渴望,它认可不一样的数据结构访问可能须要不一样类型的锁。例如,一个并发链表有不少插入和查找操做。插入操做会修改链表的状态,而查找操做只是读取该结构,只要没有进行插入操做,咱们能够并发的执行多个查找操做。读者—写者锁(reader-writer lock)就是用来完成这种操做的。下面是这种锁的代码。
1 typedef struct _rwlock_t { 2 sem_t lock; // binary semaphore (basic lock) 3 sem_t writelock; // used to allow ONE writer or MANY readers 4 int readers; // count of readers reading in critical section 5 } rwlock_t; 6 7 void rwlock_init(rwlock_t *rw) { 8 rw->readers = 0; 9 sem_init(&rw->lock, 0, 1); 10 sem_init(&rw->writelock, 0, 1); 11 } 12 13 void rwlock_acquire_readlock(rwlock_t *rw) { 14 sem_wait(&rw->lock); 15 rw->readers++; 16 if (rw->readers == 1) 17 sem_wait(&rw->writelock); // first reader acquires writelock 18 sem_post(&rw->lock); 19 } 20 21 void rwlock_release_readlock(rwlock_t *rw) { 22 sem_wait(&rw->lock); 23 rw->readers--; 24 if (rw->readers == 0) 25 sem_post(&rw->writelock); // last reader releases writelock 26 sem_post(&rw->lock); 27 } 28 29 void rwlock_acquire_writelock(rwlock_t *rw) { 30 sem_wait(&rw->writelock); 31 } 32 33 void rwlock_release_writelock(rwlock_t *rw) { 34 sem_post(&rw->writelock); 35 }
若是某个线程要更新数据结构,须要调用rwlock_acquire_writelock()得到写锁,调用rwlock_release_writelock()释放写锁。内部经过一个writelock的信号量保证只有一个写者能得到锁进入临界区,从而更新数据结构。
获取读锁时,读者首先要获取lock,而后增长reader变量,追踪目前有多少个读者在访问该数据结构。当第一个读者获取读锁时,同时也会获取写锁,即在writelock信号量上调用sem_wait(),最后调用sem_post()释放lock。
一旦一个读者得到了读锁,其余的读者也能够获取这个读锁。可是,想要获取写锁的线程,就必须等到全部的读者都结束。最后一个退出的读者在writelock信号量上调用sem_post(),从而让等待的写者可以获取该锁。
这一方案可行,但有一些缺陷,尤为是公平性,读者很容易饿死写者。存在复杂一些的解决方案,好比有写者等待时,避免更多的读者进入并持有锁。最后,读者-写者锁一般加入了更多锁操做,所以和其余一些简单快速的锁相比,读者—写者锁在性能方面没有优点。
最后,咱们用底层的同步原语锁和条件变量,来实现本身的信号量,名字叫做Zemaphore。
1 typedef struct _Zem_t { 2 int value; 3 pthread_cond_t cond; 4 pthread_mutex_t lock; 5 } Zem_t; 6 7 // only one thread can call this 8 void Zem_init(Zem_t *s, int value) { 9 s->value = value; 10 Cond_init(&s->cond); 11 Mutex_init(&s->lock); 12 } 13 14 void Zem_wait(Zem_t *s) { 15 Mutex_lock(&s->lock); 16 while (s->value <= 0) 17 Cond_wait(&s->cond, &s->lock); 18 s->value--; 19 Mutex_unlock(&s->lock); 20 } 21 22 void Zem_post(Zem_t *s) { 23 Mutex_lock(&s->lock); 24 s->value++; 25 Cond_signal(&s->cond); 26 Mutex_unlock(&s->lock); 27 }
咱们实现的信号量和Dijkstra定义的信号量有一点细微区别,就是咱们没有保持当信号量的值为负数时,让它反映出等待的线程数。事实上,该值永远不会小于0。这一行为更容易实现,并符合现有的Linux实现。