3、对比上锁与等待数组
在上述的生产者-消费者问题中,咱们在实现同步的时候还能够用如下的方法来实现,这里只是说明与上述实现中的不一样之处。并发
一、首先,说明一下此版本中的相关特征:函数
在此版本中,消费者线程在生产者线程建立完毕后立刻就建立,而不是等待全部生产者线程完成而终止后再建立。测试
二、再则,如何来实现:spa
A、Set_concurrency(..)中参量并发线程从nthreads变为nthreads+1;线程
B、同时,为了实现同步,咱们必须设置一个consume_wait(int i)函数,其实现的功能是检测到对应i的pthread_mutex_unlock后,便启动consume对应的i判断部分。这里将这两个函数源代码实现呈现以下:设计
void consume_wait(int i){ for(;;){ pthread_mutex_lock(&shared.mutex); if(i < nput){ pthread_mutex_unlock(&shared.mutex); return ; } pthread_mutex_unlock(&shared.mutex); } } void *consume(void *arg){ int i; for(i=0;i<nitems;i++){ consume_wait(i); if(shared.buff[i] != i) printf("buff[%d] = %d\n",i,shared.buff[i]); } return NULL; }
这里,特别说明一些关于consume_wait(int i)的实现及具体运行方式:code
首先,当临界区出于未上锁时,consume_wait便调用pthread_mutex_lock(..)来上锁互斥锁,得到对临界区的全部权,这时,判断i<nput条件是否成立,以判断生产者线程是否产生了第i个条目。若是检测到以后,即可返回以通告consume能够处理第i个条目,同时解锁互斥锁。ip
而后,若第i个条目未产生,这时,函数便一直循环,每次给互斥锁上锁又解锁,这样称为“轮询(polling)”,这对CPU是一种浪费,但着实能够实现同步问题。同步
4、条件变量(Condition):等待与信号发送
互斥锁用于上锁,条件变量用于等待。这两种不一样类型的同步都是须要的。
条件变量是类型为pthread_cond_t的变量,如下两个函数使用了这些变量:
#include<pthread.h> int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr); int pthread_cond_signal(pthread_cond_t *cptr); 均返回:若成功则为0,若出错则为正的Exxx值
其中第二个函数的名字中的“signal”一词指的不是Unix_SIGxxx信号。
这两个函数所等待或由之得以通知的“条件”,其定义由咱们选择:咱们在代码中测试这种条件。
每一个条件变量老是有一个互斥锁与之关联。咱们调用pthread_cond_wait等待某个条件为真时,还会指定其条件变量的地址和所关联的互斥锁的地址。
咱们利用条件变量和互斥锁从新设计生产者-消费者同步问题,其改变以下:
全局变量声明:
#include "unpipc.h" #define MAXNITEMS 1000000 #define MAXNTHREADS 100 int nitems; int buff[MAXITEMS]; struct { pthread_mutex_t mutex; int nput; int nval; }put ={PTHREAD_MUTEX_INITIALIZER}; struct { pthread_mutex_t mutex; pthread_cond_t cond; int nready; }nready={PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER};
把生产者变量和互斥锁收集到一个结构中;
把计数器、条件变量和互斥锁收集到一个结构中,nready指的是对消费者已准备好的线程数;
main函数同上锁与等待时的main函数;
Produce和consume函数:
void *produce(void *arg){ for(;;){ pthread_mutex__lock(&put.mutex); if(put.nput >= nitems){ pthread_mutex_unlock(&put.mutex); return NULL; } buff[put.nput]=put.nval; put.nput++; put.nval++; pthread_mutex_unlock(&put.mutex); pthread_mutex_lock(&nready.mutex); if(nready.nready == 0){ pthread_cond_signal(&nready.cond); } nready.nready++; pthread_mutex_unlock(&nready.mutex); } } void *consume(void *arg){ int i; for(i=0;i<nitems;i++){ pthread_mutex_lock(&nready.mutex); while(nready.nready == 0) pthread_cond_wait(&nready.cond,&nready.mutex); nready.nready--; pthread_mutex_unlock(&nready.mutex); if(buff[i]!=i){ printf("buff[%d]=%d\n",i,buff[i]); } } return NULL; }
这里的分析很重要:
Produce函数中的for语句部分前半部分实现的是:当生产者往数组buff中放置一个新条目时,咱们改用put.mutex来为临界区上锁。
For语句后半部分实现的是通知消费者,给用来统计由消费者处理的条目数的计数器nready.nready加1。在加1以前,若是该计数器的值为0,就调用pthread_cond_signal唤醒可能正等待其值变为非零的任意线程(如消费者)。如今能够看出与该计数器关联的互斥锁和条件变量的相互做用。该计数器在生产者和消费者之间共享,所以,只有锁住与之关联的互斥锁(nready.mutex)时才能访问它。与之关联的条件变量则用于等待和发送信号。
对于消费者,消费者只是等待计数器nready.nready变为非零。既然该计数器是在全部的生产者和消费者之间共享的,那么只有锁住与之关联的互斥锁(nready.mutex)时才能测试它的值。若是在锁住该互斥锁期间该计数器的值为0,咱们就调用pthread_cond_wait进入睡眠。该函数原子地执行如下两个动做:
A、给互斥锁nready.mutex解锁;
B、把调用线程投入睡眠,直到另外某个线程就本条件变量调用pthread_cond_signal。同时,pthread_cond_wait在返回前从新给互斥锁nready.mutex上锁。
5、条件变量:定时等待和广播
一般pthread_cond_signal只唤醒等待在相应条件变量上的一个线程。在某些状况下,一个线程认定有多个其余线程应被唤醒,这时它能够调用pthread_cond_broadcast唤醒阻塞在相应条件变量上的全部线程。
#include<pthread.h> int pthread_cond_broadcast(pthread_cond_t *cptr); int pthread_cond_timewait(pthread_cond_t *cptr,pthread_mutex_t *mptr,const struct timespc *abstime);
对于pthread_cond_timewait(...),其容许线程就阻塞时间设置一个限制值。Abstime参数是一个timespec结构体,该结构体指定这个函数必须返回的时间,即使当时相应的条件变量还没收到信号,若是发生这种超时状况,该函数返回ETIMEDOUT错误。该时间值是绝对时间。而不是时间差。
6、互斥锁和条件变量的属性
在前面的互斥锁和条件变量的讲解中,咱们用两个常量PTHREAD_MUTEX_INITIALIZER和PTHREAD_COND_INITIALIZER来初始化它们。有这种方式初始化的互斥锁和条件变量具有默认属性,不过咱们还能以非默认属性来初始化它们。
#include<pthread.h> int pthread_mutex_init(pthread_mutex_t *mptr,const pthread_mutex_mutexattr_t *attr); int pthread_mutex_destory(pthread_mutex_t *mptr); int pthread_cond_init(pthread_cond_t *cptr,const pthread_cond_condattr_t *attr); int pthread_cond_destory(pthread_cond_t *cptr);