玩转 OpenResty 协程 API

注意:本文中列出的全部代码只是 Proof Of Concept,基本上都没有进行错误处理。另外对于一些边际状况,也可能没有考虑清楚。因此对于直接复制文中代码到项目中所形成的一切后果,请自负责任。编程

OK,言归正题。OpenResty 提供了以 ngx.thread.*coroutine.*ngx.semaphore 等一系列协程 API。虽然受限于 Nginx 的请求处理方式,表现力不如通用语言的协程 API 那么强大。可是开开脑洞,仍是能够玩出一些花样来的。
借助这些 API,让咱们尝试模拟下其余编程平台里面的调度方式。promise

模拟 Java 里面的 Future

Java 里的 Future 可让咱们建立一个任务,而后在须要的时候才去 get 任务的返回值。另外 Future 还有超时功能。
咱们能够启用一个协程来完成具体的任务,再加一个定时结束的协程,用于实现超时。app

像这样:dom

local function task()
    ngx.sleep(3)
    ngx.say("Done")
end

local task_thread = ngx.thread.spawn(task)
local timeout_thread = ngx.thread.spawn(function(timeout)
    ngx.sleep(timeout)
    error("timeout")
end, 2)
local ok, res = ngx.thread.wait(task_thread, timeout_thread)
if not ok then
    if res == "timeout" then
        ngx.thread.kill(task_thread)
        ngx.say("task cancelled by timeout")
        return
    end
    ngx.say("task failed, result: ", res)
end
ngx.thread.kill(timeout_thread)

注意一点,在某一协程退出以后,咱们须要 kill 掉另一个协程。由于若是没有调用 ngx.exit 之类的方法显式退出的话,一直到全部协程退出为止,当前阶段都不会结束。post

引用文档里相关的内容:ui

