一种基于优先权的低层次抢占式调度器源码分析

前言

调度器(scheduler)是计算机操做系统内核中对进程/线程分配CPU计算资源的重要模块react

调度器功能

在桌面机,嵌入式设备或大型机等不一样的应用环境中,产生了不一样类型资源分配策略的调度器,针对不一样层面分为如下几种调度器算法

  • 高级调度器(long-term scheduler): 一般用于面向批处理的多程序环境中,经过均衡内存中的任务以优化系统资源,包括CPU,内存和硬盘存储等。
  • 中级调度器(medium-term scheduler): 跟踪CPU上执行的进程/线程的动态内存用量来决定是否要增长或减小“多通道程度“(内存中竞争CPU的进程个数),以防止”颠簸“。颠簸即当前的进程集合的内存需求超过了系统容量,致使进程在各自的执行中变得缓慢。
  • 低级调度器(short-term scheduler): 负责在当前驻留的内存里的进程/线程中选择一个来运行。这篇内容设计的算法主要关注的都是低级调度器

在以前的项目工做中研究了一种部署在Xilinx MicroBlaze 嵌入式平台中的低级调度器,计划按如下四个部分介绍该调度器的具体实现和改进实验编程

  1. 调度器源码分析
  2. 基于VEGA织女星NXP-RVM1-Series RI5CY平台的调度器移植实验
  3. 基于中断时间片的调度器改进实验
  4. 调度器性能分析

本篇文章为第一部分将分析该调度器的源码实现。 在低级调度器中,通常划分为:非抢占式抢占式。非抢占式的调度器中,一个进程/线程或采用执行到底策略,或主动释放资源放弃占用处理器时间以处理I/O请求,调度器只负责安排进程/线程执行顺序;而抢占式调度器则从主动从当前线程手中把处理器计算资源抢走,并交给另外一个进程/线程使用。无论是哪一类调度器都会按照如下四项开展调度工做:数组

  • 得到处理器的控制权
  • 保存当前正在运行的进程/线程状态(PCB,process control block)
  • 选择一个新的进程/线程来执行
  • 提交新选择出来的进程/线程给处理器运行,将该进程/线程的PCB状态加载到CPU寄存器中

进程/线程状态包含了程序执行中的关键信息,例如当前程序执行位置(PC指针),过程调用(sub-routine)返回地址,程序在存储空间的现场(context,程序上下文)等等,是调度器执行调度任务的重要信息部分。缓存

源码分析

基本结构

调度器由如下6个基本类组成安全

类名 功能说明
thread_dt 装载任务线程thread的运行状态(PCB)
threadext_dt 指定任务线程thread的栈(stack)空间,管理线程函数的函数指针及参数
contexqueue 管理PCB线程池的单向链表队列
thread_lib 调度器的核心类,包含调度器线程管理的全部方法和线程资源池
event_dt 线程事件的实现类,包含事件的实现方法及操做函数
semaphore_dt 信号量旗语的实现类,包含信号量的实现及操做方法

class thread_lib是整个调度器的核心类,下图为整个调度器的组织结构bash

调度器结构关系

代码分析

  • thread_dt类

class thread_dt是上文所提到的PCB内容的容器,实现以下:多线程

class thread_dt
{
public:
#ifdef WIN32
    LPVOID thread_fiber;
#else
    uint32 sp;             //栈指针
    union
    {
        uint32 reg[15];    //部分CPU通用寄存器保存
        struct
        {
            uint32 r15;    //sub-routine结束后的返回地址, 在microblaze体系中,r15用于保存返回地址
            uint32 r17;
            uint32 r19;
            uint32 r20;
            uint32 r21;
            uint32 r22;
            uint32 r23;
            uint32 r24;
            uint32 r25;
            uint32 r26;
            uint32 r27;
            uint32 r28;
            uint32 r29;
            uint32 r30;
            uint32 r31;
        };
    };
#endif
    threadext_dt *extdat;    //线程任务的操做类指针
    thread_dt *next;         //线程池链表的尾指针
       uint32 priority;         //任务的优先级
#ifdef PROBE_LOG
    uint32 count;
    char funcname[12];
#endif
};

class thread_dt保存了线程运行的全部关键现场信息,包括如下5部分架构

  1. 部分CPU通用寄存器(GPR, General Purpose Register)的当前状态
  2. 程序计数器的当前指令执行地址,用与产生sub-routine结束后的返回地址
  3. 栈(Stack)指针
  4. 线程任务的操做内容对象指针
  5. 线程任务优先级

经过上述五部分所描述的线程上下文现场信息,处理器能够切换到指定sub-routine执行新的任务。一般寄存器文件信息只需保存两类寄存器做为关键现场信息:由Saving RegistersTemporry Registersapp

microbalze寄存器功能描述

按上图给出的定义因此对线程执行现场的保存须要保存r15,r17,r19-r31等15个必要GPR寄存器

  • threadext_dt类

class threadext_dt是线程任务函数的容器,其实现代码以下:

class threadext_dt
{
public:
    static const uint32 thread_stack_size = 520284; //Near 512k. Not exactly 512k to avoid cache collision.  栈空间大小
#ifndef WIN32
    uint8 astack[thread_stack_size];  //栈空间
#endif
    thread_fn thread_start;           //线程任务函数指针
    void *thread_arg;                 //线程任务函数参数
};

class threadext_dt 保存的内容主要有三点:

  1. 线程的栈空间内容
  2. 任务线程函数指针,该指针在具体的线程任务建立时须要和实际的执行函数地址绑定,所支持的线程函数格式要求以下:
    void (*thread_fn)(void *arg)
  3. 任务线程函数的参数列表指针

线程空间所指定的栈空间大小比cache的物理尺寸(512KB)少一个内存页框(page frame, 4KB)的大小。这种设计目的在于当cache基于4KB大小的cacheline作刷新时,cache经过DataBus总线访问主存的最大单次数据传输宽度为256bit*16等于4KB。当cache miss时,cache可直接使用这个多余的空cacheline从主存调入新块,并更新CAM表映射,上述状况可在Data Cache Miss的数据量小于一个物理页范围时减小一次Cache Line的Write Back操做,从而减小Cache Miss带来的访存延迟惩罚(penalty)。因为上述线程的保存并无利用线程任务的PID,所以在线程任务在切换时,前面线程存放于cache的数据对当前线程均miss,所以在存储切换时可能有较大因为cache warm up所带来的访存颠簸。

  • contextqueue类

class contexqueue实现了对线程池的管理功能, 实现以下:

class contextqueue
{
private:
    thread_dt* volatile head;
    thread_dt* volatile * volatile tail;

public:
    inline void init()
    {
        head = NULL;
        tail = &head;
    }

    inline thread_dt* volatile current()
    {
        return head;
    }

    inline void insert(thread_dt* c)
    {
        c->next = NULL;
        *tail = c;
        tail = &c->next;
    }

    inline void inserthead(thread_dt* c)
    {
        c->next = head;
        if(head == NULL)
        {
            tail = &c->next;
        }
        head = c;
    }

    inline void batchinsert(contextqueue &addqueue)
    {
        ASSERT(addqueue.head != NULL,LEVEL_NORMAL);
        *tail = addqueue.head;
        tail = addqueue.tail;
    }

    inline void remove()
    {
        if ((head = head->next) == NULL)
        {
            tail = &head;
        }
    }

    inline void removeall()
    {
        init();
    }

    inline void rotate()
    {
        ASSERT(head != NULL,LEVEL_NORMAL);
        *tail = head;
        tail = &head->next;
        head = head->next;
        *tail = NULL;
    }
};

上述线程池为单向链表结构,并提供了8种资源管理方法

