Linux-workqueue讲解

=============  参考  =============node

代码:linux-3.10.65/kernel/workqueue.clinux

===============================多线程

1. workqueue 是什么?

  workqueue是对内核线程封装的用于处理各类工做项的一种处理方法, 因为处理对象是用链表拼接一个个工做项, 依次取出来处理, 而后从链表删除,就像一个队列排好队依次处理同样, 因此也称工做队列,架构

所谓封装能够简单理解一个中转站, 一边指向“合适”的内核线程, 一边接受你丢过来的工做项, 用结构体 workqueue_srtuct表示, 而所谓工做项也是个结构体 --  work_struct, 里面有个成员指针, 指向你最终要实现的函数,app

struct workqueue_struct {
    struct list_head    pwqs;        /* WR: all pwqs of this wq */
    struct list_head    list;        /* PL: list of all workqueues */

    struct workqueue_attrs    *unbound_attrs;    /* WQ: only for unbound wqs */
    struct pool_workqueue    *dfl_pwq;    /* WQ: only for unbound wqs */

    char            name[WQ_NAME_LEN]; /* I: workqueue name */

    unsigned int        flags ____cacheline_aligned; /* WQ: WQ_* flags */
    struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
    struct pool_workqueue __rcu *numa_pwq_tbl[]; /* FR: unbound pwqs indexed by node */
};


struct work_struct {
    atomic_long_t data;    //函数的参数
    struct list_head entry;    //挂到链表
    work_func_t func; //函数指针,指向你实现的函数功能
};

 

 固然使用者在实现本身函数功能后能够直接调用,或者经过kthread_create()把函数当作新线程的主代码, 或者add_timer添加到一个定时器延时处理, tcp

那为什么要弄个work_struct工做项先封装函数, 而后再丢到workqueue_srtuct处理呢? 这就看使用场景了, 若是是一个大函数, 处理事项比较多, 且须要重复处理, 能够单独开辟一个内核线程处理; 对延时敏感的能够用定时器; ide

若是只是简单的一个函数功能,  且函数里面有延时动做的, 就适合放到工做队列来处理了, 毕竟定时器处理的函数是在中断上下文,不能delay或者引起进程切换的API,  并且开辟一个内核线程是耗时且耗费资源的, 通常用于函数须要while(1) 不断循环处理的,函数

否则处理一次函数后退出,线程又被销毁, 简直就是浪费!ui

 

2. 怎么用?

  一个简单示例:this

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/delay.h>
#include <linux/workqueue.h>

struct workqueue_struct *workqueue_test;

struct work_struct work_test;

void work_test_func(struct work_struct *work)
{
    printk("%s()\n", __func__);

    //mdelay(1000);
    //queue_work(workqueue_test, &work_test);
}


static int test_init(void)
{
    printk("Hello,world!\n");

    /* 1. 本身建立一个workqueue, 中间参数为0,默认配置 */
    workqueue_test = alloc_workqueue("workqueue_test", 0, 0);

    /* 2. 初始化一个工做项,并添加本身实现的函数 */
    INIT_WORK(&work_test, work_test_func);

    /* 3. 将本身的工做项添加到指定的工做队列去, 同时唤醒相应线程处理 */
    queue_work(workqueue_test, &work_test);
    
    return 0;
}

static void test_exit(void)
{
    printk("Goodbye,cruel world!\n");
    destroy_workqueue(workqueue_test);
}

module_init(test_init);
module_exit(test_exit);

MODULE_AUTHOR("Vedic <FZKmxcz@163.com>");
MODULE_LICENSE("Dual BSD/GPL");
obj-m +=test.o

KDIR:=/home/fuzk/project/linux-3.10.65

COMPILER=/opt/toolchain/arm-2012.03/bin/arm-none-linux-gnueabi-
ARCH_TYPE=arm

all:
    make CROSS_COMPILE=$(COMPILER) ARCH=$(ARCH_TYPE) -C $(KDIR)  M=$(PWD)  modules

clean:
    make CROSS_COMPILE=$(COMPILER) ARCH=$(ARCH_TYPE) -C $(KDIR)  M=$(PWD)  clean