By default, the corresponding Nginx handler (e.g., rewrite_by_lua handler) will not terminate untillua

  1. both the "entry thread" and all the user "light threads" terminates,spa

  2. a "light thread" (either the "entry thread" or a user "light thread" aborts by calling ngx.exit, ngx.exec, ngx.redirect, or ngx.req.set_uri(uri, true), orcode

  3. the "entry thread" terminates with a Lua error.协程

模拟 Javascript 里面的 Promise.race/all

Promise.race/all 能够接收多个 Promise,而后打包成一个新的 Promise 返回。引用相关的文档:

The Promise.race(iterable) method returns a promise that resolves or rejects as soon as one of the promises in the iterable resolves or rejects, with the value or reason from that promise.

The Promise.all(iterable) method returns a promise that resolves when all of the promises in the iterable argument have resolved, or rejects with the reason of the first passed promise that rejects.

这里 reject 等价于协程运行中抛出 error,而 resolve 相对于协程返回告终果。这两个 API 对于 reject 的处理是一致的,都是有任一出错则马上返回异常结果。对于正常结果,race 会在第一个结果出来时返回,而 all 则会在全部结果都出来后返回。
值得注意的是,Javascript 原生的 Promise 暂时没有 cancell 的功能。因此即便其中一个 Promise reject 了,其余 Promise 依然会继续运行。对此咱们也照搬过来。

Promise.race 的实现:

local function apple()
    ngx.sleep(0.1)
    --error("apple lost")
    return "apple done"
end

local function banana()
    ngx.sleep(0.2)
    return "banana done"
end

local function carrot()
    ngx.sleep(0.3)
    return "carrot done"
end

local function race(...)
    local functions = {...}
    local threads = {}
    for _, f in ipairs(functions) do
        local th, err = ngx.thread.spawn(f)
        if not th then
            -- Promise.race 没有实现 cancell 接口,
            -- 因此我偷下懒,无论已经建立的协程了
            return nil, err
        end
        table.insert(threads, th)
    end
    local ok, res = ngx.thread.wait(unpack(threads))
    if not ok then
        return nil, res
    end
    return res
end

local res, err = race(apple, banana, carrot)
ngx.say("res: ", res, " err: ", err)
ngx.exit(ngx.OK)

Promise.all 的实现:

local function all(...)
    local functions = {...}
    local threads = {}
    for _, f in ipairs(functions) do
        local th, err = ngx.thread.spawn(f)
        if not th then
            return nil, err
        end
        table.insert(threads, th)
    end
    local res_group = {}
    for _ = 1, #threads do
        local ok, res = ngx.thread.wait(unpack(threads))
        if not ok then
            return nil, res
        end
        table.insert(res_group, res)
    end
    return res_group
end

模拟 Go 里面的 channel (仅部分实现)

再进一步,试试模拟下 Go 里面的 channel。
咱们须要实现以下的语义:

  1. 当数据没有被消费时,生产者会在发送数据以后中断运行。

  2. 当数据没有被生产时,消费者会在接收数据以前中断运行。

  3. 当存在等待消费者接收数据的生产者时,其余生产者会在发送数据以前中断运行。

此次要用到 ngx.semaphore

local semaphore = require "ngx.semaphore"

local Chan = {
    new = function(self)
        local chan_attrs = {
            _read_sema = semaphore:new(),
            _write_sema = semaphore:new(),
            _exclude_sema = semaphore:new(),
            _buffer = nil,
            _waiting_thread_num = 0,
        }
        return setmetatable(chan_attrs, {__index = self})
    end,
    send = function(self, value, timeout)
        timeout = timeout or 60
        while self._buffer do
            self._waiting_thread_num = self._waiting_thread_num + 1
            self._exclude_sema:wait(timeout)
            self._waiting_thread_num = self._waiting_thread_num - 1
        end
        self._buffer = value
        self._read_sema:post()
        self._write_sema:wait(timeout)
    end,
    receive = function(self, timeout)
        timeout = timeout or 60
        self._read_sema:wait(timeout)
        local value = self._buffer
        self._buffer = nil
        self._write_sema:post()
        if self._waiting_thread_num > 0 then
            self._exclude_sema:post()
        end
        return value
    end,
}

local chan = Chan:new()

-- 如下是使用方法
local function worker_a(ch)
    for i = 1, 10 do
        ngx.sleep(math.random() / 10)
        ch:send(i, 1)
    end
end

local function worker_c(ch)
    for i = 11, 20 do
        ngx.sleep(math.random() / 10)
        ch:send(i, 1)
    end
end

local function worker_d(ch)
    for i = 21, 30 do
        ngx.sleep(math.random() / 10)
        ch:send(i, 1)
    end
end


local function worker_b(ch)
    for _ = 1, 20 do
        ngx.sleep(math.random() / 10)
        local v = ch:receive(1)
        ngx.say("recv ", v)
    end
end

local function worker_e(ch)
    for _ = 1, 10 do
        ngx.sleep(math.random() / 10)
        local v = ch:receive(1)
        ngx.say("recv ", v)
    end
end

ngx.thread.spawn(worker_a, chan)
ngx.thread.spawn(worker_b, chan)
ngx.thread.spawn(worker_c, chan)
ngx.thread.spawn(worker_d, chan)
ngx.thread.spawn(worker_e, chan)

模拟 Buffered channel 也是可行的。

local ok, new_tab = pcall(require, "table.new")
if not ok then
    new_tab = function (_, _) return {} end
end


local BufferedChan = {
    new = function(self, buffer_size)
        if not buffer_size or buffer_size <= 0 then
            error("Invalid buffer_size " .. (buffer_size or "nil") .. " given")
        end
        local chan_attrs = {
            _read_sema = semaphore:new(),
            _write_sema = semaphore:new(),
            _waiting_thread_num = 0,
            _buffer_size = buffer_size,
        }
        chan_attrs._buffer = new_tab(buffer_size, 0)
        return setmetatable(chan_attrs, {__index = self})
    end,
    send = function (self, value, timeout)
        timeout = timeout or 60
        while #self._buffer >= self._buffer_size do
            self._waiting_thread_num = self._waiting_thread_num + 1
            self._write_sema:wait(timeout)
            self._waiting_thread_num = self._waiting_thread_num - 1
        end
        table.insert(self._buffer, value)
        self._read_sema:post()
    end,
    receive = function(self, timeout)
        timeout = timeout or 60
        self._read_sema:wait(timeout)
        local value = table.remove(self._buffer)
        if self._waiting_thread_num > 0 then
            self._write_sema:post()
        end
        return value
    end,
}

local chan = BufferedChan:new(2)
-- ...

固然上面的山寨货仍是有不少问题的。好比它缺乏相当重要的 select 支持,另外也没有实现 close 相关的特性。

相关文章
相关标签/搜索