函数名 功能简介
init() 线程池的初始化方法
current() 返回线程池链表的头部元素指针
insert() 在线程池链表尾部插入新的线程元素
inserthead() 在线程池链表的头部插入新的线程元素
batchinsert() 在线程池链表的尾部插入新的线程池链表
remove() 删除线程池链表的头部元素
removeall() 删除整个线程池链表(从新初始化线程池)
retate() 线程池的初始化方法
init() 线程池的初始化方法

线程池init方法

init

线程池的初始化方法经过首尾指针构造了一个以class thread_dt对象为元素的空白单向链表结构,头部指针指向一个NULL对象,尾部的二级指针指向头部指针地址的位置。

线程池insert方法

insert

线程池对象的插入方法将原线程池尾部元素的后驱指针链接到新元素,移动尾部指针的指向新元素的后驱指针位置,尾部元素的后驱指针须要指向NULL

线程池inserthead方法

inserthead

线程池的头部插入方法将新元素的后驱指针指向原链表的头部,特别在空表插入状况下,须要将尾部指针定位到新元素的后驱指针地址位置,最后更新头部指针指向新元素便可

线程池batchinsert方法

batchinsert

对线程池尾部插入一个线程链表需将尾指针指向新链表的头部元素,同时移动尾指针执行新链表的尾部元素的后驱指针

线程池remove方法

remove

线程池链表元素的删除老是删除头部元素,当删除后为非空链表,只需将头部指针移向原头部的后续元素,当出现删空时,则还须要将尾部指针也指向原头部后驱指针的地址位置

线程池removeall方法

等同于从新初始化线程池

线程池rotate方法

rotate

线程池的元素旋转方法是调度器的调度策略的重要操做,目的使原链表的头部元素被替换到尾部,从而实现round-robin的模式,操做时首先将尾部元素的后续指针指向头部元素,更新尾指针指向原头部元素的后驱指针地址,而后将头部指针指向原头部元素的后驱元素造成新的头部元素(应注意,旋转方法操做时应确保链表中很多于两个线程元素,本调度器在流程机制上保证了这项条件), 最后将原头部元素的后驱指针指向NULL完成旋转操做

  • thread_lib类

class thread_lib 是调度器的核心代码,主要实现以下:

class thread_lib
{
public:
    static const uint32 high_priority = 0;
    static const uint32 normal_priority = 1;
    static const uint32 low_priority = 2;
    static const uint32 childthreadintetris=lunsizeintetris*threadcountinlun;
#ifdef PROBE_LOG
    uint32 lasttick;
#endif
private:
    static const uint32 maxthread = tetrissizelimitinsystem*(1u+childthreadintetris);
    static thread_lib instance;
    static threadext_dt extcontext[maxthread];
    thread_dt availablecontext[maxthread];
    contextqueue ready_queue[3];//0:high priority   1:normal priority  2:low priority
    thread_dt main_thread;
    contextqueue spare_queue;
    thread_dt *current;
public:
    static contextqueue& get_readyqueue(uint32 priority)

    static contextqueue& get_currentreadyqueue()

    static void reschedule();

    static inline void init()

    static inline thread_dt* getcurrentcontext()

    static inline void __yield()

    static inline void yield()

    static inline void __lowpriorityyield()

    static inline void lowpriorityyield()

    static inline void sleep(uint32 Millisecond = 250)

    static inline void threadexit()

    static inline void reset_threadpool()

    //create_thread should only be called in thread context. Not in interrupt/dpc context.
#ifdef PROBE_LOG
    static void create_thread(const char* funcname,thread_fn thread_start,void* parg, uint32 priority);
#else
    static void create_thread(thread_fn thread_start,void* parg, uint32 priority);
#endif

#ifdef WIN32
    static VOID CALLBACK run_thread(LPVOID pcontext);
#else
    static void run_thread();
#endif

#ifdef PROBE_LOG
    static inline void thread_printf(void);
#endif

};

class thread_lib提供了调度器工做所必要的成员变量和调度方法

调度器成员变量的简要说明

变量名 功能说明
lasttick 调度器上一次读取的timer计数,表明某时刻系统累积的时间计数,相似Linux的jiffies概念
maxthread 定义线程池能支持的最大线程数,这个限制一般取决于平台系统所定义的PID(Process Identify)字段的宽度,在本例取决于处理器算力及下游处理能力的带宽极限
instance 所构造的thread lib静态单例,方便将线程池放置于系统规划的快速存储段以加速调度器的调度效能
extcontext 在单例中构造的class threadext_dt静态实例数组
availablecontext 在单例中构造的class thread_dt静态实例数组
ready_queue 调度器中已注册实际操做任务的线程池,分为high_priority, normal_priority, low_priority三个优先级的独立线程池
main_thread 调度器的主线程,即main函数产生的线程
spare_queue 在单例中构造的线程池,用于存放调度器未注册的全部可用空白线程元素
current 调度器当前在执行的线程元素

调度器调度方法的简要说明


操做函数 功能说明
get_readyqueue 得到指定优先级的线程池实例
get_currentreadyqueue 得到当前运行任务所在优先级的线程池实例
reschedule 调度器重调度方法,按照指定的调度策略将当前执行任务占用的处理器计算资源释放,并从ready_queue中选出下一个线程提交处处理器执行
init 调度器的初始化方法
getcurrentcontext 得到当前正在执行的线程元素实例
__yield 普通中断模式下的处理器计算资源替换方法,让当前执行的线程任务让出处理器资源,并交给新的线程任务
yield 快速中断模式下的处理器资源替换方法,做用与普通模式下相似
__lowpriorityyield 普通中断模式下将当前任务让步到low priority线程池队列的方法
lowpriorityyield 快速中断模式下将当前任务让步到low priority线程池的方法
sleep 使当前线程休眠指定时间的间隔,单位ms
threadexit 子线程任务退出执行并返回主线程的方法
reset_threadpool 调度器内部资源的初始化方法
create_thread 注册线程任务到空白线程元素
run_thread 执行线程任务的统一入口
thread_printf 调度器debug使用的打印函数
linkinterrupt 调度器线程任务与中断连接方法

调度器的初始化过程

调度器经过init和reset_threadpool两个函数完成内部资源节点的初始化操做,其中reset_threadpool是init函数调用的子程

reset_threadpool函数的源码以下:

static inline void reset_threadpool()
    {
#ifdef WIN32
        for (uint32 i=0; i!=maxthread; ++i)
        {
            LPVOID thread_fiber = instance.availablecontext[i].thread_fiber;
            if (thread_fiber != NULL)
            {
                DeleteFiber(thread_fiber);
            }
        }
#endif
        //memset(&instance,0,sizeof instance);
        memset(&instance,0,sizeof(instance));
        memset(extcontext,0,sizeof(extcontext));
        for(uint32 i = high_priority; i <= low_priority; ++i)
        {
            instance.ready_queue[i].init();
        }
        instance.spare_queue.init();
        for(uint32 i=0; i!=maxthread; ++i)
        {
            instance.availablecontext[i].extdat=&extcontext[i];
            instance.spare_queue.insert(&instance.availablecontext[i]);
        }
    }

上述代码操做流程以下

restpool

调度器初始化操做将类中使用静态资源的内存空间段作归0操做,并将线程实例(class thread_dt)与线程操做函数实例(class threadext_dt)创建一一绑定关系从而产生可用的空白线程元素,空白线程会加入到spare_queue供调度器随时拾取。调度器的内部线程资源采用静态资源的目的在于提升调度器的工做性能,详细缘由将在下文阐述。

init函数的源码以下:

