配套的代码能够从本号的github下载,github.com/shuningzhan… 文本有些图片来自网络,在此表示感谢,若有侵权请联系删除。linux
工做队列是一种将工做交给其它线程执行的机制。也就是当线程A指望作某件事,但本身由不想作,或者不能作的状况下,它能够将该事情(工做 work)加入到一个队列当中,而后有后台线程会从队列中获取该工做,并执行该工做。 这里的的其它线程能够本身建立,也能够不用本身建立。由于,在操做系统起来的时候在每一个CPU上都建立了一组工做线程,并建立了默认工做队列。如图经过ps命令能够看到内核建立的工做线程。 git
因为Linux内核默认为咱们作了不少工做,所以在常规状况下工做队列的使用很是简单。咱们这里先看一下最简单状况下如何使用工做队列机制。本文的介绍从4个方面进行,分别以下:github
在具体使用以前,咱们先了解一下提供给咱们的接口有那些。首先咱们看一下涉及到的数据结构,了解了数据结构,才能比较容易的理解如何使用工做队列。从使用层面上来讲,咱们主要关注以下数据结构,这个数据结构表明一项工做。bash
typedef void (*work_func_t)(struct work_struct *work);
struct work_struct {
atomic_long_t data; /* 内核内部使用 */
struct list_head entry; /* 用于连接到工做队列中*/
work_func_t func; /* 工做函数*/
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
复制代码
这里有一点须要说明的是在结构体中有一个函数指针成员,这个是执行具体的工做的实现。Linux内核将工做队列设计为一个通用的机制。网络
工做队列是应用很灵活,咱们能够定义本身的工做队列,或者使用操做系统内核预约义的工做队列。这里咱们先给出一个最简单的工做队列的实例,这个实例借用内核预约义的工做队列。在这个示例中,咱们启动了一个线程,而后定时将工做放入工做队列中。工做队列接收到任务后执行该任务。任务的具体内容也很简单,只是打印一个消息。数据结构
#include <linux/init.h>
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/in.h>
#include <linux/inet.h>
#include <linux/socket.h>
#include <net/sock.h>
#include <linux/kthread.h>
#include <linux/sched.h>
#include <linux/workqueue.h>
#define BUF_SIZE 1024
struct task_struct *main_task;
/* 定义本身的工做结构体,用于描述工做,
* 这里须要包含系统提供的work_struct结构体
* 做为其第一个成员。 */
struct my_work {
struct work_struct w;
int data;
};
/* 实例化咱们须要作的工做 */
static struct my_work real_work;
static inline void sleep(unsigned sec)
{
__set_current_state(TASK_INTERRUPTIBLE);
schedule_timeout(sec * HZ);
}
/* 工做函数,上述工做的具体工做由该函数完成,这里
* 只是一个简单的示例,仅仅打印一行文本 ,实际上
* 能够作不少事情。*/
static void my_work_func(struct work_struct *work)
{
struct my_work *pwork;
/* 这里使用了一个系统函数,用于根据成员的指针
* 得到父结构体的指针。 */
pwork = container_of(work, struct my_work, w);
printk(KERN_NOTICE "Do something %d\n", pwork->data);
}
/* 做为独立的线程,每隔1秒对工做数据进行调整,并加入
* 到工做队列中。 */
static int multhread_server(void *data)
{
int index = 0;
/* 初始化一个工做,关键是初始化该工做的执行函数 */
INIT_WORK(&real_work.w, my_work_func);
while (!kthread_should_stop()) {
printk(KERN_NOTICE "server run %d\n", index);
real_work.data = index;
/* 调度工做,本质是将工做放入工做队列当中。 */
if (schedule_work(&real_work.w) == 0) {
printk(KERN_NOTICE "Schedule work failed!\n");
}
index ++;
sleep(1);
}
return 0;
}
static int multhread_init(void)
{
ssize_t ret = 0;
printk("Hello, workqueue \n");
main_task = kthread_run(multhread_server,
NULL,
"multhread_server");
if (IS_ERR(main_task)) {
ret = PTR_ERR(main_task);
goto failed;
}
failed:
return ret;
}
static void multhread_exit(void)
{
printk("Bye!\n");
kthread_stop(main_task);
}
module_init(multhread_init);
module_exit(multhread_exit);
MODULE_LICENSE("GPL");
MODULE_AUTHOR("SunnyZhang<shuningzhang@126.com>");
复制代码
下面这张图是借用的魅族内核团队博客的,这张图很是清晰的解释了工做队列的架构和数据走向。 架构
实际上咱们能够将工做队列理解为一个计算集群,工做就是任务,咱们将工做提交给工做队列至关于将任务提交给集群。工做队列再将任务根据负载状况分配给具体的工人执行(工做队列线程)。socket
前面介绍的工做队列是借用的内核预建立的线程池,这个是全部人公用的。若是在负载较大的状况下可能会影响任务执行的效率。内核提供了另外的加强功能,用户能够本身建立线程池,这样就能够用独立的线程池处理任务,从而保证任务执行效率。下面这个函数用来建立一个函数
#define create_workqueue(name) \
alloc_workqueue((name), WQ_MEM_RECLAIM, 1)
#define create_singlethread_workqueue(name) \
alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)
复制代码
这两个宏都会返回一个workqueue_struct结构体的指针,而且都会建立进程(“内核线程”)来执行加入到这个workqueue的work。 create_workqueue:多核CPU,这个宏,会在每一个CPU上建立一个专用线程。 create_singlethread_workqueue:单核仍是多核,都只在其中一个CPU上建立线程。 核心实如今函数alloc_workqueue和alloc_ordered_workqueue中,咱们之前者为例进行介绍。该函数也是一个宏定义,具体定义以下,这里并无作实质性的工做,是另一个宏定义。学习
#ifdef CONFIG_LOCKDEP
#define alloc_workqueue(fmt, flags, max_active, args...) \
({ \
static struct lock_class_key __key; \
const char *__lock_name; \
\
__lock_name = #fmt#args; \
\
__alloc_workqueue_key((fmt), (flags), (max_active), \
&__key, __lock_name, ##args); \
})
#else
#define alloc_workqueue(fmt, flags, max_active, args...) \
、、
__alloc_workqueue_key((fmt), (flags), (max_active), \
NULL, NULL, ##args)
#endif
复制代码
咱们在进一步看__alloc_workqueue_key函数的定义,这里删除了其它冗余的内容,从函数定义能够看出这里主要是建立工做队列结构体和启动了独立的线程。该函数最终返回建立的workqueue_struct
结构体,然后面就能够向该队列发送工做了。
struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
unsigned int flags,
int max_active,
struct lock_class_key *key,
const char *lock_name, ...)
{
wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
... ...
if (flags & WQ_MEM_RECLAIM) {
struct worker *rescuer;
rescuer = alloc_worker(NUMA_NO_NODE);
if (!rescuer)
goto err_destroy;
rescuer->rescue_wq = wq;
/*其实这个核心就是建立一个独立的线程*/
rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
wq->name);
if (IS_ERR(rescuer->task)) {
kfree(rescuer);
goto err_destroy;
}
wq->rescuer = rescuer;
rescuer->task->flags |= PF_NO_SETAFFINITY;
wake_up_process(rescuer->task);
}
... ...
}
复制代码
发送工做的函数定义以下,能够看出来这里有2个参数,分别是目的工做队列和但愿完成的工做。
bool queue_work(struct workqueue_struct *wq,struct work_struct *work);
bool queue_delayed_work(struct workqueue_struct *wq,
struct delayed_work *dwork,
unsigned long delay);
复制代码
最后咱们把工做队列涉及到的接口贴到下面,方便你们学习查找。