经过lua-nginx-module的ngx.socket能够方便的创建与其余服务器的链接和数据传输,这些也是lua-resty-redis,lua-resty-mysql等众多请求第三方服务的模块的基础。这里只介绍ngx.socket.tcp,udp的实现相似。mysql
在Lua中经过下面的方式使用ngx.socket APInginx
local sock = ngx.socket.tcp() local ok, err = sock:connect("google.com", 80) if not ok then return end local ok, err = sock:send("GET / HTTP/1.0\r\nHost:google.com\r\n\r\n") if not ok then return end local data, err = sock:receive() if not data then return end
sock:connect,sock:send,sock:receive这三个均可能不能当即完成,都是经过与ngx.sleep相似的方式,先调用lua_yield将协程挂起,等到事件就绪(或链接创建,或收到数据包等)再调用lua_resume继续运行协程。这样底层是异步结构,在Lua协程中看一个Lua脚本确实连续的在执行。redis
不一样的是ngx.sleep没有任何返回值,socket操做确是须要的,创建链接须要知道成功了仍是失败了,发送接受数据须要知道是否成功,失败时也须要知道缘由。这些方法的返回值是经过lua_resume实现的。sql
以connect为例下面这行代码能够分红两部分,api
local ok, err = sock:connect("GET")
第一步调用sock:connect函数的时候发生了协程的挂起(lua_yield),协程继续运行时是从第二步开始的。那么给ok,err这两个局部变量赋于什么值呢?值从哪里来呢?很明显只能经过lua_resume来操做。服务器
下面是lua_resume的声明,第二个参数是返回值的个数。异步
int lua_resume (lua_State *L, int narg);
在执行lua_resume前将返回值压到栈上,调用lua_resume的第二个参数控制返回值的个数。socket
connect操做成功时,经过下面的操做tcp
lua_pushboolean(L, 1); lua_resume(L, 1);
将ok赋值为true,err赋值为nil,代表操做成功。函数
失败时,用下面的方法,将ok赋值为nil,err赋值为错误信息,这里使用"connect refused"只是为了示例。
lua_pushnil(L); lua_pushliteral(L, "connect refuled"); lua_resume(L, 2);
lua-nginx-module经过ngx_http_lua_inject_socket_tcp_api将tcpx相关的API注册到ngx中。这里其实只是注册了ngx.socket.tcp(ngx.socket.stream)和ngx.socket.connect。ngx.socket.connect只是一个语法糖,即便去掉也没有什么影响。
void ngx_http_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L) { ngx_int_t rc; lua_createtable(L, 0, 4 /* nrec */); /* ngx.socket */ lua_pushcfunction(L, ngx_http_lua_socket_tcp); lua_pushvalue(L, -1); lua_setfield(L, -3, "tcp"); lua_setfield(L, -2, "stream"); { const char buf[] = "local sock = ngx.socket.tcp()" " local ok, err = sock:connect(...)" " if ok then return sock else return nil, err end"; rc = luaL_loadbuffer(L, buf, sizeof(buf) - 1, "=ngx.socket.connect"); } if (rc != NGX_OK) { ngx_log_error(NGX_LOG_CRIT, log, 0, "failed to load Lua code for ngx.socket.connect(): %i", rc); } else { lua_setfield(L, -2, "connect"); } lua_setfield(L, -2, "socket");
为何这里没有connect, send, receive等方法呢?这个涉及到ngx.socket.tcp方法的实现。
调用ngx.socket.tcp时,实际执行的是ngx_http_lua_socket_tcp这个函数。这里只是建立了一个Lua中的table并将其返回,并无真正建立一个socket。
注意这里的lua_setmetatable(L, -2)。 在建立table的同时给这个table设置了一个元表,将socket须要执行的connect,send,receive等方法放在了元表中。
static int ngx_http_lua_socket_tcp(lua_State *L) { ngx_http_request_t *r; ngx_http_lua_ctx_t *ctx; if (lua_gettop(L) != 0) { return luaL_error(L, "expecting zero arguments, but got %d", lua_gettop(L)); } r = ngx_http_lua_get_req(L); if (r == NULL) { return luaL_error(L, "no request found"); } ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); if (ctx == NULL) { return luaL_error(L, "no ctx found"); } ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_REWRITE | NGX_HTTP_LUA_CONTEXT_ACCESS | NGX_HTTP_LUA_CONTEXT_CONTENT | NGX_HTTP_LUA_CONTEXT_TIMER | NGX_HTTP_LUA_CONTEXT_SSL_CERT | NGX_HTTP_LUA_CONTEXT_SSL_SESS_FETCH); lua_createtable(L, 3 /* narr */, 1 /* nrec */); lua_pushlightuserdata(L, &ngx_http_lua_tcp_socket_metatable_key); lua_rawget(L, LUA_REGISTRYINDEX); lua_setmetatable(L, -2); dd("top: %d", lua_gettop(L)); return 1; }
相似这样的使用, sock只是一个空的table,调用connect方法时利用了Lua中元表的特性来实现。
local sock = ngx.socket.tcp() local ok, err = sock:connect("1.1.1.1", 80)
其他connect, send, receive等的实现与以前介绍的sleep等相似。这里用的是非阻塞的socket,若是操做没有当即结束,会将当前的协程挂起,等到操做完成后再恢复协程运行。
socket的connect, send,receive等操做都可能致使协程的挂起。以发送数据为例,函数ngx_http_lua_socket_tcp_send中,若是当前socket缓冲区已满,此时rc == NGX_AGAIN, 先设置r->write_event_handler回调函数,最后经过lua_yield挂起协程。
/* rc == NGX_AGAIN */ coctx = ctx->cur_co_ctx; ngx_http_lua_cleanup_pending_operation(coctx); coctx->cleanup = ngx_http_lua_coctx_cleanup; coctx->data = u; if (u->raw_downstream) { ctx->writing_raw_req_socket = 1; } if (ctx->entered_content_phase) { r->write_event_handler = ngx_http_lua_content_wev_handler; } else { r->write_event_handler = ngx_http_core_run_phases; } u->write_co_ctx = coctx; u->write_waiting = 1; u->write_prepare_retvals = ngx_http_lua_socket_tcp_send_retval_handler; dd("setting data to %p", u); return lua_yield(L, 0);
等到socket的缓冲区空闲,此时socket可写,会调用r->write_event_handler继续运行,最终经过ngx_http_lua_socket_tcp_resume_helper继续协程的运行。
static ngx_int_t ngx_http_lua_socket_tcp_resume_helper(ngx_http_request_t *r, int socket_op) { int nret; lua_State *vm; ngx_int_t rc; ngx_connection_t *c; ngx_http_lua_ctx_t *ctx; ngx_http_lua_co_ctx_t *coctx; ngx_http_lua_socket_tcp_retval_handler prepare_retvals; ngx_http_lua_socket_tcp_upstream_t *u; ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); if (ctx == NULL) { return NGX_ERROR; } ctx->resume_handler = ngx_http_lua_wev_handler; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua tcp operation done, resuming lua thread"); coctx = ctx->cur_co_ctx; dd("coctx: %p", coctx); u = coctx->data; switch (socket_op) { case SOCKET_OP_CONNECT: case SOCKET_OP_WRITE: prepare_retvals = u->write_prepare_retvals; break; case SOCKET_OP_READ: prepare_retvals = u->read_prepare_retvals; break; default: /* impossible to reach here */ return NGX_ERROR; } ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua tcp socket calling prepare retvals handler %p, " "u:%p", prepare_retvals, u); nret = prepare_retvals(r, u, ctx->cur_co_ctx->co); if (nret == NGX_AGAIN) { return NGX_DONE; } c = r->connection; vm = ngx_http_lua_get_lua_vm(r, ctx); rc = ngx_http_lua_run_thread(vm, r, ctx, nret); ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua run thread returned %d", rc); if (rc == NGX_AGAIN) { return ngx_http_lua_run_posted_threads(c, vm, r, ctx); } if (rc == NGX_DONE) { ngx_http_lua_finalize_request(r, NGX_DONE); return ngx_http_lua_run_posted_threads(c, vm, r, ctx); } if (ctx->entered_content_phase) { ngx_http_lua_finalize_request(r, rc); return NGX_DONE; } return rc; }
仍是一样的套路,ngx_http_lua_get_lua_vm, 而后经过ngx_http_lua_run_thread运行协程,根据返回值作相应的处理。