摘要:鸿蒙轻内核的任务排序链表,用于任务延迟到期/超时唤醒等业务场景,是一个很是重要、很是基础的数据结构。git
本文会继续给读者介绍鸿蒙轻内核源码中重要的数据结构:任务排序链表TaskSortLinkAttr
。鸿蒙轻内核的任务排序链表,用于任务延迟到期/超时唤醒等业务场景,是一个很是重要、很是基础的数据结构。本文中所涉及的源码,以OpenHarmony LiteOS-M
内核为例,都可以在开源站点https://gitee.com/openharmony/kernel_liteos_m 获取。数组
1 任务排序链表
咱们先看下任务排序连接的数据结构。任务排序链表是一个环状的双向链表数组,任务排序链表属性结构体TaskSortLinkAttr
做为双向链表的头结点,指向双向链表数组的第一个元素,还维护游标信息,记录当前的位置信息。咱们先看下排序链表属性的结构体的定义。数据结构
1.1 任务排序链表属性结构体定义
在kernel\include\los_task.h
头文件中定义了排序链表属性的结构体TaskSortLinkAttr
。该结构体定义了排序链表的头节点LOS_DL_LIST *sortLink
,游标UINT16 cursor
,还有一个保留字段,暂时没有使用。函数
源码以下:学习
typedef struct { LOS_DL_LIST *sortLink; UINT16 cursor; UINT16 reserved; } TaskSortLinkAttr;
在文件kernel\src\los_task.c
中定义了排序链表属性结构体TaskSortLinkAttr
类型的全局变量g_taskSortLink
,该全局变量的成员变量sortLink
做为排序链表的头结点,指向一个长度为32的环状的双向链表数组,成员变量cursor
做为游标记录环状数组的当前游标位置。源代码以下。url
LITE_OS_SEC_BSS TaskSortLinkAttr g_taskSortLink;
咱们使用示意图来说述一下。任务排序链表是环状双向链表数组,长度为32,每个元素是一个双向链表,挂载任务LosTaskCB
的链表节点timerList
。任务LosTaskCB
的成员变量idxRollNum
记录数组的索引和滚动数。全局变量g_taskSortLink
的成员变量cursor
记录当前游标位置,每过一个Tick
,游标指向下一个位置,转一轮须要32 ticks
。当运行到的数组位置,双向链表不为空,则把第一个节点维护的滚动数减1。这样的数据结构相似钟表表盘,也称为时间轮
。spa
咱们举个例子来讲明,基于时间轮实现的任务排序链表是如何管理任务延迟超时的。假如当前游标cursor
为1,当一个任务须要延时72 ticks
,72=2*32+8,表示排序索引sortIndex
为8,滚动数rollNum
为2。会把任务插入数组索引为sortIndex
+cursor
=9的双向链表位置,索要9处的双向链表维护节点的滚动为2。随着Tick
时间的进行,从当前游标位置运行到数组索引位置9,历时8 ticks
。运行到9时,若是滚动数大于0,则把滚动数减1。等再运行2轮,共须要72 ticks
,任务就会延迟到期,能够从排序链表移除。每一个数组元素对应的双向链表的第一个链表节点的滚动数表示须要转多少轮,节点任务才到期。第二个链表节点的滚动数须要加上第一个节点的滚动数,表示第二个节点须要转的轮数。依次类推。.net
示意图以下:设计
1.2 任务排序链表宏定义
在OS_TSK_SORTLINK_LEN
头文件中定义了一些和任务排序链表相关的宏定义。延迟任务双向链表数组的长度定义为32,高阶bit
位位数为5,低阶bit
位位数为27。对于任务的超时时间,取其高27位做为滚动数,低5位做为数组索引。3d
源码以下:
/** * 延迟任务双向链表数组的数量(桶的数量):32 */ #define OS_TSK_SORTLINK_LEN 32 /** * 高阶bit位数目:5 */ #define OS_TSK_HIGH_BITS 5U /** * 低阶bit位数目:27 */ #define OS_TSK_LOW_BITS (32U - OS_TSK_HIGH_BITS) /** * 滚动数最大值:0xFFFF FFDF,1111 0111 1111 1111 1111 1111 1101 1111 */ #define OS_TSK_MAX_ROLLNUM (0xFFFFFFFFU - OS_TSK_SORTLINK_LEN) /** * 任务延迟时间数的位宽:5 */ #define OS_TSK_SORTLINK_LOGLEN 5 /** * 延迟任务的桶编号的掩码:3一、0001 1111 */ #define OS_TSK_SORTLINK_MASK (OS_TSK_SORTLINK_LEN - 1U) /** * 滚动数的高阶掩码:1111 1000 0000 0000 0000 0000 0000 0000 */ #define OS_TSK_HIGH_BITS_MASK (OS_TSK_SORTLINK_MASK << OS_TSK_LOW_BITS) /** * 滚动数的低阶掩码:0000 0111 1111 1111 1111 1111 1111 1111 */ #define OS_TSK_LOW_BITS_MASK (~OS_TSK_HIGH_BITS_MASK)
2 任务排序链表操做
咱们分析下任务排序链表的操做,包含初始化,插入,删除,滚动数更新,获取下一个到期时间等。
2.1 初始化排序链表
在系系统内核初始化启动阶段,在函数UINT32 OsTaskInit(VOID)
中初始化任务排序链表。该函数的调用关系以下,main.c:main() --> kernel\src\los_init.c:LOS_KernelInit() --> kernel\src\los_task.c:OsTaskInit()
。
初始化排序链表函数的源码以下:
LITE_OS_SEC_TEXT_INIT UINT32 OsTaskInit(VOID) { UINT32 size; UINT32 index; LOS_DL_LIST *listObject = NULL; ...... ⑴ size = sizeof(LOS_DL_LIST) * OS_TSK_SORTLINK_LEN; listObject = (LOS_DL_LIST *)LOS_MemAlloc(m_aucSysMem0, size); ⑵ if (listObject == NULL) { (VOID)LOS_MemFree(m_aucSysMem0, g_taskCBArray); return LOS_ERRNO_TSK_NO_MEMORY; } ⑶ (VOID)memset_s((VOID *)listObject, size, 0, size); ⑷ g_taskSortLink.sortLink = listObject; g_taskSortLink.cursor = 0; for (index = 0; index < OS_TSK_SORTLINK_LEN; index++, listObject++) { ⑸ LOS_ListInit(listObject); } return LOS_OK; }
⑴处代码计算须要申请的双向链表的内存大小,OS_TSK_SORTLINK_LEN
为32,即须要为32个双向链表节点申请内存空间。而后申请内存,⑵处申请内存失败时返回相应错误码。⑶处初始化申请的内存区域为0等。⑷处把申请的双向链表节点赋值给g_taskSortLink
的链表节点.sortLink
,做为排序链表的头节点,游标.cursor
初始化为0。而后⑸处的循环,调用LOS_ListInit()
函数把双向链表数组每一个元素都初始化为双向循环链表。
2.2 插入排序链表
插入排序链表的函数为OsTaskAdd2TimerList()
。在任务等待互斥锁/信号量等资源时,都须要调用该函数将任务加入到对应的排序链表中。该函数包含两个入参,第一个参数LosTaskCB *taskCB
用于指定要延迟的任务,第二个参数UINT32 timeout
指定超时等待时间。
源码以下:
LITE_OS_SEC_TEXT VOID OsTaskAdd2TimerList(LosTaskCB *taskCB, UINT32 timeout) { LosTaskCB *taskDelay = NULL; LOS_DL_LIST *listObject = NULL; UINT32 sortIndex; UINT32 rollNum; ⑴ sortIndex = timeout & OS_TSK_SORTLINK_MASK; rollNum = (timeout >> OS_TSK_SORTLINK_LOGLEN); ⑵ (sortIndex > 0) ? 0 : (rollNum--); ⑶ EVALUATE_L(taskCB->idxRollNum, rollNum); ⑷ sortIndex = (sortIndex + g_taskSortLink.cursor); sortIndex = sortIndex & OS_TSK_SORTLINK_MASK; ⑸ EVALUATE_H(taskCB->idxRollNum, sortIndex); ⑹ listObject = g_taskSortLink.sortLink + sortIndex; ⑺ if (listObject->pstNext == listObject) { LOS_ListTailInsert(listObject, &taskCB->timerList); } else { ⑻ taskDelay = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList); do { ⑼ if (UWROLLNUM(taskDelay->idxRollNum) <= UWROLLNUM(taskCB->idxRollNum)) { UWROLLNUMSUB(taskCB->idxRollNum, taskDelay->idxRollNum); } else { ⑽ UWROLLNUMSUB(taskDelay->idxRollNum, taskCB->idxRollNum); break; } ⑾ taskDelay = LOS_DL_LIST_ENTRY(taskDelay->timerList.pstNext, LosTaskCB, timerList); } while (&taskDelay->timerList != (listObject)); ⑿ LOS_ListTailInsert(&taskDelay->timerList, &taskCB->timerList); } }
⑴处代码计算等待时间timeout
的低5位做为数组索引,高27位做为滚动数rollNum
。这2行代码数学上的意义,就是把等待时间处于32获得的商做为滚动数,余数做为数组索引。⑵处代码,若是余数为0,能够整除时,滚动数减1。减1设计的缘由是,在函数VOID OsTaskScan(VOID)
中,每个tick
到来时,若是滚动数大于0,滚动数减1,并继续滚动一圈。后文会分析该函数VOID OsTaskScan(VOID)
。
⑶处代码把滚动数赋值给任务taskCB->idxRollNum
的低27位。⑷处把数组索引加上游标,而后执行⑸赋值给任务taskCB->idxRollNum
的高5位。⑹根据数组索引获取双向链表头结点,⑺若是此处双向链表为空,直接插入链表里。若是链表不为空,执行⑻获取第一个链表节点对应的任务taskDelay
,而后遍历循环双向链表,把任务插入到合适的位置。⑼处若是待插入任务taskCB
的滚动数大于等于当前链表节点对应任务的滚动数,则从待插入任务taskCB
的滚动数中减去当前链表节点对应任务的滚动数,而后执行⑾获取下一个节点继续遍历。⑽处若是待插入任务taskCB
的滚动数小于当前链表节点对应任务的滚动数,则从当前链表节点对应任务的滚动数中减去待插入任务taskCB
的滚动数,而后跳出循环。执行⑿,完成任务插入。插入过程,能够结合上文的示意图进行理解。
2.3 从排序链表中删除
从排序链表中删除的函数为VOID OsTimerListDelete(LosTaskCB *taskCB)
。在任务恢复/删除等场景中,须要调用该函数将任务从任务排序链表中删除。该函数包含一个参数LosTaskCB *taskCB
,用于指定要从排序链表中删除的任务。
源码以下:
LITE_OS_SEC_TEXT VOID OsTimerListDelete(LosTaskCB *taskCB) { LOS_DL_LIST *listObject = NULL; LosTaskCB *nextTask = NULL; UINT32 sortIndex; ⑴ sortIndex = UWSORTINDEX(taskCB->idxRollNum); ⑵ listObject = g_taskSortLink.sortLink + sortIndex; ⑶ if (listObject != taskCB->timerList.pstNext) { ⑷ nextTask = LOS_DL_LIST_ENTRY(taskCB->timerList.pstNext, LosTaskCB, timerList); UWROLLNUMADD(nextTask->idxRollNum, taskCB->idxRollNum); } ⑸ LOS_ListDelete(&taskCB->timerList); }
⑴处代码获取待从排序链表中删除的任务对应的数字索引。⑵处代码获取排序链表的头节点listObject
。⑶处代码判断待删除节点是不是最后一个节点,若是不是最后一个节点,执行执行⑷处代码获取待删除节点的下一个节点对应的任务nextTask
,在下一个节点的滚动数中增长待删除节点的滚动数,而后执行⑸处代码执行删除操做。若是是最后一个节点,直接执行⑸处代码删除该节点便可。
2.4 获取下一个超时到期时间
获取下一个超时到期时间的函数为OsTaskNextSwitchTimeGet()
,咱们分析下其代码。
源码以下:
UINT32 OsTaskNextSwitchTimeGet(VOID) { LosTaskCB *taskCB = NULL; UINT32 taskSortLinkTick = LOS_WAIT_FOREVER; LOS_DL_LIST *listObject = NULL; UINT32 tempTicks; UINT32 index; ⑴ for (index = 0; index < OS_TSK_SORTLINK_LEN; index++) { ⑵ listObject = g_taskSortLink.sortLink + ((g_taskSortLink.cursor + index) % OS_TSK_SORTLINK_LEN); ⑶ if (!LOS_ListEmpty(listObject)) { ⑷ taskCB = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList); ⑸ tempTicks = (index == 0) ? OS_TSK_SORTLINK_LEN : index; ⑹ tempTicks += (UINT32)(UWROLLNUM((UINT32)taskCB->idxRollNum) * OS_TSK_SORTLINK_LEN); ⑺ if (taskSortLinkTick > tempTicks) { taskSortLinkTick = tempTicks; } } } return taskSortLinkTick; }
⑴处代码循环遍历双向链表数组,⑵处代码从当前游标位置开始获取排序链表的头节点listObject
。⑶处代码判断排序链表是否为空,若是排序链表为空,则继续遍历下一个数组。若是链表不为空,⑷处代码获取排序链表的第一个链表节点对应的任务。⑸处若是遍历的数字索引为0,tick
数目使用32,不然使用具体的数字索引。⑹处获取任务的滚动数,计算出须要的等待时间,加上⑸处计算出的不足滚动一圈的时间。⑺处计算出须要等待的最小时间,即下一个最快到期的时间。
3 排序链表和Tick时间的关系
任务加入到排序链表后,时间一个tick
一个tick
的逝去,排序链表中的滚动数该如何更新呢?
时间每走过一个tick
,系统就会调用Tick
中断的处理函数OsTickHandler()
,该函数在kernel\src\los_tick.c
文件中实现。下面是该函数的代码片断,⑴处代码分别任务的超时到期状况。
LITE_OS_SEC_TEXT VOID OsTickHandler(VOID) { #if (LOSCFG_BASE_CORE_TICK_HW_TIME == 1) platform_tick_handler(); #endif g_ullTickCount++; #if (LOSCFG_BASE_CORE_TIMESLICE == 1) OsTimesliceCheck(); #endif ⑴ OsTaskScan(); // task timeout scan #if (LOSCFG_BASE_CORE_SWTMR == 1) (VOID)OsSwtmrScan(); #endif }
详细分析下函数OsTaskScan()
,来了解排序链表和tick
时间的关系。函数在kernel\base\los_task.c
文件中实现,代码片断以下:
LITE_OS_SEC_TEXT VOID OsTaskScan(VOID) { LosTaskCB *taskCB = NULL; BOOL needSchedule = FALSE; LOS_DL_LIST *listObject = NULL; UINT16 tempStatus; UINTPTR intSave; intSave = LOS_IntLock(); ⑴ g_taskSortLink.cursor = (g_taskSortLink.cursor + 1) % OS_TSK_SORTLINK_LEN; listObject = g_taskSortLink.sortLink + g_taskSortLink.cursor; ⑵ if (listObject->pstNext == listObject) { LOS_IntRestore(intSave); return; } ⑶ for (taskCB = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList); &taskCB->timerList != (listObject);) { tempStatus = taskCB->taskStatus; ⑷ if (UWROLLNUM(taskCB->idxRollNum) > 0) { UWROLLNUMDEC(taskCB->idxRollNum); break; } ⑸ LOS_ListDelete(&taskCB->timerList); ⑹ if (tempStatus & OS_TASK_STATUS_PEND) { taskCB->taskStatus &= ~(OS_TASK_STATUS_PEND); LOS_ListDelete(&taskCB->pendList); taskCB->taskSem = NULL; taskCB->taskMux = NULL; } ⑺ else if (tempStatus & OS_TASK_STATUS_EVENT) { taskCB->taskStatus &= ~(OS_TASK_STATUS_EVENT); } ⑻ else if (tempStatus & OS_TASK_STATUS_PEND_QUEUE) { LOS_ListDelete(&taskCB->pendList); taskCB->taskStatus &= ~(OS_TASK_STATUS_PEND_QUEUE); } else { taskCB->taskStatus &= ~(OS_TASK_STATUS_DELAY); } ⑼ if (!(tempStatus & OS_TASK_STATUS_SUSPEND)) { taskCB->taskStatus |= OS_TASK_STATUS_READY; OsHookCall(LOS_HOOK_TYPE_MOVEDTASKTOREADYSTATE, taskCB); OsPriqueueEnqueue(&taskCB->pendList, taskCB->priority); needSchedule = TRUE; } if (listObject->pstNext == listObject) { break; } taskCB = LOS_DL_LIST_ENTRY(listObject->pstNext, LosTaskCB, timerList); } LOS_IntRestore(intSave); ⑽ if (needSchedule) { LOS_Schedule(); } }
⑴处代码更新全局变量g_taskSortLink
的游标,指向双向链表数组下一个位置,而后获取该位置的双向链表头结点listObject
。⑵若是链表为空,则返回。若是双向链表不为空,则执行⑶循环遍历每个链表节点。⑷处若是链表节点的滚动数大于0,则滚动数减1,说明任务还须要继续等待一轮。若是链表节点的滚动数等于0,说明任务超时到期,执行⑸从排序链表中删除。接下来须要根据任务状态分别处理,⑹处若是代码是阻塞状态,取消阻塞状态,并从阻塞链表中删除。⑺处若是任务阻塞在事件中,取消阻塞状态。⑻若是任务阻塞在队列,从阻塞链表中删除,取消阻塞状态,若是不是上述状态,取消延迟状态OS_TASK_STATUS_DELAY
。⑼处若是代码是挂起状态,设置任务为就绪状态,加入任务就绪队列,设置须要从新调度标记。⑽若是设置须要从新调度,调用调度函数触发任务调度。
小结
掌握鸿蒙轻内核的排序链表TaskSortLinkAttr
这一重要的数据结构,会给进一步学习、分析鸿蒙轻内核源代码打下了基础,让后续的学习更加容易。后续也会陆续推出更多的分享文章,敬请期待,也欢迎你们分享学习、使用鸿蒙轻内核的心得,有任何问题、建议,均可以留言给咱们: https://gitee.com/openharmony/kernel_liteos_m/issues 。为了更容易找到鸿蒙轻内核代码仓,建议访问 https://gitee.com/openharmony/kernel_liteos_m ,关注Watch
、点赞Star
、并Fork
到本身帐户下,谢谢。