linux 内核的等待队列和进程调度息息相关,进程在某些状况下必须等待某些事件的发生,例如:等待一个磁盘操做的终止,等待释放系统资源,或等待指定的时间间隔。linux
等待队列实现了在事件上的条件等待:但愿等待特定事件的进程把本身放进合适的等待队列,并放弃控制权。程序员
所以,等待队列表示一组睡眠的进程,当某一条件知足时,由内核唤醒它们。express
基于上述对等待队列的基本描述,很直观地会产生如下疑问,咱们带着问题来分析:数据结构
注:本文基于 linux-4.9 的版本进行分析。函数
顾名思义,等待队列是一个特殊的队列,代码中使用了两个数据结构来描述一个等待队列:wait_queue_head_t 和 wait_queue_t。字体
这两个数据结构定义在 include/linux/wait.h 头文件中。ui
struct __wait_queue_head { spinlock_t lock; struct list_head task_list; }; typedef struct __wait_queue_head wait_queue_head_t; struct __wait_queue { unsigned int flags; void *private; wait_queue_func_t func; struct list_head task_list; }; typedef struct __wait_queue wait_queue_t;
等待队列是一个双向队列,wait_queue_head_t 表明该队列的头部,wait_queue_t 表明队列中有效的成员,其 private 指针指向了关联进程的 task_struct 结构体。this
一个等待队列只有一个 wait_queue_head_t,由于等待队列多是空的,不包含 wait_queue_t 成员,因此使用一个单独的头部来保持该队列。atom
wait_queue_head_t 的结构很简单,只有一个 spinlock 和 一个 list_head 成员来构成队列,其做用只是维持等待队列的头部。lua
wait_queue_t 是等待队列的有效成员,除去 list_head 外,它包含 3 个属性:
至此,咱们明确了等待队列的基本数据结构,看起来很是简单明了。
接下来的疑问是等待队列如何与进程关联起来,或者说进程如何使用等待队列?
首先须要分配一个 wait_queue_head_t 结构,并将其初始化,完成这个操做有两种方法:静态建立和动态建立
#define __WAIT_QUEUE_HEAD_INITIALIZER(name) { \ .lock = __SPIN_LOCK_UNLOCKED(name.lock), \ .task_list = { &(name).task_list, &(name).task_list } } #define DECLARE_WAIT_QUEUE_HEAD(name) \ wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)
经过引用 DECLARE_WAIT_QUEUE_HEAD(name) 建立一个名为 name 的 wait_queue_head_t,其存储空间分配在数据段
另一种建立方式是使用 wait_queue_head_t 初始化函数 init_waitqueue_head,该函数定义在 include/linux/wait.h 头文件中。
#define init_waitqueue_head(q) \ do { \ static struct lock_class_key __key; \ \ __init_waitqueue_head((q), #q, &__key); \ } while (0) void __init_waitqueue_head(wait_queue_head_t *q, const char *name, struct lock_class_key *key) { spin_lock_init(&q->lock); lockdep_set_class_and_name(&q->lock, key, name); INIT_LIST_HEAD(&q->task_list); }
init_waitqueue_head 函数只是初始化 wait_queue_head_t 的数据成员,其存储空间事先已分配,可由程序员灵活处理:
能够静态分配在 data 段,也能够动态地在堆上分配空间。
到这里只是建立了一个空队列,这个队列尚未实际的做用。
进程使用等待队列,须要关联一个 wait_queue_t 数据结构
#define __WAITQUEUE_INITIALIZER(name, tsk) { \
.private = tsk, \
.func = default_wake_function, \
.task_list = { NULL, NULL } }
#define DECLARE_WAITQUEUE(name, tsk) \
wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)
可使用 DECLARE_WAITQUEUE(name, tsk) 宏来建立一个等待队列成员,这个宏展开后的结果为:
即声明一个名字为 name 的 wait_queue_t 结构,注意该 wait_queue_t 的生命周期和该宏引用的位置有关,若是在函数内使用,那么 wait_queue_t 的生命周期限定在该函数内。
添加等待队列成员:
static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new) {
list_add (&new->task_list, &head->task_list); } void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait) { unsigned long flags; wait->flags &= ~WQ_FLAG_EXCLUSIVE; spin_lock_irqsave(&q->lock, flags); __add_wait_queue(q, wait); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(add_wait_queue);
static inline void __add_wait_queue_tail(wait_queue_head_t *head, wait_queue_t *new) { list_add_tail(&new->task_list, &head->task_list); } void add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t *wait) { unsigned long flags; wait->flags |= WQ_FLAG_EXCLUSIVE; spin_lock_irqsave(&q->lock, flags); __add_wait_queue_tail(q, wait); spin_unlock_irqrestore(&q->lock, flags); }
删除等待队列成员:
static inline void __remove_wait_queue(wait_queue_head_t *head, wait_queue_t *old) {
list_del(&old->task_list); } void remove_wait_queue(wait_queue_head_t *q, wait_queue_t *wait) { unsigned long flags; spin_lock_irqsave(&q->lock, flags); __remove_wait_queue(q, wait); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(remove_wait_queue);
添加/删除等待队列成员的操做只是简单的链表操做,将表明进程的 wait_queue_t 结构插入队列或从队列中删除。
注意:互斥的(exclusive)等待进程是插入到等待队列的尾部。
进程是什么时候进入休眠状态?又是如何从等待队列被唤醒的呢?
接下来咱们看一下等待队列的 wakeup 函数是如何实现的。
从等待队列的建立宏 DECLARE_WAITQUEUE 中能够看到,wait_queue_t 中有一个指向 task_struct 的 private 指针能够将 wait_queue_t 和一个进程 tast_struct 关联起来。
同时还将 wait_queue_func_t 函数成员绑定到 default_wake_function 函数。
include/linux/wait.h 和 kernel/sched/wait.c 中提供了 wake_up 函数,该函数能够唤醒等待队列中的进程。
经过代码来看一下,这个wake_up 函数具体作了什么工做,应该如何调用 wake_up 函数。
wait.h 提供了一系列 __wake_up 函数的封装形式,其具体实现都基于 wait.c 中的 __wake_up() 函数:
#define wake_up(x) __wake_up(x, TASK_NORMAL, 1, NULL) #define wake_up_nr(x, nr) __wake_up(x, TASK_NORMAL, nr, NULL) #define wake_up_all(x) __wake_up(x, TASK_NORMAL, 0, NULL) #define wake_up_locked(x) __wake_up_locked((x), TASK_NORMAL, 1) #define wake_up_all_locked(x) __wake_up_locked((x), TASK_NORMAL, 0) #define wake_up_interruptible(x) __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL) #define wake_up_interruptible_nr(x, nr) __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL) #define wake_up_interruptible_all(x) __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL) #define wake_up_interruptible_sync(x) __wake_up_sync((x), TASK_INTERRUPTIBLE, 1)
从这一系列接口形式能够看出,其核心都是 __wake_up 函数,这些封装应用于不一样场景,针对不一样类型的进程。
/* * The core wakeup function. Non-exclusive wakeups (nr_exclusive == 0) just * wake everything up. If it's an exclusive wakeup (nr_exclusive == small +ve * number) then we wake all the non-exclusive tasks and one exclusive task. * * There are circumstances in which we can try to wake a task which has already * started to run but is not in state TASK_RUNNING. try_to_wake_up() returns * zero in this (rare) case, and we handle it by continuing to scan the queue. */ static void__wake_up_common
(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int wake_flags, void *key) { wait_queue_t *curr, *next;list_for_each_entry_safe
(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
/* 注意这里的三个判断条件,其直接决定了 wakeup 函数的操做结果 */
if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) break; } } /** * __wake_up - wake up threads blocked on a waitqueue. * @q: the waitqueue * @mode: which threads * @nr_exclusive: how many wake-one or wake-many threads to wake up * @key: is directly passed to the wakeup function * * It may be assumed that this function implies a write memory barrier before * changing the task state if and only if any tasks are woken up. */ void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key) { unsigned long flags; spin_lock_irqsave(&q->lock, flags); __wake_up_common(q, mode, nr_exclusive, 0, key); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(__wake_up);
从 __wake_up 的代码能够看出,其核心操做就是在 __wake_up_common 中遍历等待队列,而后调用其成员的 func 函数。
咱们再回头看一下 func 函数,在使用DECLARE_WAITQUEUE(name, tsk) 宏来建立等待队列成员的时候,func 函数绑定为 default_wake_function。
注意:若是不使用 DECLARE_WAITQUEUE(name, tsk) 宏建立等待队列成员,那么能够自定义 wait_queue_t 的 func 函数。
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags, void *key) { return try_to_wake_up(curr->private, mode, wake_flags); }
EXPORT_SYMBOL(default_wake_function);
default_wake_function 和其调用的 try_to_wake_up 函数都定义在 kernel/sched/core.c,核心函数是 try_to_wake_up,本文不深究函数细节,只该函数的原型和注释
/** * try_to_wake_up - wake up a thread * @p: the thread to be awakened * @state: the mask of task states that can be woken * @wake_flags: wake modifier flags (WF_*) * * Put it on the run-queue if it's not already there. The "current" * thread is always on the run-queue (except when the actual * re-schedule is in progress), and as such you're allowed to do * the simpler "current->state = TASK_RUNNING" to mark yourself * runnable without the overhead of this. * * Return: %true if @p was woken up, %false if it was already running. * or @state didn't match @p's state. */ static int try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags);
该函数的功能就是把调用参数传入的进程描述符所表明的进程状态设置为 TASK_RUNNING 并放到 run-queue 中,后续由调度程序来调度运行。
这里须要重点关注 __wake_up_common 中遍历等待队列的三个 break 条件:
if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) break;
注意 C 语言多个判断条件的执行过程,此例中当前一个条件为 false 时会直接 break,不会继续执行后续条件表达式;
等待队列中,EXCLUSIVE 类型的进程插入在队列的尾部,所以 __wake_up_common 函数的语义有如下几个要点:
__wake_up 函数有 4 个参数:
1. wait_queue_head_t *q:这个参数很直观,即等待队列的头部,经过它能够遍历到队列中的全部节点
2. unsigned int mode:该参数的注释是 “which threads”,是一个 unsigned int 类型,他表明什么意思呢?
咱们看一下引用 __wake_up 时传入的参数和 __wake_up 对该参数的使用方式
wait.h 中的 wake_up 系列函数传入的 mode 参数为 TASK_NORMAL 和 TASK_INTERRUPTIBLE,TASK_NORMAL 的定义以下:
#define TASK_NORMAL (TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE)
这是表明进程状态的 flag 定义,它的传递路径:
__wake_up --> __wake_up_common –> default_wake_function –> try_to_wake_up
最终起做用在 try_to_wake_up 的第二个参数:
@state: the mask of task states that can be woken
总结一下,__wake_up 的第二个参数,表示本次调用将唤醒处于 TASK_NORMAL 状态的进程仍是只唤醒 TASK_INTERRUPTIBLE 的进程。
3. int nr_exclusive:该参数注释“how many wake-one or wake-many threads to wake up”,是一个 int 类型
该参数表示这次 __wake_up 调用将唤醒多少个互斥的等待进程,它的传递路径:
__wake_up --> __wake_up_common
4. void *key:该参数将传递给 func 的第 4 个参数,default_wake_function 并无使用该参数,暂不深刻分析。若是使用用户自定义的 func 函数的话,key 参数将有其余做用。
从上述分析过程当中,能够得出一个基本的思路:
等待队列是一个维护了一系列进程的双向队列,等待队列中的进程分为互斥(带 WQ_FLAG_EXCLUSIVE 标识)和非互斥(不带 WQ_FLAG_EXCLUSIVE 标识)的,
kernel 中提供了一系列函数将进程插入等待队列或从等待队列中删除,同时提供了 wakeup 函数来唤醒等待队列中的进程。
那么所谓“等待队列”的“等待”二字体如今哪里?应当如何使用等待队列呢?
以 kernel mmc driver 中的 mmc_claim_host 和 mmc_release_host 为例来看一下等待队列的具体使用。
kernel mmc driver 中对 host 的某些操做必须是互斥的,由于 host 硬件的某些操做过程必须保持必定的完整性,不能被多个进程并行访问。
所以在执行这类操做前,driver 调用 mmc_claim_host 声明占用 host,操做完成后使用 mmc_release_host 释放 host 资源。
咱们直接在下面的代码中添加注释来讲明等待队列在其中发挥的做用。
/** * __mmc_claim_host - exclusively claim a host * @host: mmc host to claim * @abort: whether or not the operation should be aborted * * Claim a host for a set of operations. If @abort is non null and * dereference a non-zero value then this will return prematurely with * that non-zero value without acquiring the lock. Returns zero * with the lock held otherwise. */ int __mmc_claim_host(struct mmc_host *host, atomic_t *abort) { /* * 声明一个名为 wait 的 wait_queue_t 结构体,绑定到 current 进程 * 注意 wait 的生命周期位于该函数内,其存储空间分配在该函数栈上 */ DECLARE_WAITQUEUE(wait, current); unsigned long flags; int stop; bool pm = false; might_sleep(); /* * 将 wait 加入到 host->wq 这个等待队列中 * host->wq 是 host 的一个成员变量,driver 加载时已经初始化 */ add_wait_queue(&host->wq, &wait); spin_lock_irqsave(&host->lock, flags); while (1) { /* 设置当前进程的状态,再也不处于 RUNNING 状态,不会被再次调度执行 */ set_current_state(TASK_UNINTERRUPTIBLE); stop = abort ? atomic_read(abort) : 0; /* 这里体现了等待条件,当如下任一条件知足时,跳出 while(1) 循环*/ if (stop || !host->claimed || host->claimer == current) break; spin_unlock_irqrestore(&host->lock, flags); /* 若是上述等待条件不知足,让出 CPU 资源,进入等待状态 */ schedule(); /* * 当 host->wq 被 wakeup 函数唤醒时,该进程可能被再次被调度执行 * 将再次从 while(1) 进入检查上述等待条件,看是否可以得到 host 使用权 */ spin_lock_irqsave(&host->lock, flags); } /* 运行到此处,说明 while(1) 的 break 条件知足,将进程状态设置为 TASK_RUNNING */ set_current_state(TASK_RUNNING); if (!stop) { host->claimed = 1; host->claimer = current; host->claim_cnt += 1; if (host->claim_cnt == 1) pm = true; } else wake_up(&host->wq); spin_unlock_irqrestore(&host->lock, flags); /* 将 wait 从 host->wq 中移除 */ remove_wait_queue(&host->wq, &wait); if (pm) pm_runtime_get_sync(mmc_dev(host)); return stop; } /* 对 __mmc_claim_host 的简单封装,无需特别关注 */ static inline void mmc_claim_host(struct mmc_host *host) { __mmc_claim_host(host, NULL); } /** * mmc_release_host - release a host * @host: mmc host to release * * Release a MMC host, allowing others to claim the host * for their operations. */ void mmc_release_host(struct mmc_host *host) { /* 当 driver 完成 host 的互斥操做后,调用该函数释放 host 资源 */ unsigned long flags; WARN_ON(!host->claimed); spin_lock_irqsave(&host->lock, flags); if (--host->claim_cnt) { /* Release for nested claim */ spin_unlock_irqrestore(&host->lock, flags); } else { host->claimed = 0; host->claimer = NULL; spin_unlock_irqrestore(&host->lock, flags); /* 调用 wakeup 唤醒 host->wq 等待队列中的其余等待进程运行 */ wake_up(&host->wq); pm_runtime_mark_last_busy(mmc_dev(host)); pm_runtime_put_autosuspend(mmc_dev(host)); } }
include/linux/wait.h 中提供了一系列使用等待队列的便捷方法,例如:
这些方法都是宏定义,其功能相似可是有不一样的语义,适用不一样的使用场景。
咱们以 wait_event 为例来看一下其具体实现,其代码以下(注意注释中高亮部分对其语义的描述):
/** * wait_event - sleep until a condition gets true * @wq: the waitqueue to wait on * @condition: a C expression for the event to wait for * * The process is put to sleep (TASK_UNINTERRUPTIBLE) until the * @condition evaluates to true. The @condition is checked each time * the waitqueue @wq is woken up. * * wake_up() has to be called after changing any variable that could * change the result of the wait condition. */ #define wait_event(wq, condition) \ do { \ might_sleep(); \ if (condition) \ break; \ __wait_event(wq, condition); \ } while (0) /* * The below macro ___wait_event() has an explicit shadow of the __ret * variable when used from the wait_event_*() macros. * * This is so that both can use the ___wait_cond_timeout() construct * to wrap the condition. * * The type inconsistency of the wait_event_*() __ret variable is also * on purpose; we use long where we can return timeout values and int * otherwise. */ #define ___wait_event(wq, condition, state, exclusive, ret, cmd) \ ({ \ __label__ __out; \ wait_queue_t __wait; \ long __ret = ret; /* explicit shadow */ \ \ init_wait_entry(&__wait, exclusive ? WQ_FLAG_EXCLUSIVE : 0); \ for (;;) { \ long __int = prepare_to_wait_event(&wq, &__wait, state);\ \ if (condition) \ break; \ \ if (___wait_is_interruptible(state) && __int) { \ __ret = __int; \ goto __out; \ } \ \ cmd; \ } \ finish_wait(&wq, &__wait); \ __out: __ret; \ }) #define __wait_event(wq, condition) \ (void)___wait_event(wq, condition, TASK_UNINTERRUPTIBLE, 0, 0, \ schedule())
wait_event(wq, condition) 的上述实现就是一系列的宏定义。
将 wait_event(wq, condition) 宏展开就获得下面一个代码段,这个代码段没有返回值,所以 wait_event 不能做为右值使用。
咱们在该代码段中加入注释来讲明其工做原理:
do { might_sleep();
/* 若是 condition 条件为 true,不会进入等待状态 */ if (condition) break; (void)({ __label__ __out;
/* 建立等待队列成员 */ wait_queue_t __wait; long __ret = 0; /* explicit shadow */
/* 初始化 __wait, 注意 init_wait_entry 初始化 __wait 时绑定的 func */ init_wait_entry(&__wait, 0); for (;;) {
/*
* 将 __wait 加入到等待队列中,返回 0 表示 __wait 加入到等待队列,非 0 表示未加入
* 因为 wait_event 展开时传入的 state 参数为 TASK_UNINTERRUPTIBLE,
* 因此此处 __int 得到的返回值必定为 0
*/ long __int = prepare_to_wait_event(&wq, &__wait, TASK_UNINTERRUPTIBLE); if (condition) break;
/* 这个 if 判断条件的结果必定为 false */ if (___wait_is_interruptible(TASK_UNINTERRUPTIBLE) && __int) { __ret = __int; goto __out; }
/* 让出 CPU 资源,进入等待状态 */ schedule(); }
/* 将 current 进程设置为 TASK_RUNNING 状态,并将 __wait 从等待队列 wq 中移除 */ finish_wait(&wq, &__wait); __out:
__ret; }) } while (0)
上述宏展开的代码段中涉及的几个关键函数代码以下:
void init_wait_entry(wait_queue_t *wait, int flags) { wait->flags = flags; wait->private = current; wait->func = autoremove_wake_function; INIT_LIST_HEAD(&wait->task_list); } int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key) { int ret = default_wake_function(wait, mode, sync, key); if (ret) list_del_init(&wait->task_list); return ret; } long prepare_to_wait_event(wait_queue_head_t *q, wait_queue_t *wait, int state) { unsigned long flags; long ret = 0; spin_lock_irqsave(&q->lock, flags); if (unlikely(signal_pending_state(state, current))) { /* * Exclusive waiter must not fail if it was selected by wakeup, * it should "consume" the condition we were waiting for. * * The caller will recheck the condition and return success if * we were already woken up, we can not miss the event because * wakeup locks/unlocks the same q->lock. * * But we need to ensure that set-condition + wakeup after that * can't see us, it should wake up another exclusive waiter if * we fail. */ list_del_init(&wait->task_list); ret = -ERESTARTSYS; } else { if (list_empty(&wait->task_list)) { if (wait->flags & WQ_FLAG_EXCLUSIVE) __add_wait_queue_tail(q, wait); else __add_wait_queue(q, wait); } set_current_state(state); } spin_unlock_irqrestore(&q->lock, flags); return ret; } EXPORT_SYMBOL(prepare_to_wait_event);
wait_event(wq, condition) 实际的操做流程和 4.1 章节中描述的 __mmc_claim_host 是相似的,wait_event 将这个过程封装起来提供了更便捷的使用方法
一个进程要使用 wait_event 等待一个特定事件,须要如下三个基本步骤:
使用 wait_event 系列宏操做等待队列,比 __mmc_claim_host 中的方式要简单直观,也更不容易出错。
要正确使用 wait_event 系列宏,关键是要理解每个宏的语义以及适用场景,能够经过阅读源代码来深刻理解。
等待队列是 linux kernel 中与进程调度相关的重要机制,为进程间的同步提供了一种便捷的方式。
正确使用等待队列的前提是明白它的基本实现原理,掌握 wait_event 系列宏的语义和适用场景,在阅读源代码的基础上深刻理解。