Makefile

  只需三步就能够了, 固然内核已经为咱们建立了几个工做队列, 咱们能够直接将本身的工做项挂到相应的队列便可:

  因此代码能够改成:

static int test_init(void)
{
    printk("Hello,world!\n");

    /* 2. 初始化一个工做项,并添加本身实现的函数 */
    INIT_WORK(&work_test, work_test_func);

    /* 3. 将本身的工做项添加到指定的工做队列去, 同时唤醒相应线程处理 */
    queue_work(system_wq, &work_test);
    
    return 0;
}

若是workqueue对象是
system_wq, 可使用另外一个封装函数schedule_work(&work_test)

static inline bool schedule_work(struct work_struct *work)
{
  return queue_work(system_wq, work);
}

   将本身的工做项挂到已有的工做队列须要注意的是因为这些队列是共享的, 各个驱动都有可能将本身的工做项放到同个队列, 会致使队列的项拥挤, 当有些项写的代码耗时久或者调用delay()延时特别久, 你的项将会迟迟得不到执行! 

因此早期不少驱动开发人员都是本身建立workqueue, 添加本身的work。 在Linux-2.XXX时代, 建立workqueue时会建立属于workqueue本身的内核线程, 这些线程是“私有的”, 虽然是方便了驱动开发人员, 但每一个驱动都“一言不合”就

建立workqueue致使太多线程, 严重占用系统资源和效率, 因此在Linux-3.XXX时代, 社区开发人员将workqueue和内核线程剥离! 内核会本身事先建立相应数量的线程(后面详解), 被全部驱动共享使用。  用户调用alloc_workqueue()

只是建立workqueue这个空壳, 其主要做用:

  a. 兼容Linux-2.XXX时代代码

  b. 新增flag字段代表这个workqueue的属性(普通优先级仍是高优先级等), 方便在queue_work()时寻找“合适的”线程, 由于事先建立的线程分普通优先级、高优先级、绑定CPU线程, 非绑定CPU线程等

固然这对驱动开发人员是透明的, 驱动人员只需关注调用queue_work()让线程执行本身的工做项, 至因而这个workqueue的私有线程仍是如今的共享线程, 不重要! 这样就限制了系统工做线程的暴涨, 惟一的缺点就是前面提到的, 跟别人共享会增长

本身的工做项被执行的不肯定性。 只能说各个驱动开发人员自我约束, 尽可能使得工做项函数简短快速, 若是咱们须要等本身的工做项被执行完才能处理其余事情, 能够调用flush_work() 等待work被执行完:

/**
 * flush_work - wait for a work to finish executing the last queueing instance
 * @work: the work to flush
 *
 * Wait until @work has finished execution.  @work is guaranteed to be idle
 * on return if it hasn't been requeued since flush started.
 *
 * RETURNS:
 * %true if flush_work() waited for the work to finish execution,
 * %false if it was already idle.
 */
bool flush_work(struct work_struct *work)
{
    struct wq_barrier barr;

    lock_map_acquire(&work->lockdep_map);
    lock_map_release(&work->lockdep_map);

    if (start_flush_work(work, &barr)) {
        wait_for_completion(&barr.done);
        destroy_work_on_stack(&barr.work);
        return true;
    } else {
        return false;
    }
}
EXPORT_SYMBOL_GPL(flush_work);

 

 3. 部分源码解析

  直接看最核心部分:

 
 
NR_STD_WORKER_POOLS = 2

