怎样写一个相似ROS的易用的android机器人框架(3)android
为避免机器人执行多任务时对传感器,执行机构的占用冲突,同时又有知足机器人响应突发任务的需求,设计这样的任务框架:多线程
多个任务是排队执行的,即同一时间只有一个任务处于运行状态,任务执行过程当中若是有新任务到来并容许容许,当前任务会暂停,保存任务进度和状态后,再执行新任务,新任务结束后,再恢复执行暂停保存的任务。 每一个任务都有onStart
,onStop
,onPause
,onResume
四个生命周期,以便根据任务的不一样状态进行相应的任务参数设置。 当没有任务时,系统插入一个用户定义的空闲任务,用于通知系统进入待机模式或者控制显示待机界面。 完整的流程以下框架
IdelTask onStart
-> IdleTask onStop
-> Task onStart
-> Task onPause
-> NewTask onStart
-> NewTask onStop
-> Task onResume
-> Task onStop
-> IdleTask onStart
async
当前任务不可恢复时,不会走 onPause
,onResume
,流程以下: IdelTask onStart
-> IdleTask onStop
-> Task onStart
-> Task onStop
-> NewTask onStart
-> NewTask onStop
-> IdleTask onStart
函数
当前任务可恢复但不可打断,可是用户又想要当即执行新任务时,经过接口通知用户选择马上结束或暂停当前任务仍是保持现状。保持现状则新任务只能等到当前任务自行结束后自行,不然,将暂停或者结束当前任务,而后走如上的流程oop
代码位于 ai.easy.robot.task
包名下this
先定义任务接口,以便实现不一样的扩展,接口定义以下:.net
// 机器人任务接口,因为处理其行为逻辑 interface ITask { // 任務名稱 val name: String // 任务正在运行 val isAlive: Boolean // 任务是否能够恢复 fun canResume(): Boolean //任务开始时触发 fun onStart(ctx: TaskContext) //任务结束时触发 fun onStop(ctx: TaskContext) //任务暂停时触发 fun onPause(ctx: TaskContext) //任务恢复时触发 fun onResume(ctx: TaskContext) }
接着定义TaskPool, TaskPool是实现任务调度的类,其用过一个等待任务队列,保存等待执行的任务,还有一个任务栈,保存已经执行的和被暂停保存起来的任务线程
private val waitingTask: LinkedList<TaskTableItem> = LinkedList() private val taskStack: Stack<TaskTableItem> = Stack()
taskStack的栈顶元素即为当前任务设计
TaskTableItem
为任务的信息类 定义以下:
internal data class TaskTableItem( val name: String, val task: ITask, val ctx: TaskContext, val startData: Bundle? = null, val addedTime: Long, var startTime: Long = -1, //开始时间,用于延时任务 var pauseTime: Long = -1, var canInterrupted: Boolean = true, var forceStop: Boolean = false , var priority: Int = 0 )
每一个任务都有独立的与之绑定的 TaskContext , 随着任务 onStart 时建立,onStop 时释放。
经过调用 TaskPool 的 mainLoop()
执行任务列表的调度流程,具体流程见代码,为了不多线程开销,这里用了kotlin的协程实现:
val idleCtx = TaskContext(this) idleCtx.cc = context while (looping) { //是否有新任务 if (waitingTask.size > 0) { val ti = waitingTask.peek() //检测任务添加时间是否失效 if (checkOutDate(ti!!)) { waitingTask.poll() continue } val wanna_task = ti.task // // if (isIdle) { myIdleTask.onStop(idleCtx) log("Idle任务中止") isIdle = false val ctx = ti.ctx ctx.startData = ti.startData ?: idleCtx.startData ctx.cc = context // idleCtx.release() log("${wanna_task.name}任务开始") waitingTask.poll() wanna_task.onStart(ctx) } else if (taskStack.size > 0) { //当前有任务在运行 val cur = taskStack.peek() val cur_task = cur.task val cur_ctx = cur.ctx //当前任务不可打断 if (!cur.canInterrupted && cur_task.isAlive) { delay(20) continue } // if (cur_task.canResume() && cur_task.isAlive && !cur.forceStop) { cur_task.onPause(cur_ctx) log("${cur_task.name}任务暂停") } else { // cur_task.onStop(cur_ctx) log("${cur_task.name}任务中止") cur_ctx.release() taskStack.pop() // } // ti.ctx.startData = ti.startData ?: cur_ctx.startData ti.ctx.cc = context log("${wanna_task.name}任务开始") waitingTask.poll() wanna_task.onStart(ti.ctx) } //将可恢复任务压入堆栈 taskStack.push(ti) // continue } //没有新任务时 //检查当前是否还有任务,弹出失效的任务 if (taskStack.size > 0) { val cur = taskStack.peek() val cur_task = cur.task val cur_ctx = cur.ctx if (!cur_task.isAlive) { cur_task.onStop(cur_ctx) log("${cur_task.name}任务中止") //任务已中止,弹出 taskStack.pop() cur_ctx.release() // if (taskStack.size > 0) { taskStack.peek().let { log("${it.task.name}任务恢复") it.task.onResume(it.ctx) } } // continue } } //无任何任务,转入空闲状态 if (taskStack.size == 0) { if (!isIdle) { log("Idle任务开始") idleTask?.onStart(idleCtx) isIdle = true } } // delay(25) //挂起防止线程堵塞 }
经过调用 TaskPool 的 addNewTask()
通知TaskPool执行新的任务。
须要注意
须要根据新任务的优先级插入到合适的位置。
经过 InterruptComingSelector
的实现类显示UI通知用户进一步的操做,InterruptComingSelector
的定义以下:
/** * 询问用户是否打断任务的通知器 */ abstract class InterruptComingSelector { companion object { const val CHOICE_STOP = 1 const val CHOICE_PAUSE = 2 const val CHOICE_DO_NOTHING = 0 } /** * 开始询问用户时 * @param taskName 当前任务名 * @param canResume 当前任务可恢复 */ abstract fun makeSelection(taskName: String, canResume: Boolean) /** * 当前任务已结束或者用户取消选择时 */ abstract fun onFinishOrCancel() internal var isFinished: Boolean = false internal var userChoice: Int = -1 /** * 设置用户的选择结果 * @param choice * @see CHOICE_STOP * @see CHOICE_PAUSE * @see CHOICE_DO_NOTHING */ fun finish(choice: Int) { userChoice = choice isFinished = true } }
InterruptComingSelector
的生命周期是由 TaskContext 管理的,以便任务结束而用户还未作出选择时 销毁其显示的UI
TaskContext 实现提供任务内资源的管理,提供与TaskPool的通信,提供任务定时器等功能。
同时TaskContext经过一个子任务队列功能。子任务是比任务更小的任务,没有暂停和恢复选项,子任务只能按顺序一个个执行,可是当个子任务的run()
执行函数内能够并行运行多个函数,这些实际上是用协程实现的。子任务定义以下:
interface ISubTask { /** * 设置执行任务超时,一旦超时,将结束子任务run()并触发onCancel(), 单位: ms */ fun timeout(): Int /** * 子任务的主要工做 * @return 决定下个子任务是否继续 */ fun run(paraMgr: SubTaskParallelManager): Boolean /** * 子任务正常结束或者运行超时时触发 * @param isTimeout * @return 决定下个子任务是否继续 */ fun onCancel(isTimeout: Boolean): Boolean }
TaskContext.createSubTaskQueue()
建立一个子任务队列,该队列会在TaskContext释放时释放。
add()
往队列了添加子任务start()
开始运行队列中的子任务forceCancel()
是取消剩下的子任务,清空队列pauseSubTasks()
是在任务 onPause时暂停队列运行resumeSubTasks()
是在任务 onResume时恢复队列运行start()的实现以下:
fun start(whenFinish: () -> Unit) { work = async(ctx.cctx) { // var ok = true while (isActive && queue.size > 0 && ok) { //暂停时挂起 if (isPause) { delay(100) continue } val sub = queue.poll() val t = sub.timeout().toLong() val timeout = if (t < 100L) 100L else t var is_timeout = false try { withTimeout(t) { currJob = async(context) { sub.run(SubTaskParallelManager(ctx.cctx)) } ok = currJob?.await() ?: false } } catch (e: Throwable) { e.printStackTrace() is_timeout = true } finally { val r = sub.onCancel(is_timeout) ok = if (is_timeout) r else ok } } // currJob = null isFinished = true whenFinish() true } }
这个任务框架是经过检查 ITask的 isAlive 的值判断任务结束的,用户扩展ITask时须要在任务结束时将isAlive设为false
任务的onStart
,onStop
,onPause
,onResume
不该该堵塞线程,若是是长时间运行的任务,可经过TaskContext.doMainWork()
来执行, 配合TaskContext.delayMs()
来延时。若是doMainWork()
中有循环, 须要在循环体中调用 suspendMainWorkWhenPaused()
, 这样任务暂停是调用pauseMainWork()
, 就能在suspendMainWorkWhenPaused()
处挂起。