注意:本文中列出的全部代码只是 Proof Of Concept,基本上都没有进行错误处理。另外对于一些边际状况,也可能没有考虑清楚。因此对于直接复制文中代码到项目中所形成的一切后果,请自负责任。编程
OK,言归正题。OpenResty 提供了以 ngx.thread.*
,coroutine.*
和 ngx.semaphore
等一系列协程 API。虽然受限于 Nginx 的请求处理方式,表现力不如通用语言的协程 API 那么强大。可是开开脑洞,仍是能够玩出一些花样来的。
借助这些 API,让咱们尝试模拟下其余编程平台里面的调度方式。promise
Java 里的 Future 可让咱们建立一个任务,而后在须要的时候才去 get 任务的返回值。另外 Future 还有超时功能。
咱们能够启用一个协程来完成具体的任务,再加一个定时结束的协程,用于实现超时。app
像这样:dom
local function task() ngx.sleep(3) ngx.say("Done") end local task_thread = ngx.thread.spawn(task) local timeout_thread = ngx.thread.spawn(function(timeout) ngx.sleep(timeout) error("timeout") end, 2) local ok, res = ngx.thread.wait(task_thread, timeout_thread) if not ok then if res == "timeout" then ngx.thread.kill(task_thread) ngx.say("task cancelled by timeout") return end ngx.say("task failed, result: ", res) end ngx.thread.kill(timeout_thread)
注意一点,在某一协程退出以后,咱们须要 kill 掉另一个协程。由于若是没有调用 ngx.exit
之类的方法显式退出的话,一直到全部协程退出为止,当前阶段都不会结束。post
引用文档里相关的内容:ui
By default, the corresponding Nginx handler (e.g., rewrite_by_lua handler) will not terminate untillua
both the "entry thread" and all the user "light threads" terminates,spa
a "light thread" (either the "entry thread" or a user "light thread" aborts by calling ngx.exit, ngx.exec, ngx.redirect, or ngx.req.set_uri(uri, true), orcode
the "entry thread" terminates with a Lua error.协程
Promise.race/all 能够接收多个 Promise,而后打包成一个新的 Promise 返回。引用相关的文档:
The Promise.race(iterable) method returns a promise that resolves or rejects as soon as one of the promises in the iterable resolves or rejects, with the value or reason from that promise.
The Promise.all(iterable) method returns a promise that resolves when all of the promises in the iterable argument have resolved, or rejects with the reason of the first passed promise that rejects.
这里 reject 等价于协程运行中抛出 error,而 resolve 相对于协程返回告终果。这两个 API 对于 reject 的处理是一致的,都是有任一出错则马上返回异常结果。对于正常结果,race 会在第一个结果出来时返回,而 all 则会在全部结果都出来后返回。
值得注意的是,Javascript 原生的 Promise 暂时没有 cancell 的功能。因此即便其中一个 Promise reject 了,其余 Promise 依然会继续运行。对此咱们也照搬过来。
Promise.race 的实现:
local function apple() ngx.sleep(0.1) --error("apple lost") return "apple done" end local function banana() ngx.sleep(0.2) return "banana done" end local function carrot() ngx.sleep(0.3) return "carrot done" end local function race(...) local functions = {...} local threads = {} for _, f in ipairs(functions) do local th, err = ngx.thread.spawn(f) if not th then -- Promise.race 没有实现 cancell 接口, -- 因此我偷下懒,无论已经建立的协程了 return nil, err end table.insert(threads, th) end local ok, res = ngx.thread.wait(unpack(threads)) if not ok then return nil, res end return res end local res, err = race(apple, banana, carrot) ngx.say("res: ", res, " err: ", err) ngx.exit(ngx.OK)
Promise.all 的实现:
local function all(...) local functions = {...} local threads = {} for _, f in ipairs(functions) do local th, err = ngx.thread.spawn(f) if not th then return nil, err end table.insert(threads, th) end local res_group = {} for _ = 1, #threads do local ok, res = ngx.thread.wait(unpack(threads)) if not ok then return nil, res end table.insert(res_group, res) end return res_group end
再进一步,试试模拟下 Go 里面的 channel。
咱们须要实现以下的语义:
当数据没有被消费时,生产者会在发送数据以后中断运行。
当数据没有被生产时,消费者会在接收数据以前中断运行。
当存在等待消费者接收数据的生产者时,其余生产者会在发送数据以前中断运行。
此次要用到 ngx.semaphore
。
local semaphore = require "ngx.semaphore" local Chan = { new = function(self) local chan_attrs = { _read_sema = semaphore:new(), _write_sema = semaphore:new(), _exclude_sema = semaphore:new(), _buffer = nil, _waiting_thread_num = 0, } return setmetatable(chan_attrs, {__index = self}) end, send = function(self, value, timeout) timeout = timeout or 60 while self._buffer do self._waiting_thread_num = self._waiting_thread_num + 1 self._exclude_sema:wait(timeout) self._waiting_thread_num = self._waiting_thread_num - 1 end self._buffer = value self._read_sema:post() self._write_sema:wait(timeout) end, receive = function(self, timeout) timeout = timeout or 60 self._read_sema:wait(timeout) local value = self._buffer self._buffer = nil self._write_sema:post() if self._waiting_thread_num > 0 then self._exclude_sema:post() end return value end, } local chan = Chan:new() -- 如下是使用方法 local function worker_a(ch) for i = 1, 10 do ngx.sleep(math.random() / 10) ch:send(i, 1) end end local function worker_c(ch) for i = 11, 20 do ngx.sleep(math.random() / 10) ch:send(i, 1) end end local function worker_d(ch) for i = 21, 30 do ngx.sleep(math.random() / 10) ch:send(i, 1) end end local function worker_b(ch) for _ = 1, 20 do ngx.sleep(math.random() / 10) local v = ch:receive(1) ngx.say("recv ", v) end end local function worker_e(ch) for _ = 1, 10 do ngx.sleep(math.random() / 10) local v = ch:receive(1) ngx.say("recv ", v) end end ngx.thread.spawn(worker_a, chan) ngx.thread.spawn(worker_b, chan) ngx.thread.spawn(worker_c, chan) ngx.thread.spawn(worker_d, chan) ngx.thread.spawn(worker_e, chan)
模拟 Buffered channel 也是可行的。
local ok, new_tab = pcall(require, "table.new") if not ok then new_tab = function (_, _) return {} end end local BufferedChan = { new = function(self, buffer_size) if not buffer_size or buffer_size <= 0 then error("Invalid buffer_size " .. (buffer_size or "nil") .. " given") end local chan_attrs = { _read_sema = semaphore:new(), _write_sema = semaphore:new(), _waiting_thread_num = 0, _buffer_size = buffer_size, } chan_attrs._buffer = new_tab(buffer_size, 0) return setmetatable(chan_attrs, {__index = self}) end, send = function (self, value, timeout) timeout = timeout or 60 while #self._buffer >= self._buffer_size do self._waiting_thread_num = self._waiting_thread_num + 1 self._write_sema:wait(timeout) self._waiting_thread_num = self._waiting_thread_num - 1 end table.insert(self._buffer, value) self._read_sema:post() end, receive = function(self, timeout) timeout = timeout or 60 self._read_sema:wait(timeout) local value = table.remove(self._buffer) if self._waiting_thread_num > 0 then self._write_sema:post() end return value end, } local chan = BufferedChan:new(2) -- ...
固然上面的山寨货仍是有不少问题的。好比它缺乏相当重要的 select 支持,另外也没有实现 close 相关的特性。