static
int __init init_workqueues(void) { int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL }; //线程两种优先级: nice=0普通级; nice=-20高优先级 int i, cpu; /* make sure we have enough bits for OFFQ pool ID */ BUILD_BUG_ON((1LU << (BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT)) < WORK_CPU_END * NR_STD_WORKER_POOLS); WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long)); pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC); cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP); hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN); wq_numa_init(); /* initialize CPU pools */ for_each_possible_cpu(cpu) { struct worker_pool *pool; i = 0; for_each_cpu_worker_pool(pool, cpu) {   ------------------- a BUG_ON(init_worker_pool(pool)); pool->cpu = cpu; cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu)); pool->attrs->nice = std_nice[i++]; pool->node = cpu_to_node(cpu); /* alloc pool ID */ mutex_lock(&wq_pool_mutex); BUG_ON(worker_pool_assign_id(pool)); mutex_unlock(&wq_pool_mutex); } } /* create the initial worker */ for_each_online_cpu(cpu) {             struct worker_pool *pool; for_each_cpu_worker_pool(pool, cpu) { pool->flags &= ~POOL_DISASSOCIATED; BUG_ON(create_and_start_worker(pool) < 0); ------------- b } } /* create default unbound and ordered wq attrs */ for (i = 0; i < NR_STD_WORKER_POOLS; i++) { struct workqueue_attrs *attrs; BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL))); attrs->nice = std_nice[i]; unbound_std_wq_attrs[i] = attrs;      ------------------ c /* * An ordered wq should have only one pwq as ordering is * guaranteed by max_active which is enforced by pwqs. * Turn off NUMA so that dfl_pwq is used for all nodes. */ BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL))); attrs->nice = std_nice[i]; attrs->no_numa = true; ordered_wq_attrs[i] = attrs;         ----------------- d } system_wq = alloc_workqueue("events", 0, 0);  ----------------- e system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0); system_long_wq = alloc_workqueue("events_long", 0, 0); system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, WQ_UNBOUND_MAX_ACTIVE); system_freezable_wq = alloc_workqueue("events_freezable", WQ_FREEZABLE, 0); system_power_efficient_wq = alloc_workqueue("events_power_efficient", WQ_POWER_EFFICIENT, 0); system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient", WQ_FREEZABLE | WQ_POWER_EFFICIENT, 0); BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq || !system_unbound_wq || !system_freezable_wq || !system_power_efficient_wq || !system_freezable_power_efficient_wq); return 0; } early_initcall(init_workqueues);

 

a.  for_each_cpu_worker_pool

  其相关代码在:

#define for_each_cpu_worker_pool(pool, cpu)                \
    for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0];        \
         (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
         (pool)++)



static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);

  从这里能够看出, 每一个CPU都有两个私有结构体 struct worker_pool , 用变量cpu_worker_pools 表示, 而这两个worker_pool最大区别就是nice赋值, 以及对worker_pool 编号

 

b. create_and_start_worker(pool)

  对online CPU 每一个worker_pool建立worker, 也即前面讲到的工做线程:

create_and_start_worker()
    -> create_worker()
        -> worker->task = kthread_create_on_node(worker_thread, worker, pool->node, "kworker/%s", id_buf);
    -> start_worker()
        -> wake_up_process(worker->task);
        
worker_thread:
    do {
        struct work_struct *work = list_first_entry(&pool->worklist, struct work_struct, entry);

        if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
            /* optimization path, not strictly necessary */ process_one_work(worker, work);
            if (unlikely(!list_empty(&worker->scheduled)))
                process_scheduled_works(worker);
        } else {
            move_linked_works(work, &worker->scheduled, NULL);
            process_scheduled_works(worker);
        }
    } while (keep_working(pool));
    
        -> process_one_work(worker, work)
            -> 
                worker->current_work = work;
                worker->current_func = work->func;
                worker->current_pwq = pwq;

                list_del_init(&work->entry);

                worker->current_func(work); //调用函数 如上面的work_test_func()

  每一个CPU都有两个worker_pool(普通优先级和高优先级), 而后每一个worker_pool又建立一个worker(名称格式为worker cpiid / 线程号 【H】), 并挂载到worker_pool   --> idr_replace(&pool->worker_idr, worker, worker->id);  同时worker->pool也指向worker_pool

所以, 通过a、b后架构以下:

  worker.task里经过 list_first_entry(&pool->worklist, struct work_struct, entry); 获取每一个工做项 work_struct, 并调用用户指定的函数current_func

c. unbound_std_wq_attrs

   这个变量是为了后面建立新的线程所作的一部分初始化工做

d.  ordered_wq_attrs

   这个变量也是为了后面建立新的线程所作的一部分初始化工做

  前面说过新版内核对workqueue和线程进行了剥离, 由内核控制线程的数量和属性, 咱们只介绍了普通优先级和高优先级, 其实还有bound cpu和unbound cpu属性, 即这个线程是跑在指定的CPU上仍是任意CPU, 前面介绍的因为调用

