上半部分的中断处理有一些局限,包括:linux
下半部的任务就是执行与中断处理密切相关但中断处理程序自己不执行的工做。数组
中断处理程序会异步执行而且在最好的状况下也会锁定当前的中断线。安全
能够读一下别人的代码,看看别人怎么写的网络
能够尽可能缩短中断处理的时间,提升系统的响应能力。数据结构
在linux中经历过多种下半部机制。最先Linux提供下半部的惟一方法叫作“BH”。然而这个机制方便却不灵活,简单又有性能瓶颈。异步
而后开发者们引入了任务队列,来实现工做推后执行。而且代替了“BH”机制。不过仍是不够灵活,尤为是网络部分。ide
在2.3中,开发者引入了软中断(softirqs)和tasklet。32个软中断在全部处理器上同时执行,而相同类型的tasklet不能同时执行。函数
软中断使用的较少,而tasklet是下半部更经常使用的形式。不过tasklet是经过软中断实现的。性能
在<linux/interrunpt.h>中,有softirq_action结构this
struct softirq_action { void (*action)(struct softirq_action *); };
kernel/softirq.c中定义了一个包含有32个结构体的数组
static struct softirq_action softirq_vec[NR_SOFTIRQS]
如今只用到了9个,在<linux/interrupt.h>中有定义
enum { HI_SOFTIRQ=0, TIMER_SOFTIRQ, NET_TX_SOFTIRQ, NET_RX_SOFTIRQ, BLOCK_SOFTIRQ, BLOCK_IOPOLL_SOFTIRQ, TASKLET_SOFTIRQ, SCHED_SOFTIRQ, HRTIMER_SOFTIRQ, /* Unused, but kept as tools rely on the numbering. Sigh! */ RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */ NR_SOFTIRQS };
上面这些能够相似的使用这种方式来实现,softirq_handler是咱们本身定义的名字:
void softirq_handler(struct softirq_action *)
而后有,把整个结构体都传递给软中断处理程序,而不是仅仅传递数据值。若是结构体加入新的域时,无需对全部的软中断处理程序都进行变更。
一个注册的软中断必须在被标记后才会执行,被称做触发软中断。
如下几种状况,待处理的软中断会被检查和执行。
无论用什么办法唤醒,软中断都要在do_softirq()中执行。
/* 若是有待处理的软中断,do_softirq()会循环遍历每个,调用它们的处理程序 */ do_softirq() { u32 pending; pending = local_softirq_pending(); if(pending) { struct softirq_action *h; /* 重设待处理的位图 */ set_softirq_pending(0); h = softirq_vec; do { if(pending & 1) h->action(h); h++; pending >>= 1; } while(pending); } } /* 1) 用局部变量pending保存local_softirq_pending()宏的返回值,若是第n位被置1。那么第n位中断待处理 * 2) 如今待处理的软中断位图已经被保存,能够将实际软中断位图清零了 * 3) 将指针h只想softirq_vec的第一项 * 4) 若是pending的第一位被置1,则h->action(h)被调用 * 5) 指针加1,h指向第二项 * 6) pending右移1位,依次循环 * 7) 重复到最后执行结束 */
目前只有网络和SCSI直接使用软中断。此外内核定时器和tasklet都是创建在软中断上的。对于时间要求严格并能本身搞笑完成枷锁工做的应用,软中断会很好。
在编译期间,经过在<linux/interrupt.h>中定义的一个美剧类型来静态声明软中断。
内核使用从0开始的索引表示优先级,索引号越小优先级越高。
运行时经过调用open_softirq()注册软中断处理程序,两个参数:软中断的索引号和处理函数。在网络子系统中有net/coreldev.c中。这样使用
open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action);
软中断处理程序执行时,容许响应中断,但本身不能休眠。好比当前处理器软中断被禁止,其余处理器仍然能够执行别的软中断。这意味着数据共享,因此要严格的锁机制。
软中断的意义是可扩展性,能够在多个处理器中高效运行。若是不须要扩展多处理器,那么使用tasklet吧。
经过在枚举类型的列表中添加新项以及调用open_softirq()进行注册之后,新的软中断处理程序可以运行。
raise_softirq()函数能够将一个软中断设置为挂起状态。在下次调用do_softirq()函数时投入运行。
tasklet是利用软中断实现的一种下半部机制。它的接口更简单,锁保护也要求较低。
tasklet由两类软中断表明,HI_SOFTIRQ和TASKLET_SOFTIRQ,这两个惟一的区别在于HI_SOFTIRQ先执行。
struct tasklet_struct { struct tasklet_struct *next; /* 链表中的下一个tasklet */ unsigned long state; /* tasklet的状态 */ atomic_t count; /* 引用计数器 */ void (*func)(unsigned long); /* tasklet处理函数 */ unsigned long data; /* 给tasklet处理函数的参数 */ };
func是tasklet的处理程序,data是它惟一的参数。
state只能在0、TASKLET_STATE_SCHED(被调度)和TASKLET_STATE_RUN(在运行)之间取值。
count是tasklet应用计数器,若是不是0,tasklet被禁止。只有0时才被激活。
已经调度的tasklet存放在两个单处理器数据结构中:tasklet_vec(普通tasklet)和tasklet_hi_vec(高优先级的tasklet)。
tasklet由tasklet_schedule()和tasklet_hi_schedule(),接收一个只需将tasklet_struct结构的指针做为参数。
tasklet_schedule()执行步骤:
static inline void tasklet_schedule(struct tasklet_struct *t) { if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) __tasklet_schedule(t); } void __tasklet_schedule(struct tasklet_struct *t) { unsigned long flags; local_irq_save(flags); t->next = NULL; *__this_cpu_read(tasklet_vec.tail) = t; __this_cpu_write(tasklet_vec.tail, &(t->next)); raise_softirq_irqoff(TASKLET_SOFTIRQ); local_irq_restore(flags); }
tasklet_action和tasklet_hi_action()就是tasklet_vec的处理核心:
static void tasklet_action(struct softirq_action *a) { struct tasklet_struct *list; local_irq_disable(); list = __this_cpu_read(tasklet_vec.head); __this_cpu_write(tasklet_vec.head, NULL); __this_cpu_write(tasklet_vec.tail, this_cpu_ptr(&tasklet_vec.head)); local_irq_enable(); while (list) { struct tasklet_struct *t = list; list = list->next; if (tasklet_trylock(t)) { if (!atomic_read(&t->count)) { if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state)) BUG(); t->func(t->data); tasklet_unlock(t); continue; } tasklet_unlock(t); } local_irq_disable(); t->next = NULL; *__this_cpu_read(tasklet_vec.tail) = t; __this_cpu_write(tasklet_vec.tail, &(t->next)); __raise_softirq_irqoff(TASKLET_SOFTIRQ); local_irq_enable(); } }
若是是静态建立,在<linux/interrupt.h>中定义了两个宏:
DECLARE_TASKLET(name, func, data)
DECLARE_TASKLET_DISABLED(name, func, data);
根据给定的名称静态建立一个tasklet_struct结构
tasklet被调度后,会执行func函数,参数由data提供
这两个区别在于初始值不一样,致使初始状态一个运行态,一个等待
使用例子:
DECLARE_TASKLET(my_tasklet, my_tasklet_handler, dev); 等价于 struct tasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT(0), my_tasklet_handler, dev}; 建立了一个my_stasklet,处理程序是my_tasklet_handler,dev是值
tasklet必须符合特定格式
void tasklet_handler(unsigned long data)
须要注意tasklet不能睡眠,不能使用信号量或者其余什么阻塞函数
两个相同tasklet不会同时运行,共享数据必须适当的锁保护
经过调度tasklet_schedule()函数并传递给他相应的tasklet_struct指针
tasklet_schedule(&my_tasklet); /* 把my_tasklet标记为挂起 */
软中断可能频繁触发致使用户空间进程没法得到足够的处理器时间,于是处饥饿状态。有两种最容易想到的方案:
1. 一种是软中断返回前,把其余的软中断都一并处理。复杂状况下,可能一直运行
2.第二种是在返回后,软中断必须等上一些时间才能运行。这样空闲时速度反而不够。
最后开发者作了一些折中,内核不会当即处理从新触发的软中断。当大量软中断出现的时候,内核会唤醒一组内核线程处理这些负载。
这些线程在最低优先级上运行(nice19),以免对重要任务资源抢夺。
每一个处理器都有这样一个线程,全部线程的名字都叫作ksoftirqd/n,区别在于n,它对应处理器的编号。
只要有空闲的处理器,ksoftirq就会调用do_softirq()去处理它们。
工做队列是另一种将工做推后执行的形式,交由一个内核线程去执行。这个下半部总会在进程上下文中执行。
区分使用tasklet、软中断和工做队列是,任务是否须要睡眠。
工做队列子系统是一个用于建立内核线程的接口,经过它建立的进程负责执行由内核其余部分排到队列里的任务。
建立这些内恶化线程称做工做者线程。工做队列可让你的驱动程序建立一个专门的工做者线程来处理须要推后的工做。
它定义在kernel/workqueue.c中,数组中的每一项对应系统中的一个处理器。每一个处理器,每一个工做者线程对应一个这样的cpu_workqueue_struct结构体
/* * 外部可见的工做队列抽象是 * 由每一个CPU的工做队列组成的数组 */ struct workqueue_struct { struct cpu_workqueue_struct cpu_wq[NR_CPUS]; struct list_head list; const char *name; int sinqlethread; int freezeable; int rt; };
cpu_workqueue_struct是kernel/workqueue.c的核心数据结构:
struct cpu_workqueue_struct { spinlock_t lock; /* 锁保护这种结构 */ struct list_head worklist; /* 工做列表 */ wait_queue_head_t more_work; struct work_struct *current_struct; struct workqueue_struct *wq; /* 关联工做队列结构 */ task_t *thread; /* 关联线程 */ };
全部的工做者线程是用普通的内核线程实现的,它们都要执行worker_thread()函数。在<linux/workqueue.h>中定义的work_struct结构体表示:
struct work_struct { atomic_long_t data; struct list_head entry; work_func_t func; };
这些结构体被链接成链表,在每一个处理器上的每种类型的队列都对应这样一个链表。
work_thread()函数的核心流程,以下:
for ( ; ; ) { prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE); if ( list_empty(&cwq->worklist)) schedule(); finish_wait(&cwq->more_work, &wait); run_workqueue(cwq); }
1)线程将本身设置为休眠状态(state被设成TASK_INTERRUPTIBLE),并把本身加入到等待队列中
2)若是工做链表是空的,线程调用schedule()函数进入睡眠状态
3)若是链表中有对象,线程不会睡眠。相反,它将本身设置成TASK_RUNNING,脱离等待队列。
4)若是链表非空,调用run_workqueue()函数执行被推后的工做。
while(!list_empty(&cwq->worklist)) { struct work_struct *work; work_func_t f; void *data; work = list_entry(cwq->worklist.next, struct work_struct, entry); f = work->func; list_del_init(cwq->worklist.next); work_clear_pending(work); f(work); }
1)当链表不为空时,选取下一个节点对象
2)获取咱们下网执行的函数func及其参数data
3)把该节点从链表上解下来,将待处理标志位pending清零
4)调用函数
5)重复执行
4.1.3 工做队列实现机制的总结
位于最高一层的工做者线程。系统容许有多种类型的工做者线程存在,对于指定的一个类型,系统的每一个CPU上都有要给该类的工做者线程。
内核能够根据须要来建立工做者线程,而在默认状况下内核只有event这一种类型的工做者线程。
①建立推后的工做
能够经过DECLARE_WORK在编译时静态地建立该结构体:
DECLARE_WORK(name, void(*func) (void *), void *data);
静态的建立要给名为name,处理函数为func,参数为data的work_struct结构体。
也能够在运行时经过指针建立一个工做:
INIT_WORK(struct work_struct *work, void(*func)(void *), void *data);
初始化一个由work指向的工做,处理函数为func,参数为data。
②工做队列处理函数
void work_handler(void *data)
会由一个工做者线程执行,函数会进行在进程上下文中。默认状况下,容许中断响应,而且没有任何锁。函数还能够睡眠。
须要注意的是函数不能访问用户空间,
③对工做进行调度
schedule_work(&work);
work立刻就会被调度,一旦其所在的处理器上的工做者线程被唤醒,它就会被执行。
schedule_delayed_work(&work, delay);
&work指向work_struct直到delay指定的时钟节拍用完之后才会执行。
④刷新操做
void flush_scheduled_work(void);
函数会一直等待,直到队列中多有对象都被执行之后才返回。
int cancel_delayed_work(struct work_struct *work);
这个函数能够取消任何与work_struct相关的挂起工做
⑤建立新的工做队列
struct workqueue_struct *create_workqueue(const char *name);
建立一个新的任务队列和与之相关的工做者线程。name参数用于该内核线程的命名,好比缺省的events队列的建立就是调用:
struct workqueue_struct *keventd_wq;
keventd_wq = create_workqueue("events");
建立一个工做的时候无需考虑工做队列的类型。
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay)
最后你能够调用下面的函数刷新指定的工做队列:
flush_workqueue(struct workqueue_struct *wq);
#include <linux/kernel.h> #include <linux/module.h> #include <linux/workqueue.h> #include <linux/slab.h> static struct workqueue_struct *my_wq; typedef struct { struct work_struct work; int x; }my_work_t; my_work_t *work, *work2; static void my_wq_func(struct work_struct *work) { my_work_t *my_work = (my_work_t *)work; printk("my_work.x %d\n", my_work->x); } int init_module(void) { int ret; my_wq = create_workqueue("my_queue"); //建立一个工做队列 if(my_wq) { work = (my_work_t *)kmalloc(sizeof(my_work_t), GFP_KERNEL); if(work) { INIT_WORK((struct work_struct *)work, my_wq_func); work->x = 1; ret = queue_work(my_wq, (struct work_struct *)work); } work2 = (my_work_t *)kmalloc(sizeof(my_work_t), GFP_KERNEL); if(work2) { INIT_WORK((struct work_struct *)work2, my_wq_func); work2->x = 2; ret = queue_work(my_wq, (struct work_struct *)work2); } } return 0; } void cleanup_module(void) { flush_workqueue(my_wq); destroy_workqueue(my_wq); } MODULE_AUTHOR("skldjkas"); MODULE_DESCRIPTION(" work queue of buttom half test"); MODULE_LICENSE("GPL v2");
软中断提供的执行序列化保障最少,要求软中断处理函数必须格外当心地采起一些步骤确保共享数据的安全。
若是代码多线索化考虑得不充分,那么tasklet意义更大。 接口简单,并且两种同类型的tasklet不能同时执行
若是你须要吧任务推后到进程上下文中完成,那么只能选择工做队列了。
6、在下半部之间加锁
7、禁止下半部