ngx_lua中的协程调度(三)

经过lua-nginx-module的ngx.socket能够方便的创建与其余服务器的链接和数据传输,这些也是lua-resty-redis,lua-resty-mysql等众多请求第三方服务的模块的基础。这里只介绍ngx.socket.tcp,udp的实现相似。mysql

经过lua_resume返回值

在Lua中经过下面的方式使用ngx.socket APInginx

local sock = ngx.socket.tcp()
local ok, err = sock:connect("google.com", 80)
if not ok then
    return
end

local ok, err = sock:send("GET / HTTP/1.0\r\nHost:google.com\r\n\r\n")
if not ok then
    return
end

local data, err = sock:receive()
if not data then
    return
end

sock:connect,sock:send,sock:receive这三个均可能不能当即完成,都是经过与ngx.sleep相似的方式,先调用lua_yield将协程挂起,等到事件就绪(或链接创建,或收到数据包等)再调用lua_resume继续运行协程。这样底层是异步结构,在Lua协程中看一个Lua脚本确实连续的在执行。redis

不一样的是ngx.sleep没有任何返回值,socket操做确是须要的,创建链接须要知道成功了仍是失败了,发送接受数据须要知道是否成功,失败时也须要知道缘由。这些方法的返回值是经过lua_resume实现的。sql

以connect为例下面这行代码能够分红两部分,api

  1. sock:connect函数调用
  2. 将函数调用的结果赋值给ok,err两个局部变量
local ok, err = sock:connect("GET")

第一步调用sock:connect函数的时候发生了协程的挂起(lua_yield),协程继续运行时是从第二步开始的。那么给ok,err这两个局部变量赋于什么值呢?值从哪里来呢?很明显只能经过lua_resume来操做。服务器

下面是lua_resume的声明,第二个参数是返回值的个数。异步

int lua_resume (lua_State *L, int narg);

在执行lua_resume前将返回值压到栈上,调用lua_resume的第二个参数控制返回值的个数。socket

connect操做成功时,经过下面的操做tcp

lua_pushboolean(L, 1);
lua_resume(L, 1);

将ok赋值为true,err赋值为nil,代表操做成功。函数

失败时,用下面的方法,将ok赋值为nil,err赋值为错误信息,这里使用"connect refused"只是为了示例。

lua_pushnil(L);
lua_pushliteral(L, "connect refuled");
lua_resume(L, 2);

 

注册API

lua-nginx-module经过ngx_http_lua_inject_socket_tcp_api将tcpx相关的API注册到ngx中。这里其实只是注册了ngx.socket.tcp(ngx.socket.stream)和ngx.socket.connect。ngx.socket.connect只是一个语法糖,即便去掉也没有什么影响。

void
ngx_http_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
{
    ngx_int_t         rc;

    lua_createtable(L, 0, 4 /* nrec */);    /* ngx.socket */

    lua_pushcfunction(L, ngx_http_lua_socket_tcp);
    lua_pushvalue(L, -1);
    lua_setfield(L, -3, "tcp");
    lua_setfield(L, -2, "stream");

    {
        const char  buf[] = "local sock = ngx.socket.tcp()"
                            " local ok, err = sock:connect(...)"
                            " if ok then return sock else return nil, err end";

        rc = luaL_loadbuffer(L, buf, sizeof(buf) - 1, "=ngx.socket.connect");
    }

    if (rc != NGX_OK) {
        ngx_log_error(NGX_LOG_CRIT, log, 0,
                      "failed to load Lua code for ngx.socket.connect(): %i",
                      rc);

    } else {
        lua_setfield(L, -2, "connect");
    }

    lua_setfield(L, -2, "socket");

为何这里没有connect, send, receive等方法呢?这个涉及到ngx.socket.tcp方法的实现。

 

ngx.socket.tcp

调用ngx.socket.tcp时,实际执行的是ngx_http_lua_socket_tcp这个函数。这里只是建立了一个Lua中的table并将其返回,并无真正建立一个socket。

注意这里的lua_setmetatable(L, -2)。 在建立table的同时给这个table设置了一个元表,将socket须要执行的connect,send,receive等方法放在了元表中。

static int
ngx_http_lua_socket_tcp(lua_State *L)
{
    ngx_http_request_t      *r;
    ngx_http_lua_ctx_t      *ctx;

    if (lua_gettop(L) != 0) {
        return luaL_error(L, "expecting zero arguments, but got %d",
                          lua_gettop(L));
    }

    r = ngx_http_lua_get_req(L);
    if (r == NULL) {
        return luaL_error(L, "no request found");
    }

    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
    if (ctx == NULL) {
        return luaL_error(L, "no ctx found");
    }

    ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_REWRITE
                               | NGX_HTTP_LUA_CONTEXT_ACCESS
                               | NGX_HTTP_LUA_CONTEXT_CONTENT
                               | NGX_HTTP_LUA_CONTEXT_TIMER
                               | NGX_HTTP_LUA_CONTEXT_SSL_CERT
                               | NGX_HTTP_LUA_CONTEXT_SSL_SESS_FETCH);

    lua_createtable(L, 3 /* narr */, 1 /* nrec */);
    lua_pushlightuserdata(L, &ngx_http_lua_tcp_socket_metatable_key);
    lua_rawget(L, LUA_REGISTRYINDEX);
    lua_setmetatable(L, -2);

    dd("top: %d", lua_gettop(L));

    return 1;
}