DEFINE_PER_CPU_SHARED_ALIGNED, 天然都是跟CPU走了, 而unbound_std_wq_attrs和ordered_wq_attrs天然就是为了后面建立任意CPU均可运行的线程而作的准备, 最终线程有四种类型, 指定CPU的普通线程、指定CPU的高优先级线程、任意CPU的普通线程、任意CPU的高优先级线程。
且任意CPU的线程一开始是没有建立了(只是初始化unbound_std_wq_attrs和ordered_wq_attrs), 根据驱动建立workqueue和系统负载自行决定, 因此线程的数量不会像指定CPU那样只有一个!, 最终相似以下:

 

  能够看出unbound和ordered的worker_poll不会指定CPU, 同时worker_dir链表会挂载多个worker, 另外线程的名称也有区别, 指定CPU就用所在CPU id表示, 不然用worker_pool的id表示:

      

   我奇怪为什么不使用一个unbound worker_pool, 其worker_idr挂载全部的worker就能够了, 为什么每生成一个worker就要配套一个worker_pool, 若是你知道请留言告知 谢谢~

 e. alloc_workqueue

  前面说过, 新版的alloc_workqueue()只是建立workqueue这个空壳, 不会再建立本身“私有”的线程了, 有的是如何指向“合适”的线程, 何为合适?  这取决用户在调用alloc_workqueue()传的参数, 用于告知要什么属性的线程

#define alloc_ordered_workqueue(fmt, flags, args...)            \
    alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)

#define create_workqueue(name)                        \
    alloc_workqueue((name), WQ_MEM_RECLAIM, 1)
#define create_freezable_workqueue(name)                \
    alloc_workqueue((name), WQ_FREEZABLE | WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
#define create_singlethread_workqueue(name)                \
    alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)

===========================================================================
#define alloc_workqueue(fmt, flags, max_active, args...)        \
    __alloc_workqueue_key((fmt), (flags), (max_active),        \
                  NULL, NULL, ##args)

  很显然第一个参数表示workqueue的名称, 在Linux-2.XXX也会做为本身私有线程的线程名, 命令ps还能查看获得。 第二个参数就是告知这个workqueue到时候(调用queue_work()时)要指定哪一个线程的依据, 后面参数就不解释了

咱们进一步跟踪函数__alloc_workqueue_key() 的实现(详解在注释):

struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
                           unsigned int flags,
                           int max_active,
                           struct lock_class_key *key,
                           const char *lock_name, ...)
{

    struct workqueue_struct *wq;
    struct pool_workqueue *pwq;

    /* 1. 建立workqueue_struct */
    wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);

    /* 2.1 建立pool_workqueue
     * 2.2 寻找worker_pool, 若是是bound那worker_pool已存在,直接找优先级
           若是是unbound,就建立unbound worker_pool
       2.3 若是是建立unbound worker_pool, 就顺道建立worker
       2.4 将步骤1建立的workqueue_struct 和 步骤2.1建立的pool_workqueue 和 步骤2.2的 worker_pool 串起来
       2.5 同理ordered
     */
    if (alloc_and_link_pwqs(wq) < 0)
        goto err_free_wq;

    /* 3. 将 workqueue_struc 挂载到workqueues上 */
    list_add(&wq->list, &workqueues);
}


/* ========================== 最重要的是步骤2!  继续跟踪......============================= */
        static int alloc_and_link_pwqs(struct workqueue_struct *wq)
        {
            bool highpri = wq->flags & WQ_HIGHPRI;
            int cpu, ret;

            if (!(wq->flags & WQ_UNBOUND)) {
                /* 上面2.1 建立pool_workqueue */    
                wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
                if (!wq->cpu_pwqs)
                    return -ENOMEM;

                for_each_possible_cpu(cpu) {
                    struct pool_workqueue *pwq =
                        per_cpu_ptr(wq->cpu_pwqs, cpu);
                    /* 上面2.2 寻找worker_pool, 已存在的 */
                    struct worker_pool *cpu_pools =
                        per_cpu(cpu_worker_pools, cpu);

                    /* 上面2.4 串起来 */
                    init_pwq(pwq, wq, &cpu_pools[highpri]);

                    mutex_lock(&wq->mutex);
                    link_pwq(pwq);
                    mutex_unlock(&wq->mutex);
                }
                return 0;
            } else if (wq->flags & __WQ_ORDERED) {
                ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
            } else {
                /* 上面2.1234 由于是unbound, 因此要建立, 用到以前实现初始化变量unbound_std_wq_attrs */
                return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
            }
        }



