Kernel常见锁的原理和实现

    锁是内核中使用最频繁,最基础的设施之一,在内核的各个模块中被大量使用。锁的本质是在并发过程当中保证资源的互斥使用。Linux内核提供了多种锁,应用的场合也各不相同,主要包括:原子操做,信号量,读写锁,自旋锁,以及RCU锁机制等。数据结构

    RCU是比读写锁更高效,而且同时支持多个读者和多个写者并发的机制,其实现很是复杂,涉及到软中断,completion机制等,将再也不本篇分析,另起一篇RCU机制和实现。架构

原子操做(atomic)并发

原子操做是实现其余各类锁的基础。考虑以下代码语句:函数

i++;优化

编译器在编译的过程当中,该语句有可能被编译成以下三条CPU指令:atom

  1. 加载内存变量i的值到寄存器线程

  2. 寄存器值+1设计

  3. 将寄存器值写回内存rest

咱们知道,一条指令在单CPU上对内存的访问是原子的(由于中断只能是在当前指令执行完以后才去检查处理),但多条指令之间并非原子的,单条指令在多处理器系统上也不是原子的。所以,上面i++语句虽然在代码编写上是一条语句,但在二进制可执行指令上倒是三条指令,若是在两个并发的例程中对同一个i变量同时执行i++操做,必然会致使数据错乱。所以,每种CPU架构都应该提供一套指令,用于锁定/解锁内存总线,使得在锁定区内执行的指令是一个原子操做。对象

以x86架构为例,提供了lock;前缀指令,用于在指令执行前先锁定特定内存,保证对特定内存的互斥访问。以atomic_add()实现为例:

#ifdef CONFIG_SMP

#define LOCK_PREFIX_HERE \

".pushsection .smp_locks,\"a\"\n" \

".balign 4\n" \

".long 671f - .\n" /* offset */ \

".popsection\n" \

"671:"

 

#define LOCK_PREFIX LOCK_PREFIX_HERE "\n\tlock; "

 

#else /* ! CONFIG_SMP */

#define LOCK_PREFIX_HERE ""

#define LOCK_PREFIX ""

#endif

 

static __always_inline void atomic_add(int i, atomic_t *v)

{

asm volatile(LOCK_PREFIX "addl %1,%0"

: "+m" (v->counter)

: "ir" (i));

}

atomic_add()使用内联汇编的方式实现整型变量的加法,核心是:addl %1,%0汇编指令。若是是在单CPU系统架构上调用该函数,那么这条指令是原子的。可是若是在SMP(对称多处理器)系统上,该单条指令就不是原子的。所以,该实现还增长了LOCK_PREFIX宏,用于区分是不是SMP系统:若是是SMP系统,LOCK_PREFIX宏定义中包含lock;指令前缀,不然LOCK_PREFIX为空(单CPU系统上单条指令自己就是原子的)。

 

信号量(semaphore/mutex)

信号量semaphore是一种睡眠锁,实现对多个同类资源的互斥访问,若是资源个数降为1个,就是互斥锁mutex。信号量实现原理以下:初始有n个同类资源,当某个线程获取(down操做)资源时,资源个数-1。当全部资源被分配完,此时当前线程被挂起在等待队列上,直到某个线程释放了(up操做)资源后,唤醒在等待队列上的线程从新获取资源。

信号量的数据结构简单清晰:

struct semaphore {

raw_spinlock_t lock; // 自旋锁,保护count和 wait_list 的互斥访问

unsigned int count; // 资源个数,在sema_init接口中初始化

struct list_head wait_list; // 等待队列

};

基本接口定义也清晰明了:

void sema_init(struct semaphore *sem, int val);

void down(struct semaphore *sem);

void up(struct semaphore *sem);

简单分析一下实现:

void down(struct semaphore *sem)

{

unsigned long flags;

 

raw_spin_lock_irqsave(&sem->lock, flags);

if (likely(sem->count > 0))

sem->count--; // 上锁成功

else

__down(sem); // 资源已经用完,挂起当前线程

raw_spin_unlock_irqrestore(&sem->lock, flags);

}

 

void up(struct semaphore *sem)

{

unsigned long flags;

 

raw_spin_lock_irqsave(&sem->lock, flags);

if (likely(list_empty(&sem->wait_list)))

sem->count++; // 等待队列为空,count++后直接退出

else

__up(sem); // 唤醒等待队列线程

raw_spin_unlock_irqrestore(&sem->lock, flags);

}

 

读写信号量(rw_sem)

    读写信号量是信号量的更细粒度实现。咱们知道,对于单个资源,不管读写,资源都是互斥的。也就是说,同时只能有一个读线程或者写线程独占资源,这种状况不是最优的。考虑以下情形:有多个读线程和一个写线程访问同一个资源。信号量实现的是全部线程的独占访问,没法实现某一时刻多个读线程同时访问资源。而读写信号量能够实现多个读线程的并发。对同一个资源访问,读写信号量实现以下机制:

    (1)同时只能有一个写者独占资源

    (2)同时能够有多个读者访问资源

    (3)读者和写者不能同时访问资源

    所以,读写信号量适用于有读者不少,写者不多的情形。kernel中提供两种实现方式:一种是平台相关的实现,使用内联汇编实现,效率很高;另一种是通用的实现方式,与平台无关,效率较低。咱们只关注基本的通用实现方式:

    struct rw_semaphore {
    __s32            count;                    //  引用计数
    raw_spinlock_t        wait_lock;    //  自旋锁,保护wait_list的互斥访问
    struct list_head    wait_list;         //  等待队列
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map dep_map;
#endif
};

void __sched down_read(struct rw_semaphore *sem);

void __sched down_write(struct rw_semaphore *sem);

void up_read(struct rw_semaphore *sem);

