ucore在前面的实验中实现了进程/线程机制,并在lab6中实现了抢占式的线程调度机制。基于中断的抢占式线程调度机制使得线程在执行的过程当中随时可能被操做系统打断,被阻塞挂起而令其它的线程得到CPU。多个线程并发的执行,大大提高了非cpu密集型应用程序的cpu吞吐量,使得计算机系统中宝贵的cpu硬件资源获得了充分利用。html
操做系统提供的内核线程并发机制的优势是明显的,但同时也带来了一些问题,其中首当其冲的即是线程安全问题。java
线程安全指的是在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会经过同步机制保证各个线程均可以正常且正确的执行,不会出现数据污染等意外状况。node
举一个经典的例子:在高级语言中对于某一共享整型变量i(假设i=5)进行的i++操做,在最终的机器代码中会被分解为几个更细致的机器指令:git
1. 从内存的对应地址中读取出变量i的值(高级语言的变量在机器层面表现为一个内存地址),写入cpu的寄存器中(假设是edx)程序员
2. 对寄存器edx进行+1运算(运算后edx寄存器中的值为5+1=6)github
3. 将edx的值写入变量i对应的内存空间中(在高级语言层面看,写入edx中的新值后i变成了6)redis
经过以前lab5/lab6的学习,咱们知道在i++具体的机器指令序列执行的每一步过程当中,操做系统均可能经过时钟中断打断对应线程的执行,进行线程的上下文切换。机器指令是原子性的,但高级语言中的一条指令底层可能对应多个机器指令,在执行的过程当中可能被中断介入,没法保证执行的连贯性。数组
例如,存在两个并发执行的线程a、线程b,都对线程间的共享变量i(i=5)进行了i++操做。安全
两个线程的执行i++时的机器指令流按照时间顺序依次为: 网络
1. 线程a读取内存中变量i的值,写入寄存器edx(此时内存中i的值为5,edx的值为5)。
2. 线程a令edx进行+1运算,此时寄存器edx的值为6。
3. 操做系统处理时钟中断,发现线程a的时间片已经用完,将其挂起,保存线程a的上下文(此时线程a的寄存器上下文中edx=6);并调度线程b开始获取cpu执行。
4. 线程b读取内存中变量i的值,写入寄存器edx(此时内存中i的值为5,edx的值为5)。
5. 线程b令edx进行+1运算,此时寄存器edx的值为6。
6. 操做系统处理时钟中断,发现线程b的时间片已经用完,将其挂起,保存线程b的上下文(此时线程b的寄存器上下文中edx=6);并调度线程a开始获取cpu执行。
7. 线程a恢复现场继续往下执行,将现场恢复后edx的值写回内存中变量i对应的内存地址中,写回后变量i=6。
8. 操做系统处理时钟中断,发现线程a的时间片已经用完,将其挂起;并调度线程a开始获取cpu执行。
9. 线程b恢复现场继续往下执行,将现场恢复后edx的值写会内存中变量i对应的内存地址中,写回后变量i=6。
上述的例子中,因为操做系统的抢占式调度以及高级语言中i++操做的非原子性,使得本来初始值为5的变量i,在执行两次i++以后获得的并非预期的7,而是错误的6。这还仅仅是两个并发线程对于一个共享变量的操做问题,实际的程序中会涉及到更多的并发线程和共享变量,使得所编写的多线程并发程序正确性没法获得保证。
在绝大多数状况下,程序的正确性都比性能重要的多。操做系统在引入抢占式调度的线程并发机制的同时,也须要提供相应的手段来解决线程安全问题。
解决线程安全问题,主要有两个思路:一是消除程序的并发性;二是阻止多个线程并发的访问共享资源(共享内存、共享文件、共享外设等等)的访问,即互斥:使得一个线程在访问某一共享资源时,其它的线程不能进行一样的操做。
第一种思路被一些I/O密集型的应用程序所使用,即整个程序(进程)中只有一个线程在工做,经过操做系统底层提供的i/o多路复用机制进行工做,早期的redis以及nodeJS就是工做在单线程模型下的。单线程工做的应用程序因为不存在多个线程并发执行的场景,消除了线程的并发性,天然也不须要处理线程安全问题了。
而操做系统解决线程安全问题的方式采用的是第二种思路(通用操做系统是用于同时为大量进程、线程服务的,所以不能再回过头来禁止并发),经过一些机制限制并发线程同时访问会引发线程安全问题的共享变量,保证访问的互斥性。
经过lab7的学习,将可以深刻学习操做系统底层实现线程同步、互斥机制,理解信号量、条件变量、管程等同步互斥机制的工做原理;也能够对更上层的如java中的synchronized、AQS悲观锁、管程monitor、notify/wait等线程并发同步机制有更深的理解。
lab7是创建在以前实验的基础之上的,须要先理解以前的实验才能更好的理解lab7中的内容。
能够参考一下我关于前面实验的博客:
1. ucore操做系统学习(一) ucore lab1系统启动流程分析
2. ucore操做系统学习(二) ucore lab2物理内存管理分析
3. ucore操做系统学习(三) ucore lab3虚拟内存管理分析
4. ucore操做系统学习(四) ucore lab4内核线程管理
5. ucore操做系统学习(五) ucore lab5用户进程管理
6. ucore操做系统学习(六) ucore lab6线程调度器
ucore在lab7中的内容大体分为如下几个部分:
1. 实现等待队列
2. 实现信号量
3. 使用信号量解决哲学家就餐问题
4. 基于信号量实现条件变量
5. 基于信号量和条件变量实现管程
6. 使用管程解决哲学家就餐问题
前面提到,ucore在lab7中实现的同步机制是基于休眠/唤醒机制的。为了保证线程对于临界区访问的互斥性,在前一个线程已经进入了临界区后,后续要访问临界区的线程会被阻塞以等待前一个线程离开临界区,在以前进入临界区的线程离开临界区后被阻塞的线程会被再次唤醒得到进入临界区的资格。
在有许多线程并发时,可能会有不止一个线程被阻塞在对应的临界区,为此抽象出了等待队列结构(wait_queue)用于维护这一被阻塞线程的集合。当线程因为互斥而被阻塞在临界区时,将其加入等待队列并放弃cpu进入阻塞态;当以前得到临界区访问权限的线程离开后,再从对应的等待队列中选择一个被阻塞、处于等待状态的线程唤醒,被唤醒的线程能接着进入临界区。
利用等待队列,使得自始至终都只有最多一个线程在临界区中,保证了互斥性;而线程在等待队列中的休眠(阻塞)/唤醒动做,则实现了线程之间对于临界区访问的同步。
等待队列固然并不仅适用于线程并发同步,当线程进入等待状态以等待某一特定完成事件时(定时休眠一段时间、等待阻塞IO读写完成等等事件),底层均可以使用等待队列来实现。
ucore在/kern/sync目录下的wait.c、wait.h中实现了等待队列wait_queue、等待队列节点项wait_t以及相关的函数。
ucore的等待队列底层是经过双向链表结构实现的。和前面的实验相似的,提供了一个宏定义le2wait用于访问wait_link节点项对应的wait_t结构。
等待队列结构:
/** * 等待队列 * */ typedef struct { // 等待队列的头结点(哨兵节点) list_entry_t wait_head; } wait_queue_t; struct proc_struct; /** * 等待队列节点项 * */ typedef struct { // 关联的线程 struct proc_struct *proc; // 唤醒标识 uint32_t wakeup_flags; // 该节点所属的等待队列 wait_queue_t *wait_queue; // 等待队列节点 list_entry_t wait_link; } wait_t; #define le2wait(le, member) \ to_struct((le), wait_t, member)
等待队列结构底层操做:
// 初始化wait_t等待队列项 void wait_init(wait_t *wait, struct proc_struct *proc); // 初始化等待队列 void wait_queue_init(wait_queue_t *queue); // 将wait节点项插入等待队列 void wait_queue_add(wait_queue_t *queue, wait_t *wait); // 将wait项从等待队列中移除 void wait_queue_del(wait_queue_t *queue, wait_t *wait); // 获取等待队列中wait节点的下一项 wait_t *wait_queue_next(wait_queue_t *queue, wait_t *wait); // 获取等待队列中wait节点的前一项 wait_t *wait_queue_prev(wait_queue_t *queue, wait_t *wait); // 获取等待队列的第一项 wait_t *wait_queue_first(wait_queue_t *queue); // 获取等待队列的最后一项 wait_t *wait_queue_last(wait_queue_t *queue); // 等待队列是否为空 bool wait_queue_empty(wait_queue_t *queue); // wait项是否在等待队列中 bool wait_in_queue(wait_t *wait);
// 将wait项从等待队列中删除(若是存在的话) #define wait_current_del(queue, wait) \ do { \ if (wait_in_queue(wait)) { \ wait_queue_del(queue, wait); \ } \ } while (0) #endif /* !__KERN_SYNC_WAIT_H__ */
/** * 初始化wait_t等待队列项 * */ void wait_init(wait_t *wait, struct proc_struct *proc) { // wait项与proc创建关联 wait->proc = proc; // 等待的状态 wait->wakeup_flags = WT_INTERRUPTED; // 加入等待队列 list_init(&(wait->wait_link)); } /** * 初始化等待队列 * */ void wait_queue_init(wait_queue_t *queue) { // 等待队列头结点初始化 list_init(&(queue->wait_head)); } /** * 将wait节点项插入等待队列 * */ void wait_queue_add(wait_queue_t *queue, wait_t *wait) { assert(list_empty(&(wait->wait_link)) && wait->proc != NULL); // wait项与等待队列创建关联 wait->wait_queue = queue; // 将wait项插入头结点前 list_add_before(&(queue->wait_head), &(wait->wait_link)); } /** * 将wait项从等待队列中移除 * */ void wait_queue_del(wait_queue_t *queue, wait_t *wait) { assert(!list_empty(&(wait->wait_link)) && wait->wait_queue == queue); list_del_init(&(wait->wait_link)); } /** * 获取等待队列中wait节点的下一项 * */ wait_t * wait_queue_next(wait_queue_t *queue, wait_t *wait) { assert(!list_empty(&(wait->wait_link)) && wait->wait_queue == queue); list_entry_t *le = list_next(&(wait->wait_link)); if (le != &(queue->wait_head)) { // *wait的下一项不是头结点,将其返回 return le2wait(le, wait_link); } return NULL; } /** * 获取等待队列中wait节点的前一项 * */ wait_t * wait_queue_prev(wait_queue_t *queue, wait_t *wait) { assert(!list_empty(&(wait->wait_link)) && wait->wait_queue == queue); list_entry_t *le = list_prev(&(wait->wait_link)); if (le != &(queue->wait_head)) { // *wait的前一项不是头结点,将其返回 return le2wait(le, wait_link); } return NULL; } /** * 获取等待队列的第一项 * */ wait_t * wait_queue_first(wait_queue_t *queue) { // 获取头结点的下一项 list_entry_t *le = list_next(&(queue->wait_head)); if (le != &(queue->wait_head)) { // 头结点的下一项不是头结点,将其返回 return le2wait(le, wait_link); } // 头结点的下一项仍是头结点,说明等待队列为空(只有一个wait_head哨兵节点) return NULL; } /** * 获取等待队列的最后一项 * */ wait_t * wait_queue_last(wait_queue_t *queue) { // 获取头结点的前一项 list_entry_t *le = list_prev(&(queue->wait_head)); if (le != &(queue->wait_head)) { // 头结点的前一项不是头结点,将其返回 return le2wait(le, wait_link); } // 头结点的前一项仍是头结点,说明等待队列为空(只有一个wait_head哨兵节点) return NULL; } /** * 等待队列是否为空 * */ bool wait_queue_empty(wait_queue_t *queue) { return list_empty(&(queue->wait_head)); } /** * wait项是否在等待队列中 * */ bool wait_in_queue(wait_t *wait) { return !list_empty(&(wait->wait_link)); }
等待队列休眠/唤醒等高层操做:
等待队列对于线程的休眠、唤醒对应的高级操做依赖于上面介绍的、底层的等待队列增删改查操做。
// 将等待队列中的wait项对应的线程唤醒 void wakeup_wait(wait_queue_t *queue, wait_t *wait, uint32_t wakeup_flags, bool del); // 将等待队列中的第一项对应的线程唤醒 void wakeup_first(wait_queue_t *queue, uint32_t wakeup_flags, bool del); // 将等待队列中的全部项对应的线程所有唤醒 void wakeup_queue(wait_queue_t *queue, uint32_t wakeup_flags, bool del); // 令对应wait项加入当前等待队列;令当前线程阻塞休眠,挂载在该等待队列中 void wait_current_set(wait_queue_t *queue, wait_t *wait, uint32_t wait_state);
/** * 将等待队列中的wait项对应的线程唤醒 * */ void wakeup_wait(wait_queue_t *queue, wait_t *wait, uint32_t wakeup_flags, bool del) { if (del) { // 将wait项从等待队列中删除 wait_queue_del(queue, wait); } // 设置唤醒的缘由标识 wait->wakeup_flags = wakeup_flags; // 唤醒对应线程 wakeup_proc(wait->proc); } /** * 将等待队列中的第一项对应的线程唤醒 * */ void wakeup_first(wait_queue_t *queue, uint32_t wakeup_flags, bool del) { wait_t *wait; if ((wait = wait_queue_first(queue)) != NULL) { wakeup_wait(queue, wait, wakeup_flags, del); } } /** * 将等待队列中的全部项对应的线程所有唤醒 * */ void wakeup_queue(wait_queue_t *queue, uint32_t wakeup_flags, bool del) { wait_t *wait; if ((wait = wait_queue_first(queue)) != NULL) { if (del) { do { wakeup_wait(queue, wait, wakeup_flags, 1); } while ((wait = wait_queue_first(queue)) != NULL); } else { do { wakeup_wait(queue, wait, wakeup_flags, 0); } while ((wait = wait_queue_next(queue, wait)) != NULL); } } } /** * 令对应wait项加入当前等待队列;令当前线程阻塞休眠,挂载在该等待队列中 * */ void wait_current_set(wait_queue_t *queue, wait_t *wait, uint32_t wait_state) { assert(current != NULL); wait_init(wait, current); current->state = PROC_SLEEPING; current->wait_state = wait_state; wait_queue_add(queue, wait); }
信号量是一种同步互斥机制的实现,广泛存在于如今的各类操做系统内核里,最先是由著名计算机科学家Dijkstra提出。
ucore信号量定义:
信号量的定义和使用很是简单和基础,包含了一个信号量的值value以及用于线程同步的等待队列。
/** * 信号量 * */ typedef struct { // 信号量值 int value; // 信号量对应的等待队列 wait_queue_t wait_queue; } semaphore_t; /** * 初始化信号量 * */ void sem_init(semaphore_t *sem, int value) { sem->value = value; // 初始化等待队列 wait_queue_init(&(sem->wait_queue)); }
信号量的主要操做分别是down和up,对应于Dijkstra提出信号量时提出的P/V操做。
信号量做为同步互斥的基本结构,其down/up操做必须是原子性的,没法被打断发生上下文切换。令软件程序表现出原子性的方法有不少,因为ucore是运行在单核的80386cpu上的,简单起见便直接使用关闭中断的方式来实现信号量操做的原子性(多核cpu的状况下,关闭单核的中断是不够的,而关闭全部核心的中断则性能损失太大,须要采起锁总线等其它手段来实现软件原子性)。
信号量的down操做:
信号量的down操做,是请求获取一个信号量。
当信号量的value值大于0时,说明还能容纳当前线程进入临界区。
当信号量的value值等于0时,说明已经没法容纳更多的线程了,此时须要将当前线程阻塞在信号量的等待队列上,等待信号量的up操做将其唤醒。
/** * 信号量down操做 扣减信号量 * 当信号量value不足时将当前线程阻塞在信号量上,等待其它线程up操做时将其唤醒 * */ static __noinline uint32_t __down(semaphore_t *sem, uint32_t wait_state) { bool intr_flag; // 暂时关闭中断,保证信号量的down操做是原子操做 local_intr_save(intr_flag); if (sem->value > 0) { // 信号量对应的value大于0,还有权使用 sem->value --; local_intr_restore(intr_flag); return 0; } // 信号量对应的value小于等于0,须要阻塞当前线程 wait_t __wait, *wait = &__wait; // 令当前线程挂在信号量的阻塞队列中 wait_current_set(&(sem->wait_queue), wait, wait_state); // 恢复中断,原子操做结束 local_intr_restore(intr_flag); // 当前线程进入阻塞状态了,进行一次调度 schedule(); local_intr_save(intr_flag); // 唤醒后,原子操做将当前项从信号量的等待队列中删除 wait_current_del(&(sem->wait_queue), wait); local_intr_restore(intr_flag); if (wait->wakeup_flags != wait_state) { // 若是等待线程唤醒的标识与以前设置的参数wait_state不一致,将其状态返回给调用方作进一步判断 return wait->wakeup_flags; } return 0; }
信号量的up操做:
信号量的up操做,是增长一个信号量中的值。
当增长信号量值时发现当前信号量的等待队列为空时,则说明当前没有线程被阻塞、须要进入信号量管制的临界区中,简单的将信号量值加1。
当增长信号量时发现等待队列不为空,则说明存在线程想要进入临界区中,却因为没有知足信号量的条件,被阻塞在了临界区外。此时便从信号量的等待队列中挑选出最先被阻塞的线程,将其唤醒,使得其得以进入临界区。
/** * 信号量up操做 增长信号量或唤醒被阻塞在信号量上的一个线程(若是有的话) * */ static __noinline void __up(semaphore_t *sem, uint32_t wait_state) { bool intr_flag; // 暂时关闭中断,保证信号量的up操做是原子操做 local_intr_save(intr_flag); { wait_t *wait; if ((wait = wait_queue_first(&(sem->wait_queue))) == NULL) { // 信号量的等待队列为空,说明没有线程等待在该信号量上 // 信号量value加1 sem->value ++; } else { assert(wait->proc->wait_state == wait_state); // 将等待队列中的对应等待线程唤醒 wakeup_wait(&(sem->wait_queue), wait, wait_state, 1); } } local_intr_restore(intr_flag); }
信号量的down与up操做关系十分紧密,互相对照着看能够更好的理解其工做原理。
互斥信号量:
value值被初始化为1的信号量比较特殊,称为二元信号量,也被叫作互斥信号量。互斥信号量可以做为mutex互斥锁,用于保证临界区中数据不会被线程并发的访问。
哲学家就餐问题是Dijkstra提出的一个经典的多线程同步问题。大概场景是在一个环形的圆桌上,坐着五个哲学家,而桌上有五把叉子和五个碗。一个哲学家平时进行思考,饥饿时便试图取用其左右最靠近他的叉子,只有在他拿到两只叉子时才能进餐。进餐完毕,放下叉子继续思考。
解决哲学家就餐问题的基本思路是使用线程模拟哲学家,每一个线程对应一个活动着的哲学家。可是因为5个并发活动的哲学家线程争抢仅有的5把叉子,且哲学家只有在同时拿到两根叉子时才能进餐,若是没有良好的同步机制对这5个哲学家线程进行协调,那么哲学家线程互相之间容易发生死锁(例如,五个哲学家线程同时拿起了本身左手边的叉子,都没法拿起本身右边的叉子,互相等待着。哲学家之间将永远没法进餐,纷纷饿死)。
使用Dijkstra提出的信号量机制能够很好的解决哲学家就餐问题,下面看看ucore中是如何使用信号量解决哲学家就餐问题的。
哲学家线程主体执行逻辑:
ucore的lab7中的check_sync函数是整个lab7实验的总控函数。在check_sync的前半部分使用kern_thread函数建立了N(N=5)个哲学家内核线程,用于执行philosopher_using_semaphore,模拟哲学家就餐问题。
philosopher_using_semaphore中哲学家循环往复的进行以下操做:
1. 哲学家进行思考(经过do_sleep系统调用进行休眠阻塞,模拟哲学家思考)
2. 经过phi_take_forks_sema函数尝试着同时拿起左右两个叉子(若是没法拿到左右叉子,则会陷入阻塞状态)
3. 哲学家进行就餐(经过do_sleep系统调用进行休眠阻塞,模拟哲学家就餐)
4. 经过phi_put_forks_sema函数同时放下左右两个叉子
拿起叉子的phi_take_forks_sema函数和放下叉子phi_put_forks_sema的函数内部都是经过信号量进行同步的,在下面进行更进一步的分析。
#define N 5 /* 哲学家数目 */ #define LEFT (i-1+N)%N /* i的左邻号码 */ #define RIGHT (i+1)%N /* i的右邻号码 */ #define THINKING 0 /* 哲学家正在思考 */ #define HUNGRY 1 /* 哲学家想取得叉子 */ #define EATING 2 /* 哲学家正在吃面 */ #define TIMES 4 /* 吃4次饭 */ #define SLEEP_TIME 10 void check_sync(void){ int i; //check semaphore 信号量解决哲学家就餐问题 sem_init(&mutex, 1); for(i=0;i<N;i++){ sem_init(&s[i], 0); int pid = kernel_thread(philosopher_using_semaphore, (void *)i, 0); if (pid <= 0) { panic("create No.%d philosopher_using_semaphore failed.\n"); } philosopher_proc_sema[i] = find_proc(pid); set_proc_name(philosopher_proc_sema[i], "philosopher_sema_proc"); } 。。。 条件变量(管程)解决哲学家就餐问题(暂时忽略) } //---------- philosophers problem using semaphore ---------------------- int state_sema[N]; /* 记录每一个人状态的数组 */ /* 信号量是一个特殊的整型变量 */ semaphore_t mutex; /* 临界区互斥 */ semaphore_t s[N]; /* 每一个哲学家一个信号量 */ /** * 哲学家线程主体执行逻辑 * */ int philosopher_using_semaphore(void * arg) /* i:哲学家号码,从0到N-1 */ { int i, iter=0; i=(int)arg; cprintf("I am No.%d philosopher_sema\n",i); while(iter++<TIMES) { /* 无限循环 */ cprintf("Iter %d, No.%d philosopher_sema is thinking\n",iter,i); /* 哲学家正在思考 */ // 使用休眠阻塞来模拟思考(哲学家线程阻塞N秒) do_sleep(SLEEP_TIME); // 哲学家尝试着去拿左右两边的叉子(若是没拿到会阻塞) phi_take_forks_sema(i); /* 须要两只叉子,或者阻塞 */ cprintf("Iter %d, No.%d philosopher_sema is eating\n",iter,i); /* 进餐 */ // 使用休眠阻塞来模拟进餐(哲学家线程阻塞N秒) do_sleep(SLEEP_TIME); // 哲学家就餐结束,将叉子放回桌子。 // 当发现以前有临近的哲学家尝试着拿左右叉子就餐时却没有成功拿到,尝试着唤醒对应的哲学家 phi_put_forks_sema(i); /* 把两把叉子同时放回桌子 */ } cprintf("No.%d philosopher_sema quit\n",i); return 0; }
phi_take_forks_sema函数表示哲学家尝试着拿起左右叉子想要就餐;而phi_put_forks_sema函数表示哲学家进餐结束后放下左右叉子。
二者都是经过全局的互斥信号量mutex的down操做进行全局的互斥,保证在同一时刻只有一个哲学家线程可以进入临界区,对临界区的资源叉子进行拿起/放下操做。对不一样的哲学家线程进行互斥,保证查看左右叉子的状态时不会出现并发问题。在后面对mutex的up操做用于释放mutex互斥信号量,以离开临界区,唤醒可能阻塞在mutex信号量中的其它哲学家线程,让阻塞在mutex信号量中的另外一个线程得以进入临界区。
phi_take_forks_sema函数分析:
在执行phi_take_forks_sema拿叉子时,经过关键的phi_test_sema函数进行条件的判断,判断当前哲学家线程i的左右哲学家线程是否都未就餐。
若是条件知足(在拿叉子时,phi_test_sema前哲学家i已经被预先设置为HUNGRY饥饿状态了),则表明当前哲学家i能够进餐(其拿起了左右叉子,也表明着其相邻的左右哲学家没法就餐)。
而若是条件不知足则会在phi_take_forks_sema的最后,被down(&s[i])阻塞在信号量s[i]上,等待其被左右两旁就餐完毕的哲学家将其唤醒。
phi_put_forks_sema函数分析:
在执行phi_put_forks_sema放下叉子时,首先经过设置state_sema[i]的状态为Thinking,表明哲学家i已经就餐完毕从新进入思考状态。同时哲学家i在放下叉子后,经过phi_test_sema(LEFT)和phi_test_sema(RIGHT)来判断相邻的哲学家在本身就餐的这段时间是否也陷入了饥饿状态,却因为暂时拿不到叉子而被阻塞了(LEFT和RIGHT宏利用取模,解决下标回环计算的问题)。若是确实存在这种状况,经过phi_test_sema函数尝试着令相邻的哲学家进行就餐(也许被阻塞哲学家的隔壁另外一边的哲学家依然在就餐,那么此时依然没法将其唤醒就餐;而须要等到另外一边的哲学家就餐完毕来尝试将其唤醒)。
经过互斥信号量mutex实现哲学家线程就餐对临界区资源-叉子访问的互斥性,避免了并发时对叉子状态判断不许确的状况产生;同时利用信号量数组semaphore_t s[N]对哲学家拿取、放下叉子的操做进行同步,使得哲学家们在叉子资源有限、冲突的状况下有序的就餐,不会出死锁、饥饿等现象。
/** * 哲学家i拿起左右叉子 */ void phi_take_forks_sema(int i) /* i:哲学家号码从0到N-1 */ { // 拿叉子时须要经过mutex信号量进行互斥,防止并发问题(进入临界区) down(&mutex); // 记录下哲学家i饥饿的事实(执行phi_take_forks_sema尝试拿叉子,说明哲学家i进入了HUNGRY饥饿状态) state_sema[i]=HUNGRY; // 试图同时获得左右两只叉子 phi_test_sema(i); // 离开临界区(唤醒可能阻塞在mutex上的其它线程) up(&mutex); // phi_test_sema中若是成功拿到叉子进入了就餐状态,会先执行up(&s[i]),再执行down(&s[i])时便不会阻塞 // 反之,若是phi_test_sema中没有拿到叉子,则down(&s[i])将会令哲学家i阻塞在信号量s[i]上 down(&s[i]); } /** * 哲学家i放下左右叉子 */ void phi_put_forks_sema(int i) /* i:哲学家号码从0到N-1 */ { // 放叉子时须要经过mutex信号量进行互斥,防止并发问题(进入临界区) down(&mutex); /* 进入临界区 */ // 哲学家进餐结束(执行phi_put_forks_sema放下叉子,说明哲学家已经就餐完毕,从新进入THINKING思考状态) state_sema[i]=THINKING; // 当哲学家i就餐结束,放下叉子时。须要判断左、右临近的哲学家在本身就餐的这段时间内是否也进入了饥饿状态,却由于本身就餐拿走了叉子而没法同时得到左右两个叉子。 // 为此哲学家i在放下叉子后须要尝试着判断在本身放下叉子后,左/右临近的、处于饥饿的哲学家可否进行就餐,若是能够就唤醒阻塞的哲学家线程,并令其进入就餐状态(EATING) phi_test_sema(LEFT); /* 看一下左邻居如今是否能进餐 */ phi_test_sema(RIGHT); /* 看一下右邻居如今是否能进餐 */ up(&mutex); /* 离开临界区q(唤醒可能阻塞在mutex上的其它线程) */ } /** * 判断哲学家i是否能够拿起左右叉子 */ void phi_test_sema(i) /* i:哲学家号码从0到N-1 */ { // 当哲学家i处于饥饿状态(HUNGRY),且其左右临近的哲学家都没有在就餐状态(EATING) if(state_sema[i]==HUNGRY&&state_sema[LEFT]!=EATING &&state_sema[RIGHT]!=EATING) { // 哲学家i饿了(HUNGRY),且左右两边的叉子都没人用。 // 令哲学家进入就餐状态(EATING) state_sema[i]=EATING; // 唤醒阻塞在对应信号量上的哲学家线程(当是哲学家线程i本身执行phi_test_sema(i)时,则信号量直接加1,抵消掉phi_take_forks_sema中的down操做,表明直接拿起叉子就餐成功而不用进入阻塞态) up(&s[i]); } }
ucore中的条件变量是基于信号量实现的,同时条件变量也做为管程Monitor结构的重要组成部分。
条件变量condvar结构定义:
/** * 条件变量 * */ typedef struct condvar{ // 条件变量相关的信号量,用于阻塞/唤醒线程 semaphore_t sem; // the sem semaphore is used to down the waiting proc, and the signaling proc should up the waiting proc // 等待在条件变量之上的线程数 int count; // the number of waiters on condvar // 拥有该条件变量的monitor管程 monitor_t * owner; // the owner(monitor) of this condvar } condvar_t;
管程monitor结构定义:
/** * 管程 * */ typedef struct monitor{ // 管程控制并发的互斥锁(应该被初始化为1的互斥信号量) semaphore_t mutex; // the mutex lock for going into the routines in monitor, should be initialized to 1 // 管程内部协调各并发线程的信号量(线程能够经过该信号量挂起本身,其它并发线程或者被唤醒的线程能够反过来唤醒它) semaphore_t next; // the next semaphore is used to down the signaling proc itself, and the other OR wakeuped waiting proc should wake up the sleeped signaling proc. // 休眠在next信号量中的线程个数 int next_count; // the number of of sleeped signaling proc // 管程所属的条件变量(能够是数组,对应n个条件变量) condvar_t *cv; // the condvars in monitor } monitor_t;
/** * 初始化管程 * */ void monitor_init (monitor_t * mtp, size_t num_cv) { int i; assert(num_cv>0); mtp->next_count = 0; mtp->cv = NULL; // 管程的互斥信号量值设为1(初始化时未被锁住) sem_init(&(mtp->mutex), 1); //unlocked // 管程的协调信号量设为0,当任何一个线程发现不知足条件时,当即阻塞在该信号量上 sem_init(&(mtp->next), 0); // 为条件变量分配内存空间(参数num_cv指定管程所拥有的条件变量的个数) mtp->cv =(condvar_t *) kmalloc(sizeof(condvar_t)*num_cv); assert(mtp->cv!=NULL); // 构造对应个数的条件变量 for(i=0; i<num_cv; i++){ mtp->cv[i].count=0; // 条件变量信号量初始化时设置为0,当任何一个线程发现不知足条件时,当即阻塞在该信号量上 sem_init(&(mtp->cv[i].sem),0); mtp->cv[i].owner=mtp; } }
条件变量的等待操做实现:
cond_wait函数实现条件变量的wait操做。条件变量的wait操做和信号量的down功能相似。当条件变量对应的条件不知足时,经过信号量的down操做,令当前线程阻塞、等待在条件变量所属的信号量上。
// Suspend calling thread on a condition variable waiting for condition Atomically unlocks // mutex and suspends calling thread on conditional variable after waking up locks mutex. Notice: mp is mutex semaphore for monitor's procedures /** * 条件变量阻塞等待操做 * 令当前线程阻塞在该条件变量上,等待其它线程将其经过cond_signal将其唤醒。 * */ void cond_wait (condvar_t *cvp) { //LAB7 EXERCISE1: YOUR CODE cprintf("cond_wait begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); /* * cv.count ++; * if(mt.next_count>0) * signal(mt.next) * else * signal(mt.mutex); * wait(cv.sem); * cv.count --; */ // 阻塞在当前条件变量上的线程数加1 cvp->count++; if(cvp->owner->next_count > 0) // 对应管程中存在被阻塞的其它线程 // 唤醒阻塞在对应管程协调信号量next中的线程 up(&(cvp->owner->next)); else // 若是对应管程中不存在被阻塞的其它线程 // 释放对应管程的mutex二元信号量 up(&(cvp->owner->mutex)); // 令当前线程阻塞在条件变量上 down(&(cvp->sem)); // down返回,说明已经被再次唤醒,条件变量count减1 cvp->count --; cprintf("cond_wait end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); }
条件变量的唤醒操做实现:
cond_signal函数用于实现条件变量的signal操做。条件变量的signal操做和信号量的up功能相似。当条件变量对应的条件知足时,经过信号量的up操做,唤醒阻塞在对应条件变量中的线程。
// Unlock one of threads waiting on the condition variable. /** * 条件变量唤醒操做 * 解锁(唤醒)一个等待在当前条件变量上的线程 * */ void cond_signal (condvar_t *cvp) { //LAB7 EXERCISE1: YOUR CODE cprintf("cond_signal begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); /* * cond_signal(cv) { * if(cv.count>0) { * mt.next_count ++; * signal(cv.sem); * wait(mt.next); * mt.next_count--; * } * } */ // 若是等待在条件变量上的线程数大于0 if(cvp->count>0) { // 须要将当前线程阻塞在管程的协调信号量next上,next_count加1 cvp->owner->next_count ++; // 令阻塞在条件变量上的线程进行up操做,唤醒线程 up(&(cvp->sem)); // 令当前线程阻塞在管程的协调信号量next上 // 保证管程临界区中只有一个活动线程,先令本身阻塞在next信号量上;等待被唤醒的线程在离开临界区后来反过来将本身从next信号量上唤醒 down(&(cvp->owner->next)); // 当前线程被其它线程唤醒从down函数中返回,next_count减1 cvp->owner->next_count --; } cprintf("cond_signal end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); }
条件变量与管程的交互:
仔细比对条件变量与信号量的实现,会发现大体的实现思路是一致的。但ucore中实现的条件变量是做为管程的一部分工做的,所以在wait和signal操做中都额外耦合了与对应管程owner交互的地方。
在管程中进入临界区的线程发现条件不知足而进行条件变量的wait操做时,须要释放管程中临界区的锁,在wait操做挂起自身时令其它想要进入管程内的线程得到临界区的访问权限。
在管程中临界区的线程发现某一条件获得知足时,将执行对应条件变量的signal操做以唤醒等待在其上的某一个线程。可是因为管程临界区的互斥性,不能容许临界区内有超过一个的线程在其中运行,所以执行signal操做的线程须要首先将本身阻塞挂起在管程的next信号量上,使得被唤醒的那一个线程独占临界区资源。当被唤醒的线程离开临界区时,也会及时的唤醒挂起在管程next信号量上的对应线程。
因为并发环境下多个线程经过条件变量等同步机制交替的休眠/唤醒,逻辑执行流并非连贯的,所以条件变量和管程的实现显得比较绕,使人费解。经过学习如何用管程解决哲学家就餐问题,看看使用管程/条件变量是如何进行线程同步互斥的,加深对条件变量、管程工做机制的理解。
在checkSync函数的后半部分,是关于如何使用管程解决哲学家就餐问题。在check_sync的后半部分建立了N(N=5)个哲学家内核线程,用于执行philosopher_using_condvar函数,模拟哲学家就餐问题。
philosopher_using_condvar中哲学家循环往复的进行以下操做(总体流程和信号量的实现大致一致):
1. 哲学家进行思考(经过do_sleep系统调用进行休眠阻塞,模拟哲学家思考)
2. 经过phi_take_forks_condvar函数尝试着同时拿起左右两个叉子(若是没有拿到左右叉子,陷入阻塞)
3. 哲学家进行就餐(经过do_sleep系统调用进行休眠阻塞,模拟哲学家就餐)
4. 经过phi_put_forks_condvar函数同时放下左右两个叉子,回到思考状态
拿起叉子的phi_take_forks_condvar函数和放下叉子phi_put_forks_condvar的函数内部都是经过条件变量进行同步的,在下面进行更进一步的分析。
checkSync函数:
void check_sync(void){ int i; //check semaphore sem_init(&mutex, 1); for(i=0;i<N;i++){ sem_init(&s[i], 0); int pid = kernel_thread(philosopher_using_semaphore, (void *)i, 0); if (pid <= 0) { panic("create No.%d philosopher_using_semaphore failed.\n"); } philosopher_proc_sema[i] = find_proc(pid); set_proc_name(philosopher_proc_sema[i], "philosopher_sema_proc"); } //check condition variable monitor_init(&mt, N); for(i=0;i<N;i++){ state_condvar[i]=THINKING; int pid = kernel_thread(philosopher_using_condvar, (void *)i, 0); if (pid <= 0) { panic("create No.%d philosopher_using_condvar failed.\n"); } philosopher_proc_condvar[i] = find_proc(pid); set_proc_name(philosopher_proc_condvar[i], "philosopher_condvar_proc"); } }
phi_take_forks_condvar函数表达哲学家尝试着拿起左右叉子想要就餐;而phi_put_forks_condvar函数表达哲学家进餐结束后放下左右叉子。
二者经过管程中的互斥信号量mutex的down操做进行全局的互斥,保证在同一时刻只有一个哲学家线程可以进入临界区,对临界区的资源叉子进行拿起/放下操做,对不一样的哲学家线程进行互斥,保证查看左右叉子的状态时不会出现并发问题。
在离开管程的临界区时(注释into routine in monitor和leave routine in monitor之间为管程的临界区代码),当前线程会根据管程内是否存在其它线程(mtp->next_count>0)而有不一样的操做。当发现管程中的next信号量上存在其它线程阻塞在上面时,优先唤醒next信号量上的线程(阻塞在next上的线程是因为要唤醒等待在某一条件变量上的线程,为了保证临界区互斥自愿被阻塞的,所以被唤醒的线程在离开临界区后须要第一时间将其唤醒);而若是next信号量中不存在休眠的线程,那么就和信号量的实现相似,释放mutex互斥锁,唤醒可能等待在其上的某一线程。
上述ucore实现的管程其线程交互的逻辑是基于Hoare语义的,此外还存在MESA语义的管程和Hansen语义的管程(MESA管程和Hansen管程实现相似前面的信号量实现哲学家就餐)。
Hoare管程在signal唤醒其它线程时会令本身陷入休眠,严格的保证了临界区线程的互斥,理论上更加可靠,常见于教科书的理论中。因为Hoare语义的管程须要额外引入一个等待队列(next信号量),所以其性能并不如其它两种语义的管程,现实中被使用的地方不多。
phi_take_forks_condvar函数(同时拿起左右叉子):
void phi_take_forks_condvar(int i) { // 拿叉子时须要经过mutex信号量进行互斥,防止并发问题(进入临界区) down(&(mtp->mutex)); //--------into routine in monitor-------------- // LAB7 EXERCISE1: YOUR CODE // I am hungry // try to get fork // I am hungry // 记录下哲学家i饥饿的事实(执行phi_take_forks_condvar尝试拿叉子,说明哲学家i进入了HUNGRY饥饿状态) state_condvar[i]=HUNGRY; // 试图同时获得左右两只叉子 phi_test_condvar(i); if (state_condvar[i] != EATING) { // state_condvar[i]状态不为EATING,说明phi_test_condvar尝试拿左右叉子进餐失败 cprintf("phi_take_forks_condvar: %d didn't get fork and will wait\n",i); // 等待阻塞在管程的条件变量cv[i]上 cond_wait(&mtp->cv[i]); } //--------leave routine in monitor-------------- if(mtp->next_count>0){ // 当离开管程临界区时,若是发现存在线程等待在mtp->next上 // 在当前实验中,执行到这里的当前线程多是阻塞在cond_wait中被其它线程唤醒的,对应线程是经过phi_test_condvar的cond_signal操做唤醒当前线程的 // 执行cond_signal时为了保证管程临界区内不存在并发的线程访问,在唤醒其它线程时,会把本身阻塞在管程的next信号量上,等待此时离开临界区的线程将其唤醒 up(&(mtp->next)); }else{ // 当离开管程临界区时,没有其它线程等待在mtp->next上,直接释放管程的互斥锁mutex便可(唤醒可能阻塞在mutex上的其它线程) up(&(mtp->mutex)); } }
phi_put_forks_condvar函数(同时放下左右叉子):
void phi_put_forks_condvar(int i) { // 放叉子时须要经过mutex信号量进行互斥,防止并发问题(进入临界区) down(&(mtp->mutex)); //--------into routine in monitor-------------- // LAB7 EXERCISE1: YOUR CODE // I ate over // test left and right neighbors // I ate over // 哲学家进餐结束(执行phi_put_forks_condvar放下叉子,说明哲学家已经就餐完毕,从新进入THINKING思考状态) state_condvar[i]=THINKING; // test left and right neighbors // 当哲学家i就餐结束,放下叉子时。须要判断左、右临近的哲学家在本身就餐的这段时间内是否也进入了饥饿状态,却由于本身就餐拿走了叉子而没法同时得到左右两个叉子。 // 为此哲学家i在放下叉子后须要尝试着判断在本身放下叉子后,左/右临近的、处于饥饿的哲学家可否进行就餐,若是能够就唤醒阻塞的哲学家线程,并令其进入就餐状态(EATING) phi_test_condvar(LEFT); // 看一下左邻居如今是否能进餐 phi_test_condvar(RIGHT); // 看一下右邻居如今是否能进餐 //--------leave routine in monitor-------------- // lab7的参考答案 if(mtp->next_count>0){ cprintf("execute here mtp->next_count>0 \n\n\n\n\n\n"); up(&(mtp->next)); }else{ cprintf("execute here mtp->next_count=0 \n\n\n\n\n"); up(&(mtp->mutex)); } // 我的认为放叉子和取叉子的状况并不同,不会出现mtp->next_count>0的状况,这里只须要释放互斥锁便可(若是这里理解的有问题,还请指正) // 当放叉子的线程在phi_put_forks_condvar中离开管程临界区时,只有两种状况 // 1. 没有发现邻居能够进餐,自身不会被阻塞 // 2. 发现有邻居以前被拿不到叉子阻塞了,如今能够进餐了,phi_test_condvar中的cond_signal会暂时令本身阻塞在next信号量上 // 可是很快被本身叫醒的相邻的哲学家线程在被唤醒后一离开临界区就会将本身唤醒,在cond_signal被唤醒后的操做中mtp->next_count会自减,而变为0 // // 以上两种状况下,因为管程自己最外面有一个mutex互斥信号量,因此不会出现两个线程同时阻塞在next信号量中,所以也就不会出现参考答案中mtp->next_count>0的状况 // up(&(mtp->mutex)); }
phi_test_condvar函数(判断哲学家i是否能拿起左右叉子开始就餐):
void phi_test_condvar (i) { // 当哲学家i处于饥饿状态(HUNGRY),且其左右临近的哲学家都没有在就餐状态(EATING) if(state_condvar[i]==HUNGRY&&state_condvar[LEFT]!=EATING &&state_condvar[RIGHT]!=EATING) { cprintf("phi_test_condvar: state_condvar[%d] will eating\n",i); // 哲学家i饿了(HUNGRY),且左右两边的叉子都没人用。 // 令哲学家进入就餐状态(EATING) state_condvar[i] = EATING ; cprintf("phi_test_condvar: signal self_cv[%d] \n",i); // 唤醒阻塞在对应信号量上的哲学家线程 cond_signal(&mtp->cv[i]) ; } }
信号量是一个简单、高效的同步互斥机制,但也正是因为其过于底层,因此在编写线程同步代码时须要十分当心谨慎,对每一处信号量的使用仔细斟酌才能保证程序的正确性,对开发人员的心智是一个巨大的负担。
而将管程做为一个总体的结构来看的话,会发现管程虽然将控制同步的代码逻辑抽象为了一个固定的模板变得容易使用,但却与要保护的临界区业务逻辑代码耦合的很严重,操做系统的开发者很难将管程控制同步的代码植入进对应的应用程序内部。
所以操做系统一般只提供了信号量以及条件变量这种偏底层、耦合性低的同步互斥机制;而管程机制则更多的由高级语言的编译器在语言层面实现,以简化程序员开发复杂并发同步程序的复杂度。高级语言编译器在编译本地机器代码时,能够在须要进行同步的代码逻辑块中利用操做系统底层提供的信号量或是条件变量机制来实现管程。
例如java中若是在方法定义时简单的加上synchronized关键字就能控制多线程环境下不会并发的执行该方法。这是由于在编译成字节码时,相比于普通方法额外插入了一些管程Monitor相关的同步控制代码(管程底层依赖的信号量、条件变量机制仍是取决于对应的操做系统平台,只不过被jvm屏蔽掉了差别,让java程序员感知不到)。
经过ucore的lab7的学习,让我理解了等待队列、信号量、条件变量以及管程的大体工做原理,也对日常会接触到的java中的synchronized、AQS、Reentrantlock其底层机制有了进一步的认识。
lab7的学习使我收获颇丰,但这对于线程同步相关领域的学习仍是远远不够。同属于进程间通讯IPC领域的经典问题除了哲学家就餐问题外,还有读者/写者问题等;对于线程安全问题,除了使用休眠/唤醒进行线程上下文切换的阻塞的方式以外,还有使用CAS等重试的方法;除了信号量、条件变量等基于单机系统内的线程同步方式外,还有基于分布式系统,经过网络进行多机器线程同步的机制等等。虽然不够了解的知识还有不少,但经过ucore操做系统的学习,为我学习相关的领域知识打下了基础,也给了我相信最终能融会贯通这些知识的信心。
这篇博客的完整代码注释在个人github上:https://github.com/1399852153/ucore_os_lab (fork自官方仓库)中的lab7_answer。
但愿个人博客能帮助到对操做系统、ucore os感兴趣的人。存在许多不足之处,还请多多指教。