1、解决问题和适用范围html
主要是用来等待一个条件,这个条件可能须要另外一个线程来知足这个条件。这个和咱们日常适用的pthread_mutex_lock的最大不一样在于后者保护的通常是一个代码段(也就是关键区),或者一个变量,可是因为通常来讲这个变量的访问是在一个关键区中,因此能够认为是一个关键区。网络
可是对于条件变量,是须要的是一个事件,只有事件知足的时候才会执行后面的操做,此时就出现一个问题:若是不知足咱们应该怎么办?若是若是使用简单信号量,可能另外一方触发了这个条件,而后经过unlock来唤醒一个线程,可是此时通过屡次唤醒其实这边根本没有等待,那么信号就可能丢失。若是这边一直的空等,那么对于CPU的利用率又很是大,因此就引起了条件等待的概念。数据结构
2、网络上的例子代码多线程
一下代码来自http://download.oracle.com/docs/cd/E19455-01/806-5257/6je9h032r/index.html
void producer(buffer_t *b, char item) { pthread_mutex_lock(&b->mutex); while (b->occupied >= BSIZE) pthread_cond_wait(&b->less, &b->mutex); assert(b->occupied < BSIZE); b->buf[b->nextin++] = item; b->nextin %= BSIZE; b->occupied++; /* now: either b->occupied < BSIZE and b->nextin is the index of the next empty slot in the buffer, or b->occupied == BSIZE and b->nextin is the index of the next (occupied) slot that will be emptied by a consumer (such as b->nextin == b->nextout) */ pthread_cond_signal(&b->more); pthread_mutex_unlock(&b->mutex); }
char consumer(buffer_t *b) { char item; pthread_mutex_lock(&b->mutex); while(b->occupied <= 0) pthread_cond_wait(&b->more, &b->mutex); assert(b->occupied > 0); item = b->buf[b->nextout++]; b->nextout %= BSIZE; b->occupied--; /* now: either b->occupied > 0 and b->nextout is the index of the next occupied slot in the buffer, or b->occupied == 0 and b->nextout is the index of the next (empty) slot that will be filled by a producer (such as b->nextout == b->nextin) */ pthread_cond_signal(&b->less); pthread_mutex_unlock(&b->mutex); return(item); }
3、Glibc的实现
一、数据结构
/* Data structure for conditional variable handling. The structure of
the attribute type is not exposed on purpose. */
typedef union
{
struct
{
int __lock;保护多线程中cond结构自己的变量操做不会并发,例如对于total_seq进而wakup_seq的使用和递增操做。
unsigned int __futex;另外一个线程和这个线程之间在条件点上同步的方式,也就是若是须要和其它线程同步的话,使用这个互斥锁替换pthread_cond_wait传入的互斥锁进行同步。
__extension__ unsigned long long int __total_seq;这个表示在这个条件变量上有多少个线程在等待这个信号。
__extension__ unsigned long long int __wakeup_seq;已经在这个条件变量上执行了多少次唤醒操做。
__extension__ unsigned long long int __woken_seq;这个条件变量中已经被真正唤醒的线程数目。
void *__mutex;保存pthread_cond_wait传入的互斥锁,须要保证pthread_cond_wait和pthread_cond_signal传入的值都是相同值。
unsigned int __nwaiters;表示这个cond结构如今还有多少个线程在使用,当有人在使用的时候,pthread_cond_destroy须要等待全部的操做完成
unsigned int __broadcast_seq; 广播动做发生了多少次,也就是执行了多少次broadcast
} __data;
char __size[__SIZEOF_PTHREAD_COND_T];
__extension__ long long int __align;
} pthread_cond_t;
二、lll_futex_wait的意义
lll_futex_wait (&cond->__data.__futex, futex_val, pshared);
lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);
对于第一个wait,须要传入一个咱们用户态判断时使用的futex值,也就是这里的第二个参数futex_val,这样内核会判断进行真正的wait挂起的时候这个地址的是否是仍是这个值,若是不是这个wait失败。可是进行wakup的时候不须要传入判断值,多是假设此时已经得到互斥锁,因此不会有其它线程来竞争了吧。
这个要和pthread_mutex_lock使用的0、一、2三值区分开来,由于这些都是C库规定的语义,内核对他们没有任何特殊要求和语义判断,因此用户态能够随意的改变这个变量的值。
三、pthread_cond_wait的操做
int
__pthread_cond_wait (cond, mutex)
pthread_cond_t *cond;
pthread_mutex_t *mutex;
{
struct _pthread_cleanup_buffer buffer;
struct _condvar_cleanup_buffer cbuffer;
int err;
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
/* Make sure we are along. */
lll_lock (cond->__data.__lock, pshared);即将对cond结构的成员进行操做和判断,因此首先得到结构自己保护互斥锁。
/* Now we can release the mutex. */
err = __pthread_mutex_unlock_usercnt (mutex, 0);释放用户传入的互斥锁,此时另一个执行pthread_cond_signal的线程能够经过pthread_mutex_lock执行可能的signal判断,可是咱们尚未释放数据操做互斥锁,因此另外一方执行pthread_cond_signal的时候依然可能会等待。
if (__builtin_expect (err, 0))
{
lll_unlock (cond->__data.__lock, pshared);
return err;
}
/* We have one new user of the condvar. */
++cond->__data.__total_seq;增长系统中全部须要执行的唤醒次数。
++cond->__data.__futex;增长futex,主要是为了保证用户态数据一致性。
cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;增长cond结构的使用次数。
/* Remember the mutex we are using here. If there is already a
different address store this is a bad user bug. Do not store
anything for pshared condvars. */
if (cond->__data.__mutex != (void *) ~0l)
cond->__data.__mutex = mutex;
/* Prepare structure passed to cancellation handler. */
cbuffer.cond = cond;
cbuffer.mutex = mutex;
/* Before we block we enable cancellation. Therefore we have to
install a cancellation handler. */
__pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);注册撤销点。
/* The current values of the wakeup counter. The "woken" counter
must exceed this value. */
unsigned long long int val;
unsigned long long int seq;
val = seq = cond->__data.__wakeup_seq;
/* Remember the broadcast counter. */
cbuffer.bc_seq = cond->__data.__broadcast_seq;
do
{
unsigned int futex_val = cond->__data.__futex;
/* Prepare to wait. Release the condvar futex. */
lll_unlock (cond->__data.__lock, pshared);此处真正释放cond操做互斥锁,咱们已经再也不对其中的变量进行操做。
/* Enable asynchronous cancellation. Required by the standard. */
cbuffer.oldtype = __pthread_enable_asynccancel ();
/* Wait until woken by signal or broadcast. */
lll_futex_wait (&cond->__data.__futex, futex_val, pshared);等待在futex变量上,因为咱们刚才保存了futex的原始值,因此若是在上面咱们释放了data.lock以后另外一个线程修改了这个变量的值,那么这里的lll_futex_wait将会返回失败,因此会继续进行下一轮的while循环,直到连个执行相同,说明咱们作的判断时正确的。
/* Disable asynchronous cancellation. */若是执行到这里,说明咱们已经被signal唤醒。
__pthread_disable_asynccancel (cbuffer.oldtype);
/* We are going to look at shared data again, so get the lock. */
lll_lock (cond->__data.__lock, pshared);访问变量,须要得到互斥锁。
/* If a broadcast happened, we are done. */
if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
goto bc_out;
/* Check whether we are eligible for wakeup. */
val = cond->__data.__wakeup_seq;
}
while (val == seq || cond->__data.__woken_seq == val); 当val!=seq&&cond->data.wokenup!=val的时候能够进行唤醒,也就是另外一个放修改了已经执行了唤醒的次数而且已经被唤醒的线程还有名额的时候。
/* Another thread woken up. */
++cond->__data.__woken_seq;增长系统中已经被唤醒的线程的数目。
bc_out: broadcast跳转到这里。
cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT;
/* If pthread_cond_destroy was called on this varaible already,
notify the pthread_cond_destroy caller all waiters have left
and it can be successfully destroyed. */
if (cond->__data.__total_seq == -1ULL
&& cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT))
lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);
/* We are done with the condvar. */
lll_unlock (cond->__data.__lock, pshared);
/* The cancellation handling is back to normal, remove the handler. */
__pthread_cleanup_pop (&buffer, 0);
/* Get the mutex before returning. */
return __pthread_mutex_cond_lock (mutex);再次得到mutex互斥锁,可能会睡眠,由于咱们的这个释放是对上层透明的,而在进入函数的时候咱们已经释放了这个互斥锁,因此此时还要进行一次得到操做,从而配对。
}
四、pthread_cond_signal的操做
int
__pthread_cond_signal (cond)
pthread_cond_t *cond;
{
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
/* Make sure we are alone. */
lll_lock (cond->__data.__lock, pshared);
/* Are there any waiters to be woken? */
if (cond->__data.__total_seq > cond->__data.__wakeup_seq)若是待唤醒次数比已经唤醒的次数多,那么此时就进行一个唤醒操做。
{
/* Yes. Mark one of them as woken. */
++cond->__data.__wakeup_seq;
++cond->__data.__futex;改变futex的值,这个值的具体意义并不重要,只是为了告诉另外一方,这个值已经变化,若是另外一方使用的是原始值,那么对futex的wait操做将会失败。
/* Wake one. */
if (! __builtin_expect (lll_futex_wake_unlock (&cond->__data.__futex, 1,
1, &cond->__data.__lock,
pshared), 0))
return 0;
lll_futex_wake (&cond->__data.__futex, 1, pshared);
}
/* We are done. */
lll_unlock (cond->__data.__lock, pshared);
return 0;
}
五、__pthread_cond_broadcast
int
__pthread_cond_broadcast (cond)
pthread_cond_t *cond;
{
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
/* Make sure we are alone. */
lll_lock (cond->__data.__lock, pshared);
/* Are there any waiters to be woken? */
if (cond->__data.__total_seq > cond->__data.__wakeup_seq)判断是否有等待唤醒的线程。
{
/* Yes. Mark them all as woken. */
cond->__data.__wakeup_seq = cond->__data.__total_seq;
cond->__data.__woken_seq = cond->__data.__total_seq;
cond->__data.__futex = (unsigned int) cond->__data.__total_seq * 2;
int futex_val = cond->__data.__futex;
/* Signal that a broadcast happened. */
++cond->__data.__broadcast_seq;
/* We are done. */
lll_unlock (cond->__data.__lock, pshared);
/* Do not use requeue for pshared condvars. */
if (cond->__data.__mutex == (void *) ~0l)
goto wake_all;
/* Wake everybody. */
pthread_mutex_t *mut = (pthread_mutex_t *) cond->__data.__mutex;
/* XXX: Kernel so far doesn't support requeue to PI futex. */
/* XXX: Kernel so far can only requeue to the same type of futex,
in this case private (we don't requeue for pshared condvars). */
if (__builtin_expect (mut->__data.__kind
& (PTHREAD_MUTEX_PRIO_INHERIT_NP
| PTHREAD_MUTEX_PSHARED_BIT), 0))
goto wake_all;
/* lll_futex_requeue returns 0 for success and non-zero
for errors. */
if (__builtin_expect (lll_futex_requeue (&cond->__data.__futex, 1,
INT_MAX, &mut->__data.__lock,
futex_val, LLL_PRIVATE), 0))把futex上的转移到data.lock中并唤醒,若是失败则直接唤醒而不转移。
{
/* The requeue functionality is not available. */
wake_all:
lll_futex_wake (&cond->__data.__futex, INT_MAX, pshared);这里的INT_MAX就是告诉内核唤醒全部在这个变量上等待的线程。
}
/* That's all. */
return 0;
}
/* We are done. */
lll_unlock (cond->__data.__lock, pshared);
return 0;}