static inline void init()
    {
        instance.reset_threadpool();

#ifdef WIN32
        instance.main_thread.thread_fiber = ConvertThreadToFiber(NULL);
        ASSERT(instance.main_thread.thread_fiber != NULL,LEVEL_NORMAL);
#endif

        instance.ready_queue[low_priority].insert(&(instance.main_thread));
        instance.current = &(instance.main_thread);
#ifdef PROBE_LOG
        instance.lasttick = reg_ops::gettickcount();
        instance.current->count = 0;
#endif
    instance.current->priority = low_priority;

    }

init函数的流程以下,在调用reset_threadpool初始化资源后,设置主线程和当前线程的状态

init

调度器资源的获取

get_readyqueue, getcurrentcontext,, get_currentreadyqueue是调度器三个资源状态获取方法,分别用于获取指定条件下的线程资源

static contextqueue& get_readyqueue(uint32 priority)
{
    return instance.ready_queue[priority];
}
static contextqueue& get_currentreadyqueue()
{
    return instance.ready_queue[instance.current->priority];
}
static inline thread_dt* getcurrentcontext()
{
    return instance.current;
}
static inline void __yield()

get_readyqueue用于得到指定优先级的线程池队列

get_currentreadyqueue用于得到当前正在执行线程所在优先级的线程池队列

getcurrentcontext用于得到当前正在执行线程的线程对象

调度器任务的注册方法

调度器经过create_thread函数将用户函数绑定到空白线程对象,并注册到ready_queue线程池中等待调度器提交处处理器执行

void thread_lib::create_thread(thread_fn thread_start,void* parg, uint32 priority)
#endif
{
    ASSERT(dpc_lib::getdpclevel() == dpc_lib::threadlevel, LEVEL_NORMAL);
    ASSERT(priority <= low_priority, LEVEL_NORMAL);
    thread_dt *pcontext = instance.spare_queue.current();
    ASSERT(pcontext!=NULL,LEVEL_NORMAL);
    instance.spare_queue.remove();
    pcontext->next = NULL;
    pcontext->priority = priority;
    pcontext->extdat->thread_start = thread_start;
    pcontext->extdat->thread_arg = parg;
#ifndef WIN32
    memset(pcontext->extdat->astack,0,threadext_dt::thread_stack_size);
#endif // WIN32
#ifdef PROBE_LOG
    pcontext->count = 0;
    memset(pcontext->funcname,' ',sizeof(pcontext->funcname));
    const char* funcnamebody=funcname;
    const char* current=funcname;
    while(*current!='\0')
    {
        if(*current==':')
        {
            funcnamebody=current+1;
        }
        ++current;
    }
    for(uint32 i=0;i!=sizeof(pcontext->funcname);++i)
    {
        char deschar=funcnamebody[i];
        if(deschar=='\0')
            break;
        pcontext->funcname[i]=deschar;
    }
#endif
#ifdef WIN32
    pcontext->thread_fiber  = CreateFiber(threadext_dt::thread_stack_size, &run_thread, NULL);
    ASSERT(pcontext->thread_fiber != NULL,LEVEL_NORMAL);
#else
    pcontext->sp = ((uint32)pcontext->extdat->astack)+threadext_dt::thread_stack_size-60;
    pcontext->r15 = (uint32)run_thread-8;
#endif
    uint32 status = clearinterruptacquire();
    instance.ready_queue[priority].insert(pcontext);
    interruptrestore(status);
}

流程以下

create_thread

线程栈空间地址以高位地址做为栈底,逐渐向低位地址扩展栈空间范围,在以上代码中,实际的栈空间地址比定义最大栈空间地址小60个byte,目的使不一样线程各自的栈空间之间留出足够的安全距离,防止某个线程因为stack overflow crash对其余线程栈区产生破坏性覆盖操做。

