请参看一个线程等待某种事件发生html
1,互斥锁的初始化,有如下2种方式。linux
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
2,互斥锁的销毁函数
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
3,加锁和解锁测试
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
4,条件变量的2个函数线程
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); int pthread_cond_signal(pthread_cond_t *cond);
pthread_cond_wait:rest
调用此函数时点的处理:code
1,给互斥锁解锁。htm
2,把调用此函数的线程投入睡眠,直到另外某个线程就本条件变量调用pthread_cond_signal。blog
被唤醒后的处理:返回前从新给互斥锁加锁。事件
pthread_cond_signal:唤醒调用pthread_cond_wait函数的线程
条件变量一般用于生产者和消费者模式。
什么是生成者和消费者模式?
版本1:全部生产者线程是并行执行的,消费者线程是等待全部的生产者线性执行结束后,消费者线程才开始执行。
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #define MAXITEM 100000000 #define MAXTHREAD 100 #define min(x,y) ( x>y?y:x ) int nitem; struct { pthread_mutex_t mutex; int buf[MAXITEM]; int idx; int val; }shared = { PTHREAD_MUTEX_INITIALIZER }; void* produce(void*); void* consume(void*); int main(int argc, char** argv){ int i; int nthreads; int count[MAXTHREAD]; pthread_t tid_produce[MAXTHREAD], tid_consume; if(argc != 3){ printf("arg error\n"); return 1; } nitem = min(MAXITEM,atoi(argv[1])); nthreads = min(MAXTHREAD, atoi(argv[2])); for(i = 0; i < nthreads; ++i){ count[i] = 0; pthread_create(&tid_produce[i], NULL, produce, &count[i]); } for(i = 0; i < nthreads; ++i){ pthread_join(tid_produce[i], NULL); printf("cout[%d] = %d\n", i, count[i]); } pthread_create(&tid_consume, NULL, consume, NULL); pthread_join(tid_consume, NULL); return 0; } void* produce(void* arg){ while(1){ pthread_mutex_lock(&shared.mutex); if(shared.idx >= nitem){ pthread_mutex_unlock(&shared.mutex); return NULL; } shared.buf[shared.idx] = shared.val; shared.idx++; shared.val++; pthread_mutex_unlock(&shared.mutex); *((int*)arg) +=1; } } void* consume(void* arg){ int i; for(i = 0; i < nitem; ++i){ if(shared.buf[i] != i){ printf("buf[%d] = %d\n", i, shared.buf[i]); } } }
版本2:全部生产者线程和消费者线程都是并行执行的。这时会有个问题,就是消费者线程被先执行的状况下,生产者线程尚未生产数据,这时消费者线程就只能循环给互斥锁解锁又上锁。这成为轮转(spinning)或者轮询(polling),是一种多CPU时间的浪费。咱们也能够睡眠很短的一段时间,可是不知道睡多久。这时,条件变量就登场了。
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #define MAXITEM 100000000 #define MAXTHREAD 100 #define min(x,y) ( x>y?y:x ) int nitem; struct { pthread_mutex_t mutex; int buf[MAXITEM]; int idx; int val; }shared = { PTHREAD_MUTEX_INITIALIZER }; void* produce(void*); void* consume(void*); int main(int argc, char** argv){ int i; int nthreads; int count[MAXTHREAD]; pthread_t tid_produce[MAXTHREAD], tid_consume; if(argc != 3){ printf("arg error\n"); return 1; } nitem = min(MAXITEM,atoi(argv[1])); nthreads = min(MAXTHREAD, atoi(argv[2])); for(i = 0; i < nthreads; ++i){ count[i] = 0; pthread_create(&tid_produce[i], NULL, produce, &count[i]); } pthread_create(&tid_consume, NULL, consume, NULL); for(i = 0; i < nthreads; ++i){ pthread_join(tid_produce[i], NULL); printf("cout[%d] = %d\n", i, count[i]); } pthread_join(tid_consume, NULL); return 0; } void* produce(void* arg){ while(1){ pthread_mutex_lock(&shared.mutex); if(shared.idx >= nitem){ pthread_mutex_unlock(&shared.mutex); return NULL; } shared.buf[shared.idx] = shared.val; shared.idx++; shared.val++; pthread_mutex_unlock(&shared.mutex); *((int*)arg) +=1; } } void consume_wait(int i){ while(1){ pthread_mutex_lock(&shared.mutex); if(i < shared.idx){ pthread_mutex_unlock(&shared.mutex); return; } pthread_mutex_unlock(&shared.mutex); } } void* consume(void* arg){ int i; for(i = 0; i < nitem; ++i){ consume_wait(i); if(shared.buf[i] != i){ printf("buf[%d] = %d\n", i, shared.buf[i]); } } return NULL; }
版本3:全部生产者线程和消费者线程都是并行执行的。解决版本2的轮询问题。使用条件变量。
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #define MAXITEM 100000000 #define MAXTHREAD 100 #define min(x,y) ( x>y?y:x ) int nitem; int buf[MAXITEM]; struct { pthread_mutex_t mutex; int idx; int val; } shared = { PTHREAD_MUTEX_INITIALIZER }; struct { pthread_mutex_t mutex; pthread_cond_t cond; int nready; } nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER }; void* produce(void*); void* consume(void*); int main(int argc, char** argv){ int i; int nthreads; int count[MAXTHREAD]; pthread_t tid_produce[MAXTHREAD], tid_consume; if(argc != 3){ printf("arg error\n"); return 1; } nitem = min(MAXITEM,atoi(argv[1])); nthreads = min(MAXTHREAD, atoi(argv[2])); for(i = 0; i < nthreads; ++i){ count[i] = 0; pthread_create(&tid_produce[i], NULL, produce, &count[i]); } pthread_create(&tid_consume, NULL, consume, NULL); for(i = 0; i < nthreads; ++i){ pthread_join(tid_produce[i], NULL); printf("cout[%d] = %d\n", i, count[i]); } pthread_join(tid_consume, NULL); return 0; } void* produce(void* arg){ while(1){ pthread_mutex_lock(&shared.mutex); if(shared.idx >= nitem){ pthread_mutex_unlock(&shared.mutex); return NULL; } buf[shared.idx] = shared.val; shared.idx++; shared.val++; pthread_mutex_unlock(&shared.mutex); pthread_mutex_lock(&nready.mutex); if(nready.nready == 0){ pthread_cond_signal(&nready.cond);//--------------② } nready.nready++; pthread_mutex_unlock(&nready.mutex);//--------------③ *((int*) arg) += 1; } } void* consume(void* arg){ int i; for(i = 0; i < nitem; ++i){ pthread_mutex_lock(&nready.mutex); while(nready.nready == 0){//--------------① pthread_cond_wait(&nready.cond, &nready.mutex); } nready.nready--; pthread_mutex_unlock(&nready.mutex); if(buf[i] != i){ printf("buf[%d] = %d\n", i, buf[i]); } } printf("buf[%d] = %d\n", nitem-1, buf[nitem-1]); }
关于互斥锁和条件变量的最佳实践:
1,把要多个线程共享的数据定义和互斥锁定义在一个结构体里。
2,把条件变量,互斥锁,和临界条件定义在一个结构体里。
3,在①的地方,最后不要用if,理由是,pthread_cond_wait返回后,有可能另外一个消费者线程把它消费掉了,因此要再次测试相应的条件成立与否,防止发生虚假的(spurious)唤醒。各类线程都应该试图最大限度减小这些虚假唤醒,可是仍有可能发生。
4,注意②处的代码pthread_cond_signal,设想一下最坏的状况,调用该函数后,另一个等待的线程当即被唤醒,因此被唤醒的pthread_cond_wait函数要当即加锁,可是调用pthread_cond_signal函数的线程尚未执行到③处的pthread_mutex_unlock,因此被唤醒的线程又当即终止了。因此为了不这种状况发生,把②处的代码pthread_cond_signal放在③处的下一行。
参考下面的伪代码:
int flag; pthread_mutex_lock(&nready.mutex); int = nready.nready == 0); nready.nready++; pthread_mutex_unlock(&nready.mutex); if(flag){ pthread_cond_signal(&nready.cond); }