void up_write(struct rw_semaphore *sem);

    说明:int类型的count设计得很巧妙,count > 0表示当前占用资源的读线程个数,count = -1表示当前有个写线程占用资源,count = 0表示当前资源空闲。

    下面分析核心实现:

    (1)读者上锁:

void __sched __down_read(struct rw_semaphore *sem)
{
    struct rwsem_waiter waiter;
    struct task_struct *tsk;
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);    // 自旋锁保护等待队列

    if (sem->count >= 0 && list_empty(&sem->wait_list)) {    // 资源空闲或者被其余读者占用,而且等待队列为空,当前读者能够获取锁。增长读者引用计数,释放自旋锁并退出,上锁成功。
        /* granted */
        sem->count++; 
        raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
        goto out;
    }

    // 要么资源被写者占用(count == -1),要么等待队列不空,那么当前读者都要入队列。判断等待队列不空是为了防止读者饥饿。

    tsk = current;
    set_task_state(tsk, TASK_UNINTERRUPTIBLE);    // 优化措施:只能被其余读者/写者主动唤醒,防止被系统唤醒,增长系统开销

    /* set up my own style of waitqueue */
    waiter.task = tsk;
    waiter.type = RWSEM_WAITING_FOR_READ;
    get_task_struct(tsk);

    list_add_tail(&waiter.list, &sem->wait_list);    // 当前读者加入等待队列

    /* we don't need to touch the semaphore struct anymore */
    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);    // 释放自旋锁

    /* wait to be given the lock */
    for (;;) {
        if (!waiter.task)
            break;
        schedule();    // 切换线程,调度其余读者/写者执行
        set_task_state(tsk, TASK_UNINTERRUPTIBLE);     // for 循环是为了判断本次唤醒是否真的轮到本身运行。存在以下情形:等待队列上有多个读写线程,当写线程完成以后,会唤醒等待队列上全部线程。为了防止读写饥饿,先入队列的线程先获得执行。若是当前线程前面有个写线程获得机会执行,那么当前读线程须要再次挂起在等待队列,等待下次调度。
    }

    __set_task_state(tsk, TASK_RUNNING);    // 当前读者能够获取锁,上锁成功
 out:
    ;
}

    (2)读者解锁:

void __up_read(struct rw_semaphore *sem)
{
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);    // 自旋锁上锁

    if (--sem->count == 0 && !list_empty(&sem->wait_list))    // 资源空闲而且等待队列不空,此时等待队列的首个对象必定是一个写者
        sem = __rwsem_wake_one_writer(sem);    // 唤醒等待队列上的写者

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}
    (3)写者上锁:

void __sched __down_write_nested(struct rw_semaphore *sem, int subclass)
{
    struct rwsem_waiter waiter;
    struct task_struct *tsk;
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);    // 自旋锁上锁

    /* set up my own style of waitqueue */
    tsk = current;
    waiter.task = tsk;
    waiter.type = RWSEM_WAITING_FOR_WRITE;
    list_add_tail(&waiter.list, &sem->wait_list);    // 写者先入队列

    /* wait for someone to release the lock */
    for (;;) {
        /*
         * That is the key to support write lock stealing: allows the
         * task already on CPU to get the lock soon rather than put
         * itself into sleep and waiting for system woke it or someone
         * else in the head of the wait list up.
         */
        if (sem->count == 0)    // 若是资源没被占用,跳出for循环后,退出队列,上锁成功。
            break;
        set_task_state(tsk, TASK_UNINTERRUPTIBLE);
        raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
        schedule();        //  资源被占用,调度其余线程
        raw_spin_lock_irqsave(&sem->wait_lock, flags);
    }

    // 上锁成功,退出等待队列
    /* got the lock */
    sem->count = -1;
    list_del(&waiter.list);

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}

void __sched __down_write(struct rw_semaphore *sem)
{
    __down_write_nested(sem, 0);
}
    (4)写者解锁:

void __up_write(struct rw_semaphore *sem)
{
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);    // 自旋锁上锁

    sem->count = 0;    // 读者解锁成功,资源空闲
    if (!list_empty(&sem->wait_list))    // 等待队列不空
        sem = __rwsem_do_wake(sem, 1);    // 调度等待队列上的读者/写者运行

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}
 

自旋锁(spin lock)

自旋锁设计可用于SMP系统中中断上下文的临界区保护。咱们知道,中断上下文中是不能够睡眠的(1,中断上下文要求及时处理,2中断上下文不是可调度实体 等种种缘由),所以信号量/读写锁等睡眠锁不能用于中断上下文。自旋锁设计原理是:当锁被其余内核线程CPU或其余CPU中断上下文持有时,当前CPU不是去睡眠,而是不停的空转并轮询该锁的状态,直到该锁被其余CPU释放,当前CPU获取该锁并进入临界区,执行完以后释放该锁。这也是自旋锁的名称来源:当获取不到锁时,CPU空等,不停的自旋。

根据自旋锁的设计原理,咱们知道:

(1)自旋锁保护的临界区代码执行时间要尽量短,不然其余CPU会一直忙等,浪费CPU资源。

(2)中断上下文中的自旋锁保护的临界区必定不能睡眠,不然会死锁。线程上下文中能够睡眠,只要保证能被唤醒便可。

自旋锁能够用在内核线程中,也能够用在中断上下文中。若是某个自旋锁只用于内核线程中,那么该自旋锁的实现只须要关闭内核抢占便可(对应spin_lock/spin_unlock版本)。若是该自旋锁不只用于内核线程,也会在中断上下文使用,那么该自旋锁的实现不只要关闭内核抢占,并且要禁止中断(对应spin_lock_irqsave/spin_unlock_irqsotre等版本)。

相关文章
相关标签/搜索