按照以上定义,处理通用寄存器文件GPR的第16个寄存器r15存放子过程的返回地址,因为调度器自己属于低级非抢占式调度器,每一个线程任务在得到cpu计算资源后将不会被中断打断,一直执行至函数完毕,所以每一个子过程的返回地址都被注册到class thread_lib的静态成员函数run_thread, 具体的原理将在如下调度器过程说明中详细阐述。实际子过程返回地址根据pcontext->r15 = (uint32)run_thread-8所示,位于run_thread函数label以前一个Dword地址, 这是因为Microblaze处理器的branch模块存在分支预测槽结构(delay slot

根据Xinlinx MicroBlaze Processor Reference Guide UG984 (v2018.3) Nov 14, 2018, 55页及58页关于分支延迟槽的描述以下

A control hazard occurs when a branch is taken, and the next instruction is not immediately available. This results in stalling the pipeline. MicroBlaze provides delay slot branches and the optional branch target cache to reduce the number of stall cycles.

Delay Slots

When executing a taken branch with delay slot, only the fetch pipeline stage in MicroBlaze is flushed. The instruction in the decode stage (branch delay slot) is allowed to complete. This technique effectively reduces the branch penalty from two clock cycles to one. Branch instructions with delay slots have a D appended to the instruction mnemonic. For example, the BNE instruction does not execute the subsequent instruction (does not have a delay slot), whereas BNED executes the next instruction before control is transferred to the branch location.

A delay slot must not contain the following instructions: IMM, IMML, branch, or break. Interrupts and external hardware breaks are deferred until after the delay slot branch has been completed. Instructions that could cause recoverable exceptions (for example unaligned word or halfword load and store) are allowed in the delay slot.
If an exception is caused in a delay slot the ESR[DS] bit is set, and the exception handler is responsible for returning the execution to the branch target (stored in the special purpose register BTR). If the ESR[DS] bit is set, register R17 is not valid (otherwise it contains the address following the instruction causing the exception).

存在分支预测槽结构的处理器,在执行branch类指令时,因为取指单元(IFU, Instruction Fetch Unit)预取的指令缓存会清空,致使IFU须要从新预取新的有效指令,所以跳转指令的效果会延迟数个cycle才能生效,从而在流水线上产生若干个cycle的空泡(bubble),为了遮盖这些流水线空泡,处理器会提早执行跳转指令以后的数条指令,所以对应上文赋值给r15的返回地址将比实际的run_thread地址提早两个指令左右的执行宽度,用于配合分支延迟槽的提早执行特征

调度器的操做方法

调度器的操做方法主要有以下几种

  • 线程重调度方法reschedule
  • 线程让步方法yield, __yield, lowpriorityyield, __lowpriorityyield
  • 线程任务执行方法run_thread
  • 线程任务推出方法threadexit
  • 线程休眠方法sleep
线程重调度函数 reschedule

函数实现的代码以下

void thread_lib::reschedule()
{
    thread_dt *pnewctx;
#ifdef PROBE_LOG
    uint32 curtick=reg_ops::gettickcount();
    instance.current->count += curtick - instance.lasttick;
    do
    {
        communicator::overheat_delay();
        if(laterthan(reg_ops::gettickcount(),currenttick))
        {
            uint32 newtick=reg_ops::gettickcount();
            accidletime+=newtick-curtick;
            curtick=newtick;
            disp_diagnoisisinfo();
        }
    }
    while((pnewctx = instance.ready_queue[high_priority].current())==NULL &&
        (pnewctx = instance.ready_queue[normal_priority].current())==NULL &&
        (pnewctx = instance.ready_queue[low_priority].current())==NULL);
    {
        uint32 newtick=reg_ops::gettickcount();
        accidletime+=newtick-curtick;
        instance.lasttick = newtick;
    }
#else
    do
    {
        communicator::overheat_delay();
    }
    while((pnewctx = instance.ready_queue[high_priority].current())==NULL &&
        (pnewctx = instance.ready_queue[normal_priority].current())==NULL &&
        (pnewctx = instance.ready_queue[low_priority].current())==NULL);
#endif
#ifdef WIN32
    instance.current = pnewctx;
    LPVOID next_fiber =instance.current->thread_fiber;
    ASSERT(next_fiber != NULL,LEVEL_NORMAL);
    SwitchToFiber(next_fiber);
#else
    thread_dt *poldctx = instance.current;
    instance.current = pnewctx;
    __Yield(poldctx,pnewctx);
#endif
}

重调度函数实现的功能很是简单, 即按照从高到低的优先级从ready_queue线程池中取出下一个即将执行的线程元素,并使其取代当前执行线程得到处理器计算资源, 图示流程以下。

reschedule

过温判断是一个阻塞式的函数调用过程,CPU所读取到系统温度传感器(Temperature Sendor)的读数高过预设的过温阈值时,CPU须要反复进入nop指令以等待系统温度下降到安全阈值如下,等待函数overheat_delay的代码以下

static inline void overheat_delay()
    {
        //Wait for a certain period of time.
        //1/2 CPU computation power.
        uint32 isrflag=clearinterruptacquire();
        uint32 tick=reg_ops::gettickcount();
        uint32 interval = timeinterval(instance.last_ohdelay_tick, tick);
        interval=Min(interval,reg_ops::tick_size*max_ohdelay_interval);
        uint32 tickguard=tick+interval;
        while(beforethan(reg_ops::gettickcount(),tickguard));
        instance.last_ohdelay_tick=reg_ops::gettickcount();
        interruptrestore(isrflag);
    }

在执行等待过程当中,须要将系统的中断响应使能关闭,防止意外中断的介入打断系统降温过程。在过程当中,系统须要读入实时计数器(Timer/R, Real Timer Clock)的当前计数,并与预设的降温等待间隔累加获得tickguard值,当系统polling到的tick计数小于tickguard时,全系统除CPU外,业务均处于pending状态已使系统待机降温。tick 的概念相似Linux 系统中所提出的jiffies概念,即系统开机后,一段时间内累计的总时间周期基数,该计数用于系统执行一些延迟等待任务。

重调度函数的核心部分是用于执行线程替换的内嵌ASM函数__Yield

Yield函数使用内嵌式汇编调用接口,其接口形式以下

extern void __Yield(thread_dt* poldctx,thread_dt* pnewctx);

内嵌式汇编程序的参数传递方式通常有三种,经常使用的有经过汇编占位符方式引入参数和经过处理器paramerter寄存器引入参数。

经过汇编占位符引入参数的内联汇编格式以下

__asm__ __volatile__("Instruction List" : Output : Input : Clobber/Modify);
  • Instruction List

Instruction List 是汇编指令序列。它能够是空的,好比:__asm__ volatile__(""); 或 __asm ("");都是彻底合法的内联汇编表达式

  • volatile

__volatile__是GCC 关键字volatile 的宏定义
#define __volatile__ volatile __volatile__或volatile 是可选的。若是用了它,则是向GCC 声明不容许对该内联汇编优化

  • Output

Output 用来指定当前内联汇编语句的输出

  • Input

Input 域的内容用来指定当前内联汇编语句的输入Output和Input中,格式为形如“constraint”(variable)的列表(逗号分隔)

  • Clobber/Modify

有时候,你想通知GCC当前内联汇编语句可能会对某些寄存器或内存进行修改,但愿GCC在编译时可以将这一点考虑进去。那么你就能够在Clobber/Modify域声明这些寄存器或内存。这种状况通常发生在一个寄存器出如今"Instruction List",但却不是由Input/Output操做表达式所指定的,也不是在一些Input/Output操做表达式使用"r"约束时由GCC 为其选择的,同时此寄存器被"Instruction List"中的指令修改,而这个寄存器只是供当前内联汇编临时使用的状况。

  • 通用约束

约束 Input/Output 意义 g I,O 表示可使用通用寄存器,内存,当即数等任何一种处理方式。 0,1,2,3,4,5,6,7,8,9 I 表示和第n个操做表达式使用相同的寄存器/内存

例如:

__asm__ ("popl %0 \n\t"
         "movl %1, %%esi \n\t"
         "movl %2, %%edi \n\t": 
         "=a"(__out): 
         "r" (__in1), 
         "r" (__in2));

此例中,%0对应的就是Output操做表达式,它被指定的寄存器是%eax,整个Instruction List的第一条指令popl %0,编译后就成为popl %eax,这时%eax的内容已经被修改,随后在Instruction List后,GCC会经过movl %eax, address_of_out这条指令将%eax的内容放置到Output变量__out中。对于本例中的两个Input操做表达式而言,它们的寄存器约 束为"r",即要求GCC为其指定合适的寄存器,而后在Instruction List以前将__in1和__in2的内容放入被选出的寄存器中,若是它们中的一个选择了已经被__out指定的寄存器%eax,假如是__in1,那 么GCC在Instruction List以前会插入指令movl address_of_in1, %eax,那么随后popl %eax指令就修改了%eax的值,此时%eax中存放的已经不是Input变量__in1的值了,那么随后的movl %1, %%esi指令,将不会按照咱们的本意——即将__in1的值放入%esi中——而是将__out的值放入%esi中了。

__Yield函数采用的第二种方式经过处理器parameter寄存器进行函数传参, 根据Xinlinx MicroBlaze Processor Reference Guide UG984 (v2018.3) Nov 14, 2018 195页描述, r5-r10是Mircoblaze处理器GPR中的参数寄存器,引入参数按照形参顺序依次放入寄存器位置

registerparam

在本例中使用r5和r6两枚参数寄存器分别存放poldctx与pnewctx指针, 调用swi save 指令按照class/struct内存排布结构,依次向内存段将当前GPR的保存内容到poldctx对象空间,同时使用lwi load 指令将新线程中上下文内容换入处处理器的GPR空间。对于class thread_dt的实例在内存中的排布结构以下所示:

classstruct

以上结构为class thread_dt在实例在内存中的排布结构,因为线程池元素构造了类的实例对象数组,所以在内存构造中并无this指针占用结构地址。

__Yield函数的源码实现以下

.section .text
    .globl    __Yield
    .align    4
    .ent    __Yield
        .type __Yield, @function
__Yield:
    //save registers
    swi    r15,r5,4
    swi    r17,r5,8
    swi    r19,r5,12
    swi    r20,r5,16
    swi    r21,r5,20
    swi    r22,r5,24
    swi    r23,r5,28
    swi    r24,r5,32
    swi    r25,r5,36
    swi    r26,r5,40
    swi    r27,r5,44
    swi    r28,r5,48
    swi    r29,r5,52
    swi    r30,r5,56
    swi    r31,r5,60
    //store R1 in *poldctx->sp
    swi    r1,r5,0
    //set R1 to *pnewctx->sp
    lwi    r1,r6,0
    //restore registers
    lwi    r15,r6,4
    lwi    r17,r6,8
    lwi    r19,r6,12
    lwi    r20,r6,16
    lwi    r21,r6,20
    lwi    r22,r6,24
    lwi    r23,r6,28
    lwi    r24,r6,32
    lwi    r25,r6,36
    lwi    r26,r6,40
    lwi    r27,r6,44
    lwi    r28,r6,48
    lwi    r29,r6,52
    lwi    r30,r6,56
    rtsd    r15,8
    lwi    r31,r6,60
    .end    __Yield

代码中 ".section .text" 表示该段代码位于程序的text段,即指令正文, ".globl __Yield" 表示函数的label名 __Yield全局空间可见,以方便连接器按名字执行连接操做,”.align 4“ 表示生成的二进制代码按照4字节对齐排布,对应microblaze做为RISC 32处理的格式要求 ”.ent __Yield“表示做为__Yield函数的正文起始, ”.type __Yield, @function"用于指定代码的类型为函数,__Yield部分代码属于函数子程。".end __Yield"表示整个函数的结尾。

__Yield函数首先调用swi指令将以r15开始的15个上下文GPR内容保存到poldxtx内存段基址+4偏址至+60偏址的内存空间,最后把栈空间指针从r1复制到poldxtx内存段基址+0偏址从而完成现有进程的上下文保存。

新线程任务的注入的过程与保存过程相反,调用lwi指令首先将pnewxtx内存段基地址+0偏址的栈空间sp指针地址复制到r1寄存器 而后将r15开始的15个上下文GPR内容从pnewctx基址的+4至+60偏址段内容复制处处理器的GPR, 最后使用rtsd将PC指针重定向到r15返回地址+8的位置,因为上文r15寄存器内容被设置为,r15=run_thread=8,所以重定向的PC指针地址为run_thread函数的统一入口地址。此例中能够看到rtsd 分支指令提早到 lwi r31, r6,60指令前执行,其缘由在于上文所提到的分支预测槽的影响,因为rtsd指令须要至少延迟一个指令周期才能生效,为了屏蔽延迟带来的空泡,故将rtsd指令提早一个周期执行,使延迟时间槽正好被下一条指令的执行时间所填满,在调度器频繁使用线程切换时,能够提升必定的指令执行的效率。

线程让步函数

调度器提供了yield, __yield, lowpriorityyield, __lowpriorityyield四种线程让步函数,其源码以下:

static inline void __yield()
    {
        instance.ready_queue[instance.current->priority].rotate();
        setinterrupt();
        reschedule();
        clearinterrupt();
    }
    static inline void yield()
    {
        uint32 status = clearinterruptacquire();
        instance.ready_queue[instance.current->priority].rotate();
        interruptrestore(status);
        reschedule();
    }
    static inline void __lowpriorityyield()
    {
        uint32 priority = instance.current->priority;
        instance.ready_queue[priority].remove();
        instance.ready_queue[low_priority].inserthead(instance.current);
        setinterrupt();
        reschedule();
        clearinterrupt();
        instance.ready_queue[low_priority].remove();
        instance.ready_queue[priority].inserthead(instance.current);
    }
    static inline void lowpriorityyield()
    {
        uint32 priority = instance.current->priority;
        uint32 status = clearinterruptacquire();
        instance.ready_queue[priority].remove();
        instance.ready_queue[low_priority].inserthead(instance.current);
        interruptrestore(status);
        reschedule();
        status = clearinterruptacquire();
        instance.ready_queue[low_priority].remove();
        instance.ready_queue[priority].inserthead(instance.current);
        interruptrestore(status);
    }

__yieldyield函数区别在于__yield函数对应普通中断模式而yield对应快速中断模式,两类线程让步函数的流程十分类似。

yield

__lowpriorityyieldpriorityyield函数区别与上面的状况相似,分别在普通中断和快速中断模式下将当前任务让步到低优先级线程池队列。

lowyield

线程任务运行与退出方法

线程退出方法threadexit的实现源码以下:

static inline void threadexit()
    {
#ifdef PROBE_LOG
        instance.current->extdat->thread_start=NULL;
        instance.current->extdat->thread_arg=NULL;
#endif
        ASSERT((instance.current != NULL),LEVEL_NORMAL);
        uint32 status = clearinterruptacquire();

        instance.ready_queue[instance.current->priority].remove();

        interruptrestore(status);
        instance.spare_queue.insert(instance.current);
        instance.reschedule();
    }

调度器的线程退出函数提供了用户线程完成线程函数任务后退出原线程上下文的方法,其流程以下图

exit

线程运行方法threadexit的实现源码以下:

void thread_lib::run_thread()
{
    instance.current->extdat->thread_start(instance.current->extdat->thread_arg);
    threadexit();
}

run_thread函数是调度器类的静态函数方法,是用户定义线程执行最重要的统一程序入口。全部子线程得到处理器计算资源的起始执行位置都将从run_thread开始,其流程以下。

run

在执行run_thread方法以前,通常已经经过reschedule方法将新线程任务设置为当前的current线程,所以在run_thread执行中,函数任务将始终执行current线程的函数任务方法。经过run_thread运行线程任务,使调度器在多线程上下文切换的过程当中可以始终有效管理子线程的运行行为。

线程任务休眠方法

调度器的休眠方法在主线程中执行以毫秒为单位的运行时间延迟,在延迟时间内,经过循环yield调用重调度函数,使其余子线程得之后台运行,线程最终返回主线程时,经过读取timer获得tick计数判断是否达到预约的时间延迟,因为延迟断定须要切换回主线程才能执行,当线程池中的用户线程过多时,延迟等待的时间并不精确,一般会超出预设的等待时间。

sleep

线程中断绑定方法

调度器提供了中断绑定线程任务的接口,本例描述的低层次抢占式调度器中没有使用绑定中断方式来增长调度任务的场景应用环境

中断绑定接口的代码以下

static inline void linkinterrupt(uint32 ISRID,ISRCallBack f)
{
    aISRFunc[ISRID] = f;
}

microbalze的处理器采用low-latancy中断查询机制,触发中断后会进入统一中断服务接口函数,查询具体的中断事件源并处理中断,在根据Xinlinx MicroBlaze Processor Reference Guide UG984 (v2018.3) Nov 14, 2018第85页到86页对中断机制有详细描述

Interrupt

MicroBlaze supports one external interrupt source (connected to the Interrupt input port). The processor only reacts to interrupts if the Interrupt Enable (IE) bit in the Machine Status Register (MSR) is set to 1. On an interrupt, the instruction in the execution stage completes while the instruction in the decode stage is replaced by a branch to the interrupt vector. This is either address C_BASE_VECTORS + 0x10, or with low-latency interrupt mode, the address supplied by the Interrupt Controller.

The interrupt return address (the PC associated with the instruction in the decode stage at the time of the interrupt) is automatically loaded into general purpose register R14. In addition, the processor also disables future interrupts by clearing the IE bit in the MSR. The IE bit is automatically set again when executing the RTID instruction.

Interrupts are ignored by the processor if either of the break in progress (BIP) or exception in progress (EIP) bits in the MSR are set to 1.

By using the parameter C_INTERRUPT_IS_EDGE, the external interrupt can either be set to level-sensitive or edge-triggered:

• When using level-sensitive interrupts, the Interrupt input must remain set until
MicroBlaze has taken the interrupt, and jumped to the interrupt vector. Software must
cknowledge the interrupt at the source to clear it before returning from the interrupt
handler. If not, the interrupt is taken again, as soon as interrupts are enabled when
returning from the interrupt handler.

• When using edge-triggered interrupts, MicroBlaze detects and latches the Interrupt
input edge, which means that the input only needs to be asserted one clock cycle. The
interrupt input can remain asserted, but must be deasserted at least one clock cycle
before a new interrupt can be detected. The latching of an edge-triggered interrupt is
independent of the IE bit in MSR. Should an interrupt occur while the IE bit is 0, it will
immediately be serviced when the IE bit is set to 1.

With periodic interrupt sources, such as the FIT Timer IP core, that do not have a method to clear the interrupt from software, it is recommended to use edge-triggered interrupts.

Low-latency Interrupt Mode

A low-latency interrupt mode is available, which allows the Interrupt Controller to directly supply the interrupt vector for each individual interrupt (using the Interrupt_Address input port). The address of each fast interrupt handler must be passed to the Interrupt Controller when initializing the interrupt system. When a particular interrupt occurs, this address is supplied by the Interrupt Controller, which allows MicroBlaze to directly jump to the handler code.

With this mode, MicroBlaze also directly sends the appropriate interrupt acknowledge to the Interrupt Controller (using the Interrupt_Ack output port), although it is still the responsibility of the Interrupt Service Routine to acknowledge level sensitive interrupts at the source.

This information allows the Interrupt Controller to acknowledge interrupts appropriately, both for level-sensitive and edge-triggered interrupt.

To inform the Interrupt Controller of the interrupt handling events, Interrupt_Ack is set to:

• 01: When MicroBlaze jumps to the interrupt handler code,

• 10: When the RTID instruction is executed to return from interrupt,

• 11: When MSR[IE] is changed from 0 to 1, which enables interrupts again.

The Interrupt_Ack output port is active during one clock cycle, and is then reset to 00

程序的入口地址+0x10的偏移地址部分即统一的中断入口函数地址

.globl _start
        .section .vectors.reset, "ax"
    .align 2
        .ent _start
        .type _start, @function
_start:
        brai    _start1
        .end _start

    .globl _interrupthandle
        .section .vectors.interrupt, "ax"
    .align 2
        .ent _interrupthandle
        .type _interrupthandle, @function
_interrupthandle:
        brai    PreemptiveInterrupt
        .end _interrupthandle

当触发中断后,处理器会自动跳转到_interrupthandle执行中断服务任务,中断服务会跳转至PreemptiveInterrupt执行中断查询任务,执行相关的中断处理任务,PreemptiveInterrupt中断查询服务函数代码以下,因与调度器工做流程无关,此处不展开分析

void PreemptiveInterrupt(void)
{
    uint32 CurrentDPCLevel = dpc_lib::instance.level;
    dpc_lib::instance.level = dpc_lib::interruptlevel;
    uint32 IntrStatus  =  reg_ops::get_interrupt_status();
    while (IntrStatus)
    {
        uint32 IntrMask = ((IntrStatus^(IntrStatus-1))+1)>>1;
        uint32 IntrIndex;
        IntrIndex = bitscanreverse(IntrMask);
        aISRFunc[IntrIndex]();
        reg_ops::ack_interrupt(IntrMask);
        IntrStatus  = reg_ops::get_interrupt_status();
    }

    dpc_lib::instance.level = CurrentDPCLevel;
    while (true)
    {
        uint32 scheduleMask = dpc_lib::instance.enablemask & dpc_lib::instance.triggermask;
        uint32 NewDPCLevel = bitscan(scheduleMask);
        if (NewDPCLevel+1 >= CurrentDPCLevel)
            break;

        //Save Current DPCLevel and Registers
        dpc_lib::instance.level = NewDPCLevel+1;
        dpc_lib::instance.triggermask ^= (0x80000000u >> NewDPCLevel);
        __asm__ __volatile__ ("addik r1, r1, -4":::"memory");
        __asm__ __volatile__ ("swi r14, r1, 0":::"memory");

        //Enable Interrupt
        setinterrupt();

        dpc_lib::instance.dpcfun[NewDPCLevel]();

        //Disable Interrupt
        clearinterrupt();

        //Restore Current DPCLevel and Registers
        __asm__ __volatile__ ("lwi r14, r1, 0":::"memory");
        __asm__ __volatile__ ("addik r1, r1, 4":::"memory");
        dpc_lib::instance.level = CurrentDPCLevel;
    }
}
  • 调度器的工做流程

在理清调度器主要调度方法后,咱们能够在下文对调度器展开模拟工做过程分析

调度器资源存活的内存物理区间

为了在调度器工做过程当中实现更高的响应性能,则须要优化调度器工做中资源开销的IO访存性能, 本例中调度器采用静态实例资源实现,经过连接手段将调度器资源放置在处理器的紧耦合内存(TCM, Tight Couppling Memory)上, MicroBlaze所拥的TCM内存称为BRAM供指令和数据同时使用。处理器内核访问BRAM的延迟一般在5个时钟周期之内,远小于访问主存DDR带来的数百个时钟周期延迟。
bram

在实际应用中咱们在连接脚本ld.script中自定义了一种快速数据段FASTDATA_SECTION(fastdata),从而将调度器资源指定到BRAM空间内

调度器资源创建位置代码

#define FASTDATA_SECTION    __attribute__ ((section ("fastdata")))

thread_lib thread_lib::instance FASTDATA_SECTION;

ld.script

_STACK_SIZE = DEFINED(_STACK_SIZE) ? _STACK_SIZE : 0x100000;
_HEAP_SIZE = DEFINED(_HEAP_SIZE) ? _HEAP_SIZE : 0;

/* Define Memories in the system */

MEMORY
{
   microblaze_0_i_bram_ctrl_microblaze_0_d_bram_ctrl : ORIGIN = 0x00000050, LENGTH = 0x0001FFB0
   axi_7series_ddrx_0_S_AXI_BASEADDR : ORIGIN = 0xC0000000, LENGTH = 0x3BF60000
}

/* Specify the default entry point to the program */

ENTRY(_start)

/* Define the sections, and where they are mapped in memory */

SECTIONS
{
.vectors.reset 0x00000000 : {
   *(.vectors.reset)
} 
.vectors.interrupt 0x00000010 : {
   *(.vectors.interrupt)
} 
.vectors.exception 0x00000020 : {
   *(.vectors.exception)
} 

.text : {
   *(.text)
   *(.text.*)
   *(.gnu.linkonce.t.*)
} > microblaze_0_i_bram_ctrl_microblaze_0_d_bram_ctrl

.fastdata : {
   . = ALIGN(4);
   *(.fastdata)   
} > microblaze_0_i_bram_ctrl_microblaze_0_d_bram_ctrl
}

调度器的模拟工做流程

如下经过流程图来展现整个调度器生命周期的工做过程

  1. 调度器完成初始化过程后处理器GPR被主线程的上下文环境所占据,此时ready_queue中尚无有效线程单元,current指针指向main_thread实例空间(内容为空)

init

  1. 用户经过create_thread方法不断从spare_queue中抽取空白线程单元填入各种线程任务方法,此时ready_queue中high/normal/low三种优先级队列中被注册了若干线程任务函数,current指针仍指向main_thread实例空间(内容为空)

create

  1. 程序执行yield方法从ready_queue中按照从高到底优先级抽取线程对象,将新线程与current指针绑定,启动线程让步后,主线程上下文被保存至main_thread实例空间并插入到low-priority队列的尾部。所以主线程只当优先级在其以前的全部线程池任务执行完毕才能从新得到CPU的计算资源。

yield

  1. yield方法会将除主线程之外全部线程执行的启动位置定位到"run_thread", 当执行完current所指向的线程所承载的任务函数后,将已执行完的线程从ready_queue中删除,放回到spare_queue队列,并再次启动reschedule过程从线程池挑选新的线程任务提交给处理器执行。

exit

  1. 当线程池中全部用户程序任务被执行完毕并放回到spare_queue, 主线程将再次得到处理器计算资源,主线程将按照执行yield时的上下文继续向下执行yield以后的程序。

return

  • 与调度器相关的事件与信号量方法

原始设计中一样提供了与调度器运行相关的事件与信号量方法

线程事件方法

class event_dt
{
private:
    volatile uint32 status;
    thread_dt* volatile thread;
public:
    inline void init(uint32 initialvalue)
    {
        status = initialvalue;
        thread=NULL;
    }
    inline bool isset() const
    {
        return status!=0;
    }
    inline void __set()
    {
        ASSERT(!isinterruptenabled(),LEVEL_NORMAL);
        status = 1;
        if(thread != NULL)
        {
            thread_lib::get_readyqueue(thread->priority).insert(thread);
            thread=NULL;
        }
    }
    inline void set()
    {
        uint32 oldstatus = clearinterruptacquire();
        ASSERT(oldstatus,LEVEL_INFO);
        __set();
        interruptrestore(oldstatus);
    }
    inline void reset()
    {
        status = 0;
    }
    inline void __wait()
    {
        ASSERT(!isinterruptenabled(),LEVEL_NORMAL);
        ASSERT(dpc_lib::getdpclevel() == dpc_lib::threadlevel, LEVEL_NORMAL);
        if(status == 0)
        {
            thread_lib::get_currentreadyqueue().remove();
            ASSERT(thread==NULL,LEVEL_NORMAL);
            thread=thread_lib::getcurrentcontext();
            setinterrupt();
            thread_lib::reschedule();
            clearinterrupt();
        }
    }
    inline void wait()
    {
        ASSERT(isinterruptenabled(),LEVEL_NORMAL);
        ASSERT(dpc_lib::getdpclevel() == dpc_lib::threadlevel, LEVEL_NORMAL);
        uint32 oldstatus = clearinterruptacquire();
        if(status == 0)
        {
            thread_lib::get_currentreadyqueue().remove();
            ASSERT(thread==NULL,LEVEL_NORMAL);
            thread=thread_lib::getcurrentcontext();
            interruptrestore(oldstatus);
            thread_lib::reschedule();
        }
        else
        {
            interruptrestore(oldstatus);
        }
    }
};

本例实现的事件用于用户线程之间的触发等待通讯,其成员变量和方法的含义以下


成员变量 功能说明
status 事件阻塞式等待标志,0为阻塞等待,非0则为非阻塞访问
thread 用于承载阻塞等待线程的线程容器,待事件触发后返回

操做函数 功能说明
init 事件初始化方法,用于设置事件实例的等待方式(默认为0)和承载线程(设为NULL)
isset 查看事件有无被触发(status设置为0)
__set 事件触发函数,设置status状态,并将事件等待所在线程从新注册到线程池队列末尾
set 事件触发线程,用于开关中断并调用__set
reset 事件重置方法,将事件实例状态归为初始化状态
__wait 普通中断模式下的事件等待方法,为整个事件通讯流程的发起函数
wait 快速中断模式下的事件等待方法,为整个事件通讯流程的发起函数
  1. 以下图所示,事件类在被申请和初始化后,事件类内部的事件触发标志status被初始化为0,wait操做的寄生线程为NULL

eventinit

  1. 执行事件wait函数等待事件触发后,wait操做的寄生线程被指向等待操做所在的线程(红框标识)

eventwait

  1. wait函数将启动调度器的reschedule重调度操做,将wait所在线程替换出CPU上下文环境,并放回到spare_queue,同时在event的容器内保留该寄生线程的备份, 用户可按照业务须要在其余用户线程中放置事件促发操做(紫逛标识)

eventyield

  1. 当调度器将事件促发线程调换到CPU执行时,事件触发操做执行,设置事件类中的促发标志(status标黄),同时将事件类容器中的等待操做寄生线程从新插入回ready_queue的线程池,上述操做执行完毕,触发线程将被删除b并被放回spare_queue。

eventset

  1. 调度器从新将事件等待寄生线程调换回CPU,处理器上下文回到等待函数的在yield后的代码并继续向下执行,事件等待操做完成。

eventwaitdone

以上事件操做涉及多个线程,所以事件类的声明须要放置在全局变量空间(.data或者.bss段,或其余自定义的全局空间段),因为调度器属于抢占式,所以对于触发标志的检查不须要在while循环中执行,这是因为reschedule的操做会将已执行的线程从线程池删除而且触发线程必定会在等待线程以前完成,采用循环检查会致使第二轮次的yield出现线程空指针错误。

线程信号量方法

class semaphore_dt
{
private:
    volatile int32 status;
    thread_dt* volatile thread;

public:
    inline semaphore_dt()
    {
    }
    inline semaphore_dt(int32 initialvalue)
    {
        init(initialvalue);
    }
    inline void init(int32 initialvalue)
    {
        status = initialvalue;
        thread=NULL;
    }
    inline void __inc()
    {
        ASSERT(!isinterruptenabled(),LEVEL_NORMAL);
        if(status == 0)
        {
            if(thread == NULL)
            {
                ++status;
            }
            else
            {
                thread_lib::get_readyqueue(thread->priority).insert(thread);
                thread=NULL;
            }
        }
        else
        {
            ++status;
        }
    }
    inline void inc()
    {
        uint32 oldstatus = clearinterruptacquire();
        ASSERT(oldstatus,LEVEL_INFO);
        __inc();
        interruptrestore(oldstatus);
    }
    inline int32 getresourcecount() const
    {
        return status;
    }
    inline bool isneedwait() const
    {
        return status <=0;
    }
    inline bool __trywait()
    {
        ASSERT(!isinterruptenabled(),LEVEL_NORMAL);
        ASSERT(dpc_lib::getdpclevel() == dpc_lib::threadlevel, LEVEL_NORMAL);
        if(status <= 0)
        {
            return false;
        }
        --status;
        return true;
    }
    inline bool trywait()
    {
        uint32 oldstatus = clearinterruptacquire();
        ASSERT(oldstatus,LEVEL_INFO);
        bool ret=__trywait();
        interruptrestore(oldstatus);
        return ret;
    }
    inline void __wait()
    {
        ASSERT(!isinterruptenabled(),LEVEL_NORMAL);
        ASSERT(dpc_lib::getdpclevel() == dpc_lib::threadlevel, LEVEL_NORMAL);
        if(status <= 0)
        {
            thread_lib::get_currentreadyqueue().remove();
            ASSERT(thread==NULL,LEVEL_NORMAL);
            thread=thread_lib::getcurrentcontext();

            setinterrupt();
            thread_lib::reschedule();
            clearinterrupt();
        }
        else
        {
            --status;
        }
    }
    inline void wait()
    {
        ASSERT(isinterruptenabled(),LEVEL_NORMAL);
        ASSERT(dpc_lib::getdpclevel() == dpc_lib::threadlevel, LEVEL_NORMAL);
        uint32 oldstatus = clearinterruptacquire();
        if(status <= 0)
        {
            thread_lib::get_currentreadyqueue().remove();
            ASSERT(thread==NULL,LEVEL_NORMAL);
            thread=thread_lib::getcurrentcontext();
            interruptrestore(oldstatus);
            thread_lib::reschedule();
        }
        else
        {
            --status;
            interruptrestore(oldstatus);
        }
    }
};

本例的信号量用于用户线程之间的旗帜信令握手,其成员变量和方法的含义以下


成员变量 功能说明
status 用于初始化旗语容许的最大信号量,可用于控制用户线程事件执行的数量
thread 用于承载阻塞等待线程的线程容器,待事件触发后返回

操做函数 功能说明
init 初始化信号量的最大受权数量,可用于控制使用该信号量同步的线程执行个数
__inc 旗语信号量的放回操做方法,用于恢复信号量的受权数
inc 执行旗语信号量的放汇操做,开关中断响应,调用__inc
getresourcecount 得到当前可用的信号量个数
isneedwait 判断当前信号量是否已经所有受权出(新等待受权任务须要等待)
__trywait 普通中断模式下尝试等待信号量容器受权信令,若没有效受权,申请受权线程不造成阻塞
trywait 快速中断模式下尝试等待信号量容器受权信令,若没有效受权,申请受权线程不造成阻塞
__wait 普通中断模式下尝试等待信号量容器受权信令,若没有效受权,申请受权线程将阻塞
wait 快速中断模式下尝试等待信号量容器受权信令,若没有效受权,申请受权线程将阻塞
  1. 信号量的初始化完成后,内部信号量容器会设置若干受权信令(status初始值),同时将寄生线程指针指向NULL

semainit

  1. 信号量寄生线程(红框标识)经过屡次等待函数的执行从内部信号量容器取走信令(每执行一次wait函数,status减1,直至递减至0),同时将寄生线程指针指向当前线程。在线程池中注入多个信令交还线程(紫框标识),当受权取空时,再次在寄生线程中启动等待函数会迫使当前线程启动reschedule函数重调度线程

semawait

  1. 当信令注入线程被调度器调度处处理器执行时,inc函数会交还一个受权信令(status加1),同时将寄生线程指针从新放回ready_queue线程池

semainc

  1. 当寄生线程从新调度会理器执行时,原来因为缺少受权信令而进入重调度过程的函数得到新的信令得以继续执行,信令受权申请结束。

semareturn

与事件类操做涉相似,信号量类的声明也须要放置在全局变量空间,对于信号受权检查的方式因为与事件类相同的缘由,也不采用循环检查的方式。

  • 后记

以上讨论了一个很是简单的RTOS调度器的实现,真实的分时系统所采用的调度器因为平均时间片和中断的引入,在结构和流程设计时会更加复杂。将来将尝试在上述调度器上逐步升级加入时间片切换和中断方式。
衡量一个调度器设计的优劣通常能够考察其工做性能。

调度器的性能指标

调度器的最终目标是运行用户程序,让处理器被合理的利用。那么,评价一个调度器算法的指标是什么?
通常定量的指标,一般咱们第一个想到的就是CPU利用率。CPU利用率在必定程度上能够说明问题,表示CPU的繁忙程度,但不够细致,由于咱们不清楚CPU到底在忙什么。
从以系统为中心和以用户为中心,大约有如下几个能够利用的指标:

  • 以系统为中心:
  1. CPU利用率:CPU处理器运行指令的繁忙时间的占比
  2. 吞吐量:表示单位时间内所完成的做业个数
  3. 平均周转时间:测量任务进入和离开系统平均所花的时间(t1+t2+...+tn)/n
  4. 平均等待时间:表示系统任务的平均等待时间(w1+w2_...wn)/n
  • 以用户为中心:
  1. 响应时间:表示特定的任务i的周转时间ti
  2. 响应时间方差:表示给定进程的实际响应时间与其指望值的统计差别

除了上面介绍的定量指标,值得一提的调度器算法的定性指标:

  1. 饥饿:在任何进程做业的组合中,调度策略都应该确保全部的任务一直都有进展,若是因为某种缘由,一个进程任务并无任何进展,咱们把这种状况称之为饥饿。这种状况的定量表现是,某个特定任务的响应时间没有上限。
  2. 护送效应(convey effect):在任何进程做业的组合中,调度策略应该预防长时间运行的某个任务彻底占据CPU的使用。若是出于某种缘由,任务的调度符合固定的规律(相似于军队的护卫),这种状况称之为护送效应。这种现象的定量表现为,任务的响应时间的方差很大。

调度算法

这里介绍几种典型的非抢占式和抢占式的算法。

  1. 非抢占式的调度算法
  • 先到先服务算法(FCFS, First-Come First-Served)

这个算法会用到的属性是进程的到达时间,也就是启动运行一个进程的时间。先启动的进程会优先被调度器选中,以下图所示,P1是第一个到达的,而后再是P2, P3,因此根据先到先服务原则,调度器老是会优先选择P1,而后P2,P3。

优势:这个算法有一个很好的性质,就是任何进程都不会饥饿,也就是说算法没有回致使任务进程拒绝服务的内在偏向

缺点:但因为上面这个性质,响应时间的方差会很大。举个例子,一个长时间任务到达后,后面跟着一个短期的任务,那么短任务被长做业挡在后面,它的响应时间就会很糟糕,因为护送效应致使低下的CPU利用率。因此这个算法并无对短任何给予任何优先考虑。

  • 最短做业优先(SJF, Shortest Job First)

既然先到先服务对短任务不是很友好,那么这个算法就是为了让短做业得到更好的响应时间。
优势:调度器会优先选择时间较短的任务,让短任务得到更好的响应时间;
缺点:有可能会让一个长时任务饥饿。
解决这个缺点有一个方案,当一个做业的年龄到达一个阈值,调度器忽略SJF, 选择FCFS算法。

  • 优先级算法

出于调度的目的,多数OS会给每一个进程赋予一个属性——优先级。好比,在UNIX系统中,每一个用户级进程开始时都有一个固定的默认优先级。Ready Queue中包含多个子队列,每一个队列都对应着一个优先级,每一个子队列内部采用FCFS算法,以下图所示:

优势:灵活,能够提供差别化服务

缺点:会产生饥饿,能够根据进程的等待时间来提升优先级

  1. 抢占式调度算法

抢占式与非抢占式的区别在于:在一个新进程或刚完成I/O的进程进入到ready queue中时,会从新评估一些属性(好比剩余执行时间),以决定要不要抢占当前正在运行的进程。原则上说,上面讨论到的任何一个非抢占式算法都能改形成抢占式的,好比FCFS算法,每次从新进入就绪队列时,调度器能够决定抢占当前正在执行的进程(若是新任务的到达时间比较早),相似的,SJF和优先级也同样。
下面介绍两种抢占式算法:

  • 最短剩余时间优先(SRTF, Shortest Remaining Time First)

调度器会估计每一个进程的运行时间,当一个进程回到就绪队列,调度器计算这个任务的剩余处理时间,根据计算结果,放入ready queue中合适的位置。若是该进程的剩余时间比当前的进程要少,那么调度器就会抢占当前运行的任务,让这个新任务先执行。跟FCFS算法相比,最短剩余时间的平均等待时间通常比较低。

  • RR(Round Robin)调度器

分时环境特别适合使用RR调度器,即每一个进程都应该获得处理器时间的一部分。所以,非抢占式的调度器就不适合这种环境。假设有n个就绪的进程,调度器把CPU资源分红一个一个时间片,而后分配给各个进程,以下图所示。就绪队列里每一个进程都会获得处理器的时间片q。当时间片用完了,当前调度的进程会被放入就绪队列的尾部,造成一个ring。但考虑到在不通进程切换会有开销,因此选择时间片q的适合要考虑上下文切换。

写在后面的话

这篇文章从思考到写成大概用了1个月的时间,做为一个硬件工程师写软件源码分析掺入了不少对嵌入式高手来说显得啰里吧嗦的话,请大腿们海涵,谨以此文向老猫崇拜的两位技术偶像致敬,第一位大牛的做品被老猫在2019年反复拜读并从中窥探了系统设计的奥妙(了解Linux驱动设计,入门了高性能固件的设计思想)。强烈推荐这位偶像的一篇文章理性的赌徒-SSD写带宽保持恒稳的秘密 让老猫感慨大牛把科学工程与技术作成了蒙娜丽莎般高雅的艺术,活出了老猫心目中技术领袖该有的风范。另一位偶像集风骚与技术为一身,教导老猫理解顶级验证工程师的发展方向是系统架构工程师,通过了一年半的系统实践,老猫已在系统之路上略有收获。向在写做此文中提供了技术答疑帮助的帕啊哥,大腿马哥,肌肉强哥表达抱大腿通常的革命感情,向提供了研究项目平台和实践机会的唐总表达由衷的感谢。

相关文章
相关标签/搜索