相似这样的使用, sock只是一个空的table,调用connect方法时利用了Lua中元表的特性来实现。

local sock = ngx.socket.tcp()
local ok, err = sock:connect("1.1.1.1", 80)

 

非阻塞socket操做的处理

其他connect, send, receive等的实现与以前介绍的sleep等相似。这里用的是非阻塞的socket,若是操做没有当即结束,会将当前的协程挂起,等到操做完成后再恢复协程运行。

socket的connect, send,receive等操做都可能致使协程的挂起。以发送数据为例,函数ngx_http_lua_socket_tcp_send中,若是当前socket缓冲区已满,此时rc == NGX_AGAIN, 先设置r->write_event_handler回调函数,最后经过lua_yield挂起协程。

/* rc == NGX_AGAIN */

    coctx = ctx->cur_co_ctx;

    ngx_http_lua_cleanup_pending_operation(coctx);
    coctx->cleanup = ngx_http_lua_coctx_cleanup;
    coctx->data = u;

    if (u->raw_downstream) {
        ctx->writing_raw_req_socket = 1;
    }

    if (ctx->entered_content_phase) {
        r->write_event_handler = ngx_http_lua_content_wev_handler;

    } else {
        r->write_event_handler = ngx_http_core_run_phases;
    }

    u->write_co_ctx = coctx;
    u->write_waiting = 1;
    u->write_prepare_retvals = ngx_http_lua_socket_tcp_send_retval_handler;

    dd("setting data to %p", u);

    return lua_yield(L, 0);

等到socket的缓冲区空闲,此时socket可写,会调用r->write_event_handler继续运行,最终经过ngx_http_lua_socket_tcp_resume_helper继续协程的运行。

static ngx_int_t
ngx_http_lua_socket_tcp_resume_helper(ngx_http_request_t *r, int socket_op)
{
    int                          nret;
    lua_State                   *vm;
    ngx_int_t                    rc;
    ngx_connection_t            *c;
    ngx_http_lua_ctx_t          *ctx;
    ngx_http_lua_co_ctx_t       *coctx;

    ngx_http_lua_socket_tcp_retval_handler  prepare_retvals;

    ngx_http_lua_socket_tcp_upstream_t      *u;

    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
    if (ctx == NULL) {
        return NGX_ERROR;
    }

    ctx->resume_handler = ngx_http_lua_wev_handler;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "lua tcp operation done, resuming lua thread");

    coctx = ctx->cur_co_ctx;

    dd("coctx: %p", coctx);

    u = coctx->data;

    switch (socket_op) {
    case SOCKET_OP_CONNECT:
    case SOCKET_OP_WRITE:
        prepare_retvals = u->write_prepare_retvals;
        break;

    case SOCKET_OP_READ:
        prepare_retvals = u->read_prepare_retvals;
        break;

    default:
        /* impossible to reach here */
        return NGX_ERROR;
    }

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "lua tcp socket calling prepare retvals handler %p, "
                   "u:%p", prepare_retvals, u);

    nret = prepare_retvals(r, u, ctx->cur_co_ctx->co);
    if (nret == NGX_AGAIN) {
        return NGX_DONE;
    }

    c = r->connection;
    vm = ngx_http_lua_get_lua_vm(r, ctx);

    rc = ngx_http_lua_run_thread(vm, r, ctx, nret);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "lua run thread returned %d", rc);

    if (rc == NGX_AGAIN) {
        return ngx_http_lua_run_posted_threads(c, vm, r, ctx);
    }

    if (rc == NGX_DONE) {
        ngx_http_lua_finalize_request(r, NGX_DONE);
        return ngx_http_lua_run_posted_threads(c, vm, r, ctx);
    }

    if (ctx->entered_content_phase) {
        ngx_http_lua_finalize_request(r, rc);
        return NGX_DONE;
    }

    return rc;
}

仍是一样的套路,ngx_http_lua_get_lua_vm, 而后经过ngx_http_lua_run_thread运行协程,根据返回值作相应的处理。

相关文章
相关标签/搜索