最近在xmake中,用lua的协程实现了多任务编译,效果仍是不错的,不事后来发现一个问题:linux
若是全部编译进程都在处理编译,没有退出的时候,xmake的lua主进程会不断地在这些任务间,不停的切换轮询进程的状态,可是有没有机会执行其余任务,致使cpu太高,抢占了编译进程的cpu时间。。git
那若是在等不到完成的进程时候,加入sleep等待呢,又会致使编译速度变慢,无法合理利用cpu。。github
所以,为了解决这个问题,我打算扩展下lua的接口,实现了一个跨平台的多进程等待接口: process.waitlist
实现多个未完成进程的同时等待,让出xmake主进程的cpu时间,给其余编译进程充分利用shell
xmake中的lua代码以下:windows
-- wait processes local tasks_finished = {} local procs_count = #procs if procs_count > 0 then -- wait them local procinfos = process.waitlist(procs, ifelse(procs_count < jobs, 0, -1)) for _, procinfo in ipairs(procinfos) do -- the process info local proc = procinfo[1] local procid = procinfo[2] local status = procinfo[3] -- check assert(procs[procid] == proc) -- resume this task local job_task = tasks[procid] local job_proc = coroutine.resume(job_task, 1, status) -- the other process is pending for this task? if coroutine.status(job_task) ~= "dead" then -- check assert(job_proc) -- update the pending process procs[procid] = job_proc -- this task has been finised? else -- mark this task as finised tasks_finished[procid] = true end end end
在os.exec
运行进程的接口实现中,若是当前进程没有当即退出,就经过协程切换出去,知道上面的多进程等待,获取到实际的退出进程后,直接定向切换到退出进程的os.exec
中,继续完成后续操做,这样就不会有冗余切换问题:oop
-- execute shell function os.exec(cmd, outfile, errfile) -- open command local ok = -1 local proc = process.open(cmd, outfile, errfile) if proc ~= nil then -- wait process local waitok = -1 local status = -1 if coroutine.running() then -- save the current directory local curdir = os.curdir() -- wait it repeat -- poll it waitok, status = process.wait(proc, 0) if waitok == 0 then -- 外面的多进程等待到实际的状态值后,直接进行处理 waitok, status = coroutine.yield(proc) end until waitok ~= 0 -- resume the current directory os.cd(curdir) else waitok, status = process.wait(proc, -1) end -- get status if waitok > 0 then ok = status end -- close process process.close(proc) end -- ok? return ok end
lua的上层调用有了,那怎么去实现这个跨平台的多进程等待呢?this
在windows上咱们能想到就是WaitForMultipleObjects
这个接口了,我把它封装到了tbox里面具体实现以下:lua
tb_long_t tb_process_waitlist(tb_process_ref_t const* processes, tb_process_waitinfo_ref_t infolist, tb_size_t infomaxn, tb_long_t timeout) { // check tb_assert_and_check_return_val(processes && infolist && infomaxn, -1); // make the process list tb_size_t procsize = 0; HANDLE proclist[256] = {0}; tb_process_t const** pprocess = (tb_process_t const**)processes; for (; *pprocess && procsize < tb_arrayn(proclist); pprocess++, procsize++) proclist[procsize] = (*pprocess)->pi.hProcess; tb_assertf(procsize < tb_arrayn(proclist), "too much waited processes!"); // wait processes DWORD exitcode = 0; tb_long_t infosize = 0; DWORD result = tb_kernel32()->WaitForMultipleObjects(procsize, proclist, FALSE, timeout < 0? INFINITE : (DWORD)timeout); switch (result) { case WAIT_TIMEOUT: break; case WAIT_FAILED: return -1; default: { // the process index DWORD index = result - WAIT_OBJECT_0; // the process tb_process_t* process = (tb_process_t*)processes[index]; tb_assert_and_check_return_val(process, -1); // save process info infolist[infosize].index = index; infolist[infosize].process = (tb_process_ref_t)process; infolist[infosize].status = tb_kernel32()->GetExitCodeProcess(process->pi.hProcess, &exitcode)? (tb_long_t)exitcode : -1; infosize++; // close thread handle tb_kernel32()->CloseHandle(process->pi.hThread); process->pi.hThread = INVALID_HANDLE_VALUE; // close process tb_kernel32()->CloseHandle(process->pi.hProcess); process->pi.hProcess = INVALID_HANDLE_VALUE; // next index index++; while (index < procsize) { // attempt to wait next process result = tb_kernel32()->WaitForMultipleObjects(procsize - index, proclist + index, FALSE, 0); switch (result) { case WAIT_TIMEOUT: // no more, exit loop index = procsize; break; case WAIT_FAILED: return -1; default: { // the process index index += result - WAIT_OBJECT_0; // the process process = (tb_process_t*)processes[index]; tb_assert_and_check_return_val(process, -1); // save process info infolist[infosize].index = index; infolist[infosize].process = (tb_process_ref_t)process; infolist[infosize].status = tb_kernel32()->GetExitCodeProcess(process->pi.hProcess, &exitcode)? (tb_long_t)exitcode : -1; infosize++; // close thread handle tb_kernel32()->CloseHandle(process->pi.hThread); process->pi.hThread = INVALID_HANDLE_VALUE; // close process tb_kernel32()->CloseHandle(process->pi.hProcess); process->pi.hProcess = INVALID_HANDLE_VALUE; // next index index++; } break; } } } break; } // ok? return infosize; }
若是在linux以及其余posix系统上,可使用wait
或者waitpid
接口,其实wait
也就是至关于调用了 waitpid(-1, &status, ..)
,因此我这里就直接使用waitpid
来实现了。。code
它跟windows的WaitForMultipleObjects
有些不一样,不能传递指定须要等待哪些进程句柄,想要等待多个进程,只能传递-1,表示等待全部子进程协程
不过咱们在封装接口的时候,能够仍是传入多个要等待的子进程列表,若是获取到的子进程不在这个列表里面,就直接忽略掉,有的话就返回出来,这样的话,行为上就跟windows的差很少了。。
tb_long_t tb_process_waitlist(tb_process_ref_t const* processes, tb_process_waitinfo_ref_t infolist, tb_size_t infomaxn, tb_long_t timeout) { // check tb_assert_and_check_return_val(processes && infolist && infomaxn, -1); // done tb_long_t infosize = 0; tb_hong_t time = tb_mclock(); do { // wait it tb_int_t status = -1; tb_long_t result = waitpid(-1, &status, timeout < 0? 0 : WNOHANG | WUNTRACED); tb_check_return_val(result != -1, -1); // exited? if (result != 0) { // find this process tb_process_t const** pprocess = (tb_process_t const**)processes; for (; *pprocess && (*pprocess)->pid != result; pprocess++) ; // found? if (*pprocess) { // save process info infolist[infosize].index = (tb_process_ref_t const*)pprocess - processes; infolist[infosize].process = (tb_process_ref_t)*pprocess; infolist[infosize].status = WIFEXITED(status)? WEXITSTATUS(status) : -1; infosize++; // attempt to wait other processes while (infosize < infomaxn) { // attempt to wait it status = -1; result = waitpid(-1, &status, WNOHANG | WUNTRACED); // error or timeout? end tb_check_break(result != 0); // find this process tb_process_t const** pprocess = (tb_process_t const**)processes; for (; *pprocess && (*pprocess)->pid != result; pprocess++) ; // found? if (*pprocess) { // save process info infolist[infosize].index = (tb_process_ref_t const*)pprocess - processes; infolist[infosize].process = (tb_process_ref_t)*pprocess; infolist[infosize].status = WIFEXITED(status)? WEXITSTATUS(status) : -1; infosize++; } else break; } // end break; } } // wait some time if (timeout > 0) tb_msleep(tb_min(timeout, 60)); } while (timeout > 0 && tb_mclock() - time < (tb_hong_t)timeout); // ok? return infosize; }
最后贴下这个跨平台接口的是如何使用的,这里给了一个比较完整的demo
// init processes tb_size_t count1 = 0; tb_process_ref_t processes1[5] = {0}; tb_process_ref_t processes2[5] = {0}; for (; count1 < 4; count1++) { processes1[count1] = tb_process_init(argv[1], (tb_char_t const**)(argv + 1), tb_null); tb_assert_and_check_break(processes1[count1]); } // ok? while (count1) { // trace tb_trace_i("waiting: %ld", count1); // wait processes tb_long_t infosize = -1; tb_process_waitinfo_t infolist[4]; if ((infosize = tb_process_waitlist(processes1, infolist, tb_arrayn(infolist), -1)) > 0) { tb_size_t i = 0; for (i = 0; i < infosize; i++) { // trace tb_trace_i("process(%ld:%p) exited: %ld", infolist[i].index, infolist[i].process, infolist[i].status); // exit process if (infolist[i].process) tb_process_exit(infolist[i].process); // remove this process processes1[infolist[i].index] = tb_null; } // update processes tb_size_t count2 = 0; for (i = 0; i < count1; i++) { if (processes1[i]) processes2[count2++] = processes1[i]; } tb_memcpy(processes1, processes2, count2 * sizeof(tb_process_ref_t)); processes1[count2] = tb_null; count1 = count2; } }