除了提供互斥以外,信号量的另一个做用是调度对共享资源的访问。在这种情景中,一个线程用信号量来通知另外一个线程,程序状态中的某个条件已经为真了。 函数
下图给出了生产者-------消费者问题。生产者和消费者线程共享一个有n个槽的有限缓冲区。 post
生产者线程反复的生成新的项目(item),并把它们插入到缓冲区中。消费者线程不断的从缓冲区中取出这些项目,而后消费( 使用它们)。
spa
由于插入和取出项目都涉及更新共享变量,因此咱们必须保证对缓冲区的访问时互斥的。 线程
可是只保证互斥访问仍是不够的,咱们还须要调度 对缓冲区的访问 (若是缓冲区是满的即没有可用的槽,那么生产者必须等待直到有一个空的槽变为可用为止。与之类似,若缓冲区是空的即没有可取用的槽, 那么消费者必须等待直到有一个项目变为可用。) 索引
接下来咱们为生产者----消费者 定义一个结构体: 来存储数据 ci
Typedef struct {资源
Int * buf; // item存放在一个动态分配的n项整数buf中 rem
Int n; //同步
Int front; // 索引值记录第一项 和最后一项 string
Int rear; //
Sem_t mutex; // 三个信号量同步对缓冲区的访问。提供互斥的缓冲区的访问。
Sem_t slots; // 分别记录空槽和可用item的数量
Sem_t items; //
}sbuf_t;
/*
Buffer array
Maximum number of slots
Buf[(front+1)%n] is first item
Buf[rear%n] is last item
Protects accessed to buf
Counts available slots
Counts available items
*/
此结构体包含使用的有限缓冲区
咱们使用一个函数sbuf_init来初始化此缓冲区,并设置front 和rear 表示一个空的缓冲区,并为三个信号量赋予初始值。使用sbuf_deinit函数来删除缓冲区(当程序使用完以后)
Sbuf_insert函数等待一个可用的槽,对互斥锁加锁,添加项目item。对互斥锁解锁,而后宣布有一个新的item可用。
Sbuf_remove函数式与上一个函数对应的。在等待一个可用的缓冲区以后,对互斥锁加锁,从缓冲区的前面取出该项目,对互斥锁解锁。而后发信号通知一个新的槽可供使用。
void sbuf_init(sbuf_t *sp,int n)
{
If((Sp->buf=calloc(n,sizeof(int))==NULL)
Printf(“calloc 错误\n”);
Sp->n=n; //Buffer holds max of n items
Sp->front=sp->rear=0; //Empty buffer iff front==rear
Sem_init(&sp->mutex,0,1);//binary semaphore for locking
Sem_init(&sp->slots,0,n); //Initially ,buf has n empty slots
Sem_init(&sp->items,0,0); //initially, buf has zero data items
}
Void sbuf_deinit(sbuf_t *sp)
{
Free(sp->buf);
}
Void sbuf_insert(sbuf_t *sp,int item)
{
Sem_wait(&sp->slots);//wait for available slot
Sem_wait(&sp->mutex);//lock the buffer
Sp->buf[(++sp->rear)%(sp->n)]=item;//insert the item
Sem_post(&sp->mutex);//unlock the buffer
Sem_post(&sp->items);//announce available item
}
Int sbuf_remove(sbuf_t *sp)
{
Int item;
Sem_wait(&sp->items);//wait for available item
Sem_wait(&sp->mutex);//lock the buffer
Item=sp->buf[(++sp->front)%(sp->n)];// remove the item
Sem_post(&sp->mutex);
Sem_post(&sp->slots);
Return item;
}
#include <stdio.h>
#include <stdlib.h> #include <string.h> #include <pthread.h> #include <unistd.h> #include <semaphore.h> typedef struct { int * buf; // item存放在一个动态分配的n项整数buf中 int n; // int front; // 索引值记录第一项 和最后一项 int rear; // sem_t mutex; // 三个信号量同步对缓冲区的访问。提供互斥的缓冲区的访问。 sem_t slots; // 分别记录空槽和可用item的数量 sem_t items; // }sbuf_t; sbuf_t g_sp; void sbuf_init(sbuf_t *sp,int n) { sp->buf=(int*)calloc(n,sizeof(int)); if( NULL == sp->buf) printf("calloc 错误\n"); sp->n=n; //Buffer holds max of n items sp->front=sp->rear=0; //Empty buffer iff front==rear sem_init(&sp->mutex,0,1);//binary semaphore for locking sem_init(&sp->slots,0,n); //Initially ,buf has n empty slots sem_init(&sp->items,0,0); //initially, buf has zero data items } void sbuf_deinit(sbuf_t *sp) { free(sp->buf); } void sbuf_insert(sbuf_t *sp,int item) { sem_wait(&sp->slots);//wait for available slot sem_wait(&sp->mutex);//lock the buffer sp->buf[(++sp->rear)%(sp->n)]=item;//insert the item sem_post(&sp->mutex);//unlock the buffer sem_post(&sp->items);//announce available item } int sbuf_remove(sbuf_t *sp) { int item; sem_wait(&sp->items);//wait for available item sem_wait(&sp->mutex);//lock the buffer item=sp->buf[(++sp->front)%(sp->n)];// remove the item sem_post(&sp->mutex); sem_post(&sp->slots); return item; } void* producer(void* arg) { int i,item,index; //index 是线程编号 index=*(int*)arg; for(int i=0;i<g_sp.n;i++) { item =(i*2+1); sbuf_insert(&g_sp,item); printf("[P%d] Producing %d ...\n",index,item); fflush(stdout); sleep(1); } } void* consumer(void* arg) { int i,item,index; index=*(int*)arg; for(int i=0;i<g_sp.n;i++) { item =sbuf_remove(&g_sp); printf("------>[C%d] Consuming %d ...\n",index,item); sleep(1); } } int main(int argc,char* argv[]) { sbuf_init(&g_sp,10); int NP=3,NC=3; pthread_t idp[NP],idc[NC]; //1 for(int i=0;i<NP;i++) { pthread_create(&idp[i],NULL,producer,(void*)&i); } //2 for(int i=0;i<NC;i++) { pthread_create(&idc[i],NULL,consumer,(void*)&i); } //3 for(int i=0;i<NP;i++) pthread_join(idp[i],NULL); for(int i=0;i<NC;i++) pthread_join(idc[i],NULL); sbuf_deinit(&g_sp); return 0; }