/* ===================================== 继续跟踪 apply_workqueue_attrs ========================================= */ apply_workqueue_attrs()
                    -> alloc_unbound_pwq()        -------------2.1--------- 得到 pool_workqueue
                        -> get_unbound_pool()     -------------2.2--------- 得到 worker_pool
                            -> create_and_start_worker(pool) --2.3--------- 得到 worker

  总而言之, 调用alloc_workqueue()返回workqueue_struct后, 会依次串连起 workqueue_struct -> pool_workqueue -> worker_pool -> worker, 如图:

  调个线程要透过这四个结构体大山确实蛮尴尬的, 但也是为了兼容之前, 因此这个就是目前的现状......

 

 4. 其余

 除了上面介绍的四种线程属性, 其实还有其余的, 读者能够自行查看:

enum {
    WQ_NON_REENTRANT    = 1 << 0, /* guarantee non-reentrance */
    WQ_UNBOUND        = 1 << 1, /* not bound to any cpu */
    WQ_FREEZABLE        = 1 << 2, /* freeze during suspend */
    WQ_MEM_RECLAIM        = 1 << 3, /* may be used for memory reclaim */
    WQ_HIGHPRI        = 1 << 4, /* high priority */
    WQ_CPU_INTENSIVE    = 1 << 5, /* cpu instensive workqueue */
    WQ_SYSFS        = 1 << 6, /* visible in sysfs, see wq_sysfs_register() */

    /*
     * Per-cpu workqueues are generally preferred because they tend to
     * show better performance thanks to cache locality.  Per-cpu
     * workqueues exclude the scheduler from choosing the CPU to
     * execute the worker threads, which has an unfortunate side effect
     * of increasing power consumption.
     *
     * The scheduler considers a CPU idle if it doesn't have any task
     * to execute and tries to keep idle cores idle to conserve power;
     * however, for example, a per-cpu work item scheduled from an
     * interrupt handler on an idle CPU will force the scheduler to
     * excute the work item on that CPU breaking the idleness, which in
     * turn may lead to more scheduling choices which are sub-optimal
     * in terms of power consumption.
     *
     * Workqueues marked with WQ_POWER_EFFICIENT are per-cpu by default
     * but become unbound if workqueue.power_efficient kernel param is
     * specified.  Per-cpu workqueues which are identified to
     * contribute significantly to power-consumption are identified and
     * marked with this flag and enabling the power_efficient mode
     * leads to noticeable power saving at the cost of small
     * performance disadvantage.
     *
     * http://thread.gmane.org/gmane.linux.kernel/1480396
     */
    WQ_POWER_EFFICIENT    = 1 << 7,

    __WQ_DRAINING        = 1 << 16, /* internal: workqueue is draining */
    __WQ_ORDERED        = 1 << 17, /* internal: workqueue is ordered */

    WQ_MAX_ACTIVE        = 512,      /* I like 512, better ideas? */
    WQ_MAX_UNBOUND_PER_CPU    = 4,      /* 4 * #cpus for unbound wq */
    WQ_DFL_ACTIVE        = WQ_MAX_ACTIVE / 2,
};

 

  兼容之前API接口

#define alloc_ordered_workqueue(fmt, flags, args...)            \
    alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)

#define create_workqueue(name)                        \
    alloc_workqueue((name), WQ_MEM_RECLAIM, 1)
#define create_freezable_workqueue(name)                \
    alloc_workqueue((name), WQ_FREEZABLE | WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
#define create_singlethread_workqueue(name)                \
    alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)
相关文章
相关标签/搜索