下半部和下半部执行的工做--工做队列

工做队列(work queue)是另一种将工做推后执行的形式.他和其余形式都不相同.工做队列能够把工做推后,交由一个内核线程去执行,这个下半部分老是会在进程上下文中去执行.这样,经过工做队列执行的代码能占尽进程上下文的全部优点.最重要的是工做队列能容许从新调度甚至是休眠.css

一般,在工做队列和软中断/tasklet中作出选择很是容易.若是推后执行的任务须要睡眠,那么就选择工做队列,若是不须要睡眠,就选择软中断或者是tasklet.实际上,工做队列可使用内核线程特换,可是使用内核线程可能会出现一些问题,因此尽可能使用工做队列.linux

若是须要一个能够从新调度的实体来执行你的下半部处理,你应该使用工做队列,他是惟一能在进程上下文中运行的下半部实现机制,也只有他才能够睡眠.若是不须要用一个内核线程来推后执行工做,那么就考虑使用tasklet.数组

(一):工做队列的实现安全

工做队列子系统是一个用于建立内核线程的接口.经过他建立的进程负责执行由内核其余部分排到队列里的任务.他建立的这些内核线程称为工做者线程.工做队队列可让你的驱动程序建立一个专门的工做者线程来处理须要推后的工做.不过,工做队列子系统提供了一个缺省的工做者线程来处理这些工做.所以,工做队列最基本的表现形式,就转变成了一个把须要推后执行的任务交给特定的通用线程的这样一种接口.markdown

缺省的工做者线程叫作events/n,这里n是处理器编号,每个处理器对应一个线程.例如,单处理器的系统只有events/0这样一个线程,而双处理器系统就会多一个events/1线程.缺省的工做者线程会从多个地方获得被推后的工做.许多内核驱动程序都把下半部交给缺省的工做者线程去作.除非一个驱动程序或者是子系统必须创建一个属于他本身的内核线程,不然最好使用缺省线程.数据结构

1:表示线程的数据结构异步

工做者线程使用workqueue_struct结构表示:ide

/* * The externally visible workqueue abstraction is an array of * per-CPU workqueues: * * 外部可见的工做队列抽象是每一个CPU工做队列组成的数组 */
struct workqueue_struct {
    struct cpu_workqueue_struct *cpu_wq;
    struct list_head list;
    const char *name;
    int singlethread;
    int freezeable;     /* Freeze threads during suspend */
    int rt;
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};

该结构内部是一个由cpu_workqueue_struct结构组成的数组,它定义在kernel/workqueue.c中,数组中的每一项对应系统中的每个处理器.因为系统中每一个处理器对应一个工做者线程,因此对于给定的某台计算机来讲,就是每一个处理器,每一个工做者线程对应一个这样的cpu_workqueue_struct结构体.cpu_qorkqueue_struct是kernel/qorkqueue.c中的核心数据结构.函数

/* * The per-CPU workqueue (if single thread, we always use the first * possible cpu). */
struct cpu_workqueue_struct {
    spinlock_t lock; //锁保护这种结构
    struct list_head worklist;  //工做列表
    wait_queue_head_t more_work;
    struct work_struct *current_work;  
    struct workqueue_struct *wq; //关联工做队列结构
    struct task_struct *thread;   //关联线程
} ____cacheline_aligned;

注意,每一个工做者线程类型关联一个本身的workqueue_struct.在该结构体里面,给每一个线程分配一个cpu+qorkqueue_struct,于是也就是给每一个处理器分配一个.由于每一个处理器都有一个该类型的工做者线程.ui

2:表示工做的数据结构

全部的工做者线程都是用普通的内核线程实现的,他们都要执行worker_thread()函数.在他执行完之后,这个函数执行一个死循环并开始休眠.当有工做被插入到队列里面的时候,线程就会被唤醒,以便执行这些操做.当没有剩余的操做的时候,他又会继续休眠.

工做用linux/workqueue.h中定义的work_struct结构体表示:

struct work_struct {
    atomic_long_t data;
#define WORK_STRUCT_PENDING 0 /* T if work item pending execution */
#define WORK_STRUCT_STATIC 1 /* static initializer (debugobjects) */
#define WORK_STRUCT_FLAG_MASK (3UL)
#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
    struct list_head entry;
    work_func_t func;
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};

这些结构体被链接成链表,在每一个处理器上的每种类型的队列都对应这样一个链表.好比,每一个处理器上用于执行被推后的工做的那个通用线程就有这样的一个链表.当一个工做者线程被唤醒的时候,他会执行他的链表上的全部工做,工做被执行完毕,他就将相应的work_struct对象从链表中移走.当链表上不在有对象的时候,他就会继续休眠.

咱们看一下worker_thread的执行流程,以下:

static int worker_thread(void *__cwq)
{
    struct cpu_workqueue_struct *cwq = __cwq;
    DEFINE_WAIT(wait);
    if (cwq->wq->freezeable)
        set_freezable();
    for (;;) {
        prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
        if (!freezing(current) &&
            !kthread_should_stop() &&
            list_empty(&cwq->worklist))
            schedule();
        finish_wait(&cwq->more_work, &wait);
        try_to_freeze();
        if (kthread_should_stop())
            break;
        run_workqueue(cwq);
    }
    return 0;
}

该函数在死循环中完成了如下功能:

1:线程将本身设置为休眠状态(state被设成TASK_INTERRUPTIBLE),并把本身加入到等待队列中

2:若是工做链是空的,则线程调用schedule()函数进入休眠状态

3:若是链表中有对象,线程不会睡眠.相反,他将本身设置成
TAKS_RUNNING,脱离等待队列.

4:若是链表非空,调用run_workqueue()函数执行被推后的工做.

下一步,由run_workqueu()函数来完成推后执行的工做:

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
    spin_lock_irq(&cwq->lock);
    while (!list_empty(&cwq->worklist)) {
        struct work_struct *work = list_entry(cwq->worklist.next,
                        struct work_struct, entry);
        work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
        /*
         * It is permissible to free the struct work_struct
         * from inside the function that is called from it,
         * this we need to take into account for lockdep too.
         * To avoid bogus "held lock freed" warnings as well
         * as problems when looking into work->lockdep_map,
         * make a copy and use that here.
         */
        struct lockdep_map lockdep_map = work->lockdep_map;
#endif
        trace_workqueue_execution(cwq->thread, work);
        debug_work_deactivate(work);
        cwq->current_work = work;
        list_del_init(cwq->worklist.next);
        spin_unlock_irq(&cwq->lock);
        BUG_ON(get_wq_data(work) != cwq);
        work_clear_pending(work);
        lock_map_acquire(&cwq->wq->lockdep_map);
        lock_map_acquire(&lockdep_map);
        f(work);
        lock_map_release(&lockdep_map);
        lock_map_release(&cwq->wq->lockdep_map);
        if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
            printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
                    "%s/0x%08x/%d\n",
                    current->comm, preempt_count(),
                        task_pid_nr(current));
            printk(KERN_ERR " last function: ");
            print_symbol("%s\n", (unsigned long)f);
            debug_show_held_locks(current);
            dump_stack();
        }
        spin_lock_irq(&cwq->lock);
        cwq->current_work = NULL;
    }
    spin_unlock_irq(&cwq->lock);
}

该函数循环遍历连表上每一个待处理的工做,执行链表每一个节点上的workqueue_struct中的func成员函数:
1:当链表不为空的时候,选取下一个链表对象
2:获取咱们但愿执行的函数func及其参数data
3:把该节点从链表上解下来,将待处理标志位pending清零
4:调用函数
5:重复执行

3:工做队列实现工做的总结

下面一个图展现了这些数据结构之间的关系:

这里写图片描述

位于最高一层的是工做者线程.系统中容许有多种类型的工做者线程存在.对于指定的一个类型,系统上每一个CPU上都有一个该类的工做者线程.内核中有些部分能够根据须要来建立工做者线程,而在默认状况下,内核只有event这一种类型的工做者线程.每个工做者线程都由一个cpu_workqueue_struct结构体表示.而workqueue_struct结构体则表示给定类型的全部工做者线程.

例如,除系统默认的通用events工做者类型以外,咱们本身加了一种falcon工做者类型,而且使用的是一个拥有四个处理器的计算机.那么,系统中如今有四个events类型的线程(于是也有四个cpu_workqueue_struct结构体)和另外四个falcon类型的线程(另外还有四个cpu_qorkqueue_struct结构体).同时,有一个对应的event类型的workqueue_struct和一个对应的falcon类型的workqueue_struct.

你的驱动程序建立这些须要推后执行的工做.他们用work_struct来表示.这个结构体中最重要的是一个指针,他指向一个函数,而正是该函数负责处理须要推后执行的具体任务.工做会被提交给某个具体的工做者线程.

默认状况下,如今大部分驱动程序都是使用的默认工做者线程.

(二):使用工做队列

首先咱们先看一下如何使用缺省的工做队列.

1:建立推后的工做

首先须要作的就是实际建立一些须要推后完成的工做.能够经过DECLARE_WORK在编译的时候静态的建立该结构体:

DECLARE_WORK(name,void(*func)(void *),void *data);

这样就会静态的建立一个名为name,处理函数为func,参数为data的work_struct结构体.一样,也能够在运行的时候经过指针建立一个工做.

INIT_WORK(struct work_struct *work,void(*func)(void *),void *data);

这会动态的初始化一个由work指向的工做,处理函数为func,参数为data.

2:工做队列处理函数

工做队列处理函数的原型为:

void work_handler(void *data);

这个函数会由一个工做者线程执行,所以,函数会运行在进程上下文中.默认状况下,容许相应中断,而且不持有任何锁.若是须要,函数能够睡眠.须要注意的是,尽管操做处理函数运行在进程上下文中,但他不能访问用户空间,由于内核线程在用户空间没有相关的内存映射.一般在发生系统调用的时候,内核会表明用户空间的进程运行,此时才能访问用户空间.也只有此时他才会映射用户空间的内存.

在工做队列和内核其余部分之间使用锁机制就像在其余的进程上下文中使用锁机制同样方便.

3:对工做进行调度

如今工做已经被建立,咱们能够调度他了.想要把给定工做的处理函数提交给缺省的events工做线程,只需调用:

schedule_work(&work);

work立刻就会被调度,一旦其所在的工做者线程被唤醒,他就会被执行.

若是想要他等待必定的时间以后再运行,那么就能够调度他在指定时间执行:

schedule_delayed_work(&work,delay);

此时,&work指向的work_struct直到delayz指定的时钟节拍用完后才会执行.

4:刷新操做

排入队列的工做会在工做者线程下一次被唤醒的时候执行.有时,在继续下一步工做以前,你必须保证一些操做已经执行完毕了.这一点对于模块来讲就很重要,在卸载以前,他就有可能须要调用下面的函数.而在内核的其余部分,为了防止竞争条件的出现,也可能须要确保再也不有待处理的工做.

出于以上目的,内核准备了一个用于刷新指定工做队列的函数:

void flush_scheduled_work(void);

这个函数能够取消任何与work_struct相关的挂起工做.

5: 建立新的工做队列

若是缺省的队列不能知足你的要求,你应该建立一个新的工做队列和与之相应的工做者线程.建立一个新的任务队列和与之相关的工做者线程,你只须要调用一个简单的函数:

struct workqueue_struct *create_workqueue(const char *name);

name参数用于该内核线程的命名.好比缺省的events队列的建立就调用的是:

truct workqueue_struct *keventd_wq;
kevent_wq = create_workqueue("events");

这样就会建立全部的工做者线程(系统的每一个处理器都会有一个),而且作好全部开始处理工做以前的准备工做.

建立一个新的工做的时候,无需考虑工做队列的类型.在建立以后,能够调用下面列举的函数.这些函数于schedule_work()和schedule_delayed_work()相近,惟一的区别就在于他们针对给定的工做队列而不是缺省的events队列进行操做.

int queue_work(struct workqueue_struct *wq,struct work_struct *work)
int queue_delayed_work(struct workqueue_struct *wq,struct work_struct *work,unsigned long delay)

最后,调用下面的函数刷新指定的工做队列.

flush_workqueue(struct workqueue_struct *wq);

该函数与flush_scheduled_work()做用相同,只是他在返回前等待清空的是给定的队列.

5:下半部机制的选择

下面是三种下半部接口的比较:

这里写图片描述

对于通常的驱动开发来讲,能够考虑推后执行的工做,需不须要休眠,若是须要休眠,则使用工做队列;若是不须要,最好使用tasklet.

6:在下半部之间加锁

使用tasklet的一个好处在于,他本身负责执行的序列化保障:两个相同类型的tasklet不容许同时执行,即便在不一样的处理器上也不行.tasklet之间的同步(也就是两个不一样类型的tasklet
共享同一数据的时候)须要正确使用锁机制.

若是一个进程上下文和一个下半部共享数据,在访问这些数据以前,你须要禁止下半部的处理并获得锁的使用权.作这些是为了本地和SMP的保护,而且防止死锁的出现.

若是一个中断上下文和一个下半部共享数据,在访问数据以前,你须要禁止中断并获得锁的使用权.所作的这些也是为了本地和SMP的保护而且防止死锁的出现.

全部在工做队列中被共享的数据也须要使用锁机制.

7:禁止下半部

通常单纯禁止下半部的处理是不够的.为了保证共享数据的安全,更常见的作法是,先获得一个锁而后再禁止下半部的处理.驱动程序中一般使用的都是这种方法.

若是须要禁止全部的下半部处理(就是全部的软中断和全部的tasklet),能够调用local_bh_disable()函数.容许下半部进行处理,能够调用local_bh_enable()函数.
函数经过preempt_count为每一个进程维护一个计数器,当计数为0的时候,下半部才能被处理.由于下半部的处理已经被禁止,因此local_bh_enable()还须要检查全部
现存的待处理的下半部并执行他们.

这些函数并不能禁止工做队列的执行,由于工做队列是在进程上下文中运行的.不会涉及异步执行的问题,因此也就没有必要禁止他们执行.

相关文章
相关标签/搜索