锁是操做系统中实现进程同步的重要机制。程序员
临界区(Critical Section)是指对共享数据进行访问与操做的代码区域。所谓共享数据,就是可能有多个代码执行流并发地执行,并在执行中可能会同时访问的数据。缓存
同步(Synchronization)是指让两个或多个进程/线程可以按照程序员指望的方式来协调执行的顺序。好比,让A进程必须完成某个操做后,B进程才能执行。互斥(Mutual Exclusion)则是指让多个线程不可以同时访问某些数据,必需要一个进程访问完后,另外一个进程才能访问。并发
当多个进程/线程并发地执行而且访问一块数据,而且进程/线程的执行结果依赖于它们的执行顺序,咱们就称这种状况为竞争状态(Race Condition)。app
Xv6操做系统要求在内核临界区操做时中断必须关闭。若是此时中断开启,那么可能会出现如下死锁状况:A进程在内核态运行并拿下了p锁时,触发中断进入中断处理程序,中断处理程序也在内核态中请求p锁,因为锁在A进程手里,且只有A进程执行时才能释放p锁,所以中断处理程序必须返回,p锁才能被释放。那么此时中断处理程序会永远拿不到锁,陷入无限循环,进入死锁。函数
Xv6中实现了自旋锁(Spinlock)用于内核临界区访问的同步和互斥。自旋锁最大的特征是当进程拿不到锁时会进入无限循环,直到拿到锁退出循环。显然,自旋锁看上去效率很低,咱们很容易想到更加高效的基于等待队列的方法,让等待进程陷入阻塞而不是无限循环。然而,Xv6容许同时运行多个CPU核,多核CPU上的等待队列实现至关复杂,所以使用自旋锁是相对比较简单且能正确执行的实现方案。oop
Xv6中锁的定义以下优化
// Mutual exclusion lock. struct spinlock { uint locked; // Is the lock held? // For debugging: char *name; // Name of lock. struct cpu *cpu; // The cpu holding the lock. uint pcs[10]; // The call stack (an array of program counters) // that locked the lock. };
核心的变量只有一个locked
,当locked
为1时表明锁已被占用,反之未被占用,初始值为0。ui
在调用锁以前,必须对锁进行初始化。this
void initlock(struct spinlock *lk, char *name) { lk->name = name; lk->locked = 0; lk->cpu = 0; }
最困难的地方是如何对locked
变量进行原子操做占用锁和释放锁。这两步具体被实现为acquire()
和release()
函数。(注意v7版本和v11版本的实现略有不一样,本文使用的是v11版本)atom
acquire()
函数// Acquire the lock. // Loops (spins) until the lock is acquired. // Holding a lock for a long time may cause // other CPUs to waste time spinning to acquire it. void acquire(struct spinlock *lk) { pushcli(); // disable interrupts to avoid deadlock. if(holding(lk)) panic("acquire"); // The xchg is atomic. while(xchg(&lk->locked, 1) != 0) ; // Tell the C compiler and the processor to not move loads or stores // past this point, to ensure that the critical section's memory // references happen after the lock is acquired. __sync_synchronize(); // Record info about lock acquisition for debugging. lk->cpu = mycpu(); getcallerpcs(&lk, lk->pcs); }
acquire()
函数首先禁止了中断,而且使用专门的pushcli()
函数,这个函数保证了若是有两个acquire()
禁止了中断,那么也必须调用两次release()
中的popcli()
后中断才会被容许。而后,acquire()
函数采用xchg
指令来实如今设置locked
为1的同时得到其原来的值的操做。这里的C代码中封装了一个xchg()
函数,在xchg()
函数中采用GCC的内联汇编特性,实现以下
static inline uint xchg(volatile uint *addr, uint newval) { uint result; // The + in "+m" denotes a read-modify-write operand. asm volatile("lock; xchgl %0, %1" : "+m" (*addr), "=a" (result) : "1" (newval) : "cc"); return result; }
其中,volatile
标志用于避免gcc对其进行一些优化;第一个冒号后的"+m" (*addr), "=a" (result)
是这个汇编指令的两个输出值;newval
是这个汇编指令的输入值。假设newval
位于eax
寄存器中,addr
位于rax
寄存器中,那么gcc会翻译获得以下汇编指令
lock; xchgl (%rdx), %eax
因为xchg
函数是inline
的,它会被直接嵌入调用xchg
函数的代码中,使用的寄存器可能会有所不一样。
下面咱们来分析一下上面的指令的语义。·lock
是一个指令前缀,它保证了这条指令对总线和缓存的独占权,也就是这条指令的执行过程当中不会有其余CPU或同CPU内的指令访问缓存和内存。因为现代CPU通常是多发射流水线+乱序执行的,所以通常状况下并不能保证这一点。xchgl
指令是一条古老的x86指令,做用是交换两个寄存器或者内存地址里的4字节值,两个值不能都是内存地址,他不会设置条件码。
那么,仔细思考一下就能发现,以上一条xchg
指令就同时作到了交换locked和1的值,而且在以后经过检查eax
寄存器就能知道locked的值是否为0。而且,以上操做是原子的,这就保证了有且只有一个进程可以拿到locked的0值而且进入临界区。
最后,acquire()
函数使用__sync_synchronize
为了不编译器对这段代码进行指令顺序调整的话和避免CPU在这块代码采用乱序执行的优化。
release()
函数// Release the lock. void release(struct spinlock *lk) { if(!holding(lk)) panic("release"); lk->pcs[0] = 0; lk->cpu = 0; // Tell the C compiler and the processor to not move loads or stores // past this point, to ensure that all the stores in the critical // section are visible to other cores before the lock is released. // Both the C compiler and the hardware may re-order loads and // stores; __sync_synchronize() tells them both not to. __sync_synchronize(); // Release the lock, equivalent to lk->locked = 0. // This code can't use a C assignment, since it might // not be atomic. A real OS would use C atomics here. asm volatile("movl $0, %0" : "+m" (lk->locked) : ); popcli(); }
release
函数为了保证设置locked为0的操做的原子性,一样使用了内联汇编。最后,使用popcli()来容许中断(或者弹出一个cli,但由于其余锁未释放使得中断依然被禁止)。
struct semaphore { int value; struct spinlock lock; struct proc *queue[NPROC]; int end; int start; }; void sem_init(struct semaphore *s, int value) { s->value = value; initlock(&s->lock, "semaphore_lock"); end = start = 0; } void sem_wait(struct semaphore *s) { acquire(&s->lock); s->value--; if (s->value < 0) { s->queue[s->end] = myproc(); s->end = (s->end + 1) % NPROC; sleep(myproc(), &s->lock) } release(&s->lock); } void sem_signal(struct semaphore *s) { acquire(&s->lock); s->value++; if (s->value <= 0) { wakeup(s->queue[s->start]); s->queue[s->start] = 0; s->start = (s->start + 1) % NPROC; } release(&s->lock); }
上面的代码使用Xv6提供的接口实现了信号量,格式和命名与POSIX标准相似。这个信号量的实现采用等待队列的方式。当一个进程因信号量陷入阻塞时,会将本身放进等待队列并睡眠(18-22行)。当一个进程释放信号量时,会从等待队列中取出一个进程继续执行(29-33行)。