在上一篇博客互斥量中,解决了线程如何互斥访问临界资源的问题。html
在开始本文以前,咱们先保留一个问题:为何须要条件变量,若是只有互斥量不能解决什么问题?api
条件变量的数据类型是 pthread_cond_t
.app
初始化,销毁 API 为:函数
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); int pthread_cond_destroy(pthread_cond_t *cond);
函数原型:ui
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
做用:atom
The
pthread_cond_wait
function atomically blocks the current thread waiting on the condition variable specified bycond
, and releases the mutex specified bymutex
. The waiting thread unblocks only after another thread callspthread_cond_signal
, orpthread_cond_broadcast
with the same condition variable, and the current thread re-acquires the lock on mutex.线程—— Manual on MacOS.code
在条件变量 cond
上阻塞线程,加入 cond
的等待队列,并释放互斥量 mutex
. 若是其余线程使用同一个条件变量 cond
调用了 pthread_cond_signal/broadcast
,唤醒的线程会从新得到互斥锁 mutex
.htm
函数原型:blog
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
做用:与 pthread_cond_wait
相似,但该线程被唤醒的条件是其余线程调用了 signal/broad
,或者系统时间到达了 abstime
。
函数原型:
int pthread_cond_signal(pthread_cond_t *cond)
做用:
The
pthread_cond_signal
function shall unblock at least one of the threads that are blocked no the specified condition variablecond
(if any threads are blocked oncond
).If more than one thread is blocked on a condition variable, the scheduling policy shall determine the order which threads are unblocked.
When each thread unblocked as a result of a
pthread_cond_broadcast()
orpthread_cond_signal()
returns from its call topthread_cond_wait()
orpthread_cond_timedwait()
, the thread shall own themutex
with which it calledpthread_cond_wait()
orpthread_cond_timedwait()
.The thread(s) that are unblocked shall contend for the
mutex
according to the scheduling policy (if applicable), and as if each had calledpthread_mutex_lock()
.The
pthread_cond_broadcast()
andpthread_cond_signal()
functions shall have no effect if there are no threads currently blocked oncond
.——Manual on Ubuntu.
唤醒一个在 cond
上等待的至少一个线程,若是 cond
上阻塞了多个线程,那么将根据调度策略选取一个。
当被唤醒的线程从 wait/timedwait
函数返回,将从新得到 mutex
(但可能须要竞争,由于可能唤醒多个线程)。
函数原型:
int pthread_cond_broadcast(pthread_cond_t *cond);
做用:唤醒全部在 cond
上等待的线程。
又称 PC (Producer - Consumer) 问题。
详细问题定义能够看:
具体要求:
buffer
实质上是一个队列。
const int CAPACITY = 4; typedef struct { char items[CAPACITY]; int in, out; } buffer_t; void buffer_init(buffer_t *b) { b->in = b->out = 0; } int buffer_is_full(buffer_t *b) { return ((b->in + 1) % CAPACITY) == (b->out); } int buffer_is_empty(buffer_t *b) { return b->in == b->out; } void buffer_put_item(buffer_t *buf, char item) { buf->items[buf->in] = item; buf->in = (buf->in + 1) % CAPACITY; } char buffer_get_item(buffer_t *buf) { char item = buf->items[buf->out]; buf->out = (buf->out + 1) % CAPACITY; return item; }
const int CAPACITY = 4; // buffer 的容量 const int N = 8; // 依据题意,须要转换 8 个字符 buffer_t buf1, buf2; pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER; // 保证只有一个线程访问 buf1 pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER; // 保证只有一个线程访问 buf2 pthread_cond_t empty1 = PTHREAD_COND_INITIALIZER; pthread_cond_t empty2 = PTHREAD_COND_INITIALIZER; pthread_cond_t full1 = PTHREAD_COND_INITIALIZER; pthread_cond_t full2 = PTHREAD_COND_INITIALIZER;
几个条件变量的做用以下:
empty1
表示当 buf1
为空的时候,从 buf1
取数据的线程要在此条件变量上等待。full1
表示当 buf1
为满的时候,向 buf1
写数据的线程要在此条件变量上等待。其余同理。
代码思路解析:
buf1
操做,首先写一对 pthread_mutex_lock/unlock
,保证临界代码区内只有 producer
操做 buf1
;buf1
是满的,那么就将 producer
线程阻塞在条件变量 full1
上,释放互斥量 mutex1
(虽然不能写入,但要让别的线程可以读取 buf1
的数据);buf1
;buf1
一定不为空,所以唤醒一个在 empty1
上等待的线程,最后释放 mutex1
.void *producer(void *arg) { int i = 0; // can be while(true) for (; i < N; i++) { pthread_mutex_lock(&mutex1); while (buffer_is_full(&buf1)) pthread_cond_wait(&full1, &mutex1); buffer_put_item(&buf1, (char)('a' + i)); printf("Producer put [%c] in buffer1. \n", (char)('a' + i)); pthread_cond_signal(&empty1); pthread_mutex_unlock(&mutex1); } return NULL; }
思路与 producer
相似。
void *consumer(void *arg) { int i = 0; for (; i < N; i++) { pthread_mutex_lock(&mutex2); while (buffer_is_empty(&buf2)) pthread_cond_wait(&empty2, &mutex2); char item = buffer_get_item(&buf2); printf("\tConsumer get [%c] from buffer2. \n", item); pthread_cond_signal(&full2); pthread_mutex_unlock(&mutex2); } return NULL; }
这是 produer
和 consumer
的结合体。
void *calculator(void *arg) { int i = 0; char item; for (; i < N; i++) { pthread_mutex_lock(&mutex1); while (buffer_is_empty(&buf1)) pthread_cond_wait(&empty1, &mutex1); item = buffer_get_item(&buf1); pthread_cond_signal(&full1); pthread_mutex_unlock(&mutex1); pthread_mutex_lock(&mutex2); while (buffer_is_full(&buf2)) pthread_cond_wait(&full2, &mutex2); buffer_put_item(&buf2, item - 'a' + 'A'); pthread_cond_signal(&empty2); pthread_mutex_unlock(&mutex2); } return NULL; }
int main() { pthread_t calc, prod, cons; // init buffer buffer_init(&buf1), buffer_init(&buf2); // create threads pthread_create(&calc, NULL, calculator, NULL); pthread_create(&prod, NULL, producer, NULL); pthread_create(&cons, NULL, consumer, NULL); pthread_join(calc, NULL); pthread_join(prod, NULL); pthread_join(cons, NULL); // destroy mutex pthread_mutex_destroy(&mutex1), pthread_mutex_destroy(&mutex2); // destroy cond pthread_cond_destroy(&empty1), pthread_cond_destroy(&empty2); pthread_cond_destroy(&full1), pthread_cond_destroy(&full2); }
从上面的 Producer - Consumer 问题能够看出,mutex
仅仅能表达「线程可否得到访问临界资源的权限」这一层面的信息,而不能表达「临界资源是否足够」这个问题。
假设没有条件变量,producer
线程得到了 buf1
的访问权限( buf1
的空闲位置对于 producer
来讲是一种资源),但若是 buf1
是满的,producer
就无法对 buf1
操做。
对于 producer
来讲,它不能占用访问 buf1
的互斥锁,但却又什么都不作。所以,它只能释放互斥锁 mutex
,让别的线程可以访问 buf1
,并取走数据,等到 buf1
有空闲位置,producer
再对 buf1
写数据。用伪代码表述以下:
pthread_mutex_lock(&mutex1); if (buffer_is_full(&buf1)) { pthread_mutex_unlock(&mutex1); wait_until_not_full(&buf1); pthread_mutex_lock(&mutex); } buffer_put_item(&buf1, item); pthread_mutex_unlock(&mutex1);
而条件变量其实是对上述一系列操做的一种封装。
在上面代码中,使用 pthread_cond_wait
的时候,咱们是经过这样的方式的:
while (...) pthread_cond_wait(&cond, &mutex);
但这里为何不是 if
而是 while
呢?
参考文章:http://www.javashuo.com/article/p-qkxrvctp-nv.html
#include <pthread.h> struct msg { struct msg *m_next; /* value...*/ }; struct msg* workq; pthread_cond_t qready = PTHREAD_COND_INITIALIZER; pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER; void process_msg() { struct msg* mp; for (;;) { pthread_mutex_lock(&qlock); while (workq == NULL) { pthread_cond_wait(&qread, &qlock); } mq = workq; workq = mp->m_next; pthread_mutex_unlock(&qlock); /* now process the message mp */ } } void enqueue_msg(struct msg* mp) { pthread_mutex_lock(&qlock); mp->m_next = workq; workq = mp; pthread_mutex_unlock(&qlock); /** 此时第三个线程在signal以前,执行了process_msg,恰好把mp元素拿走*/ pthread_cond_signal(&qready); /** 此时执行signal, 在pthread_cond_wait等待的线程被唤醒, 可是mp元素已经被另一个线程拿走,因此,workq仍是NULL ,所以须要继续等待*/ }
代码解析:
这里
process_msg
至关于消费者,enqueue_msg
至关于生产者,struct msg* workq
做为缓冲队列。在
process_msg
中使用while (workq == NULL)
循环判断条件,这里主要是由于在enqueue_msg
中unlock
以后才唤醒等待的线程,会出现上述注释出现的状况,形成workq==NULL
,所以须要继续等待。可是若是将
pthread_cond_signal
移到pthread_mutex_unlock()
以前执行,则会避免这种竞争,在unlock
以后,会首先唤醒pthread_cond_wait
的线程,进而workq != NULL
老是成立。所以建议使用
while
循环进行验证,以便可以容忍这种竞争。
pthread_cond_signal
在多核处理器上可能同时唤醒多个线程。
//thread 1 while(0<x<10) pthread_cond_wait(...); //thread 2 while(5<x<15) pthread_cond_wait(...);
若是某段时间内 x == 8
,那么两个线程相继进入等待。
而后第三个线程,进行了以下操做:
x = 12 pthread_cond_signal(...) // or call pthread_cond_broadcast()
那么可能线程 一、2 都被唤醒了(由于 signal
可能唤醒多个),可是,此时线程 1 仍然不知足 while
,须要再次判断,而后进入下一次等待。
其次,即便 signal
只唤醒一个,上面咱们提到,若是有多个线程都阻塞在同一个 cond
上,signal
会根据调度策略选取一个唤醒,那若是根据调度策略,唤醒的是线程 1 ,显然它还须要再一次判断是否须要继续等待(不然就违背了 pthead_cond_wait
的本意)。