Openresty 优秀lua源码库分析(一)nginx
源码:https://github.com/cloudflare/lua-resty-logger-socket/git
用法:github
local logger = require "resty.logger.socket" if not logger.initted() then local ok, err = logger.init{ host = 'xxx', port = 1234, flush_limit = 1234, drop_limit = 5678, } if not ok then ngx.log(ngx.ERR, "failed to initialize the logger: ",err) return end end local bytes, err = logger.log(msg) if err then ngx.log(ngx.ERR, "failed to log message: ", err) return end
咱们能够看到这里有两个比较类似的函数的用法数组
initted(),init()//做为一个库,也间接担任了一个储存变量的部分
function _M.initted() return logger_initted end
function _M.init(user_config) if (type(user_config) ~= "table") then return nil, "user_config must be a table" end 遍历table的key/value值,并进行赋值,注意赋值以前进行类型的判断,严谨(初始化一次面临的问题) for k, v in pairs(user_config) do if k == "host" then if type(v) ~= "string" then return nil, '"host" must be a string' end host = v .......... end end //对必要参数进行校验 if not (host and port) and not path then return nil, "no logging server configured. \"host\"/\"port\" or " .. "\"path\" is required." end flushing = false exiting = false //将链接分为两个阶段,链接中(ing,为了是提供重试次数的一个状态),链接完毕(ed)根据true/false肯定是否链接成功 connecting = false connected = false retry_connect = 0 retry_send = 0 logger_initted = true //检查是不是按时间发送缓存,若是是就将need_periodic_flush设置为true,而后使用timer_at函数解决定时回调函数 if periodic_flush then if debug then ngx_log(DEBUG, "periodic flush enabled for every " .. periodic_flush .. " seconds") end need_periodic_flush = true timer_at(periodic_flush, _periodic_flush)//定时刷盘,但应该是一次的 end return logger_initted//返回初始化后的状态 end
function _M.log(msg) //判断状态 if not logger_initted then return nil, "not initialized" end local bytes //将传入字符串进行判断和转换 if type(msg) ~= "string" then msg = tostring(msg) end if (debug) then ngx.update_time() ngx_log(DEBUG, ngx.now(), ":log message length: " .. #msg) end --判断长度 local msg_len = #msg //没有搞清楚这个注释的真正意义 -- response of "_flush_buffer" is not checked, because it writes -- error buffer --对work是否正在退出事件进行捕捉,防止丢失部分日志,这个部分是这个库的健全性,考虑了work自己的生命周期 if (is_exiting()) then exiting = true _write_buffer(msg) _flush_buffer() if (debug) then ngx_log(DEBUG, "Nginx worker is exiting") end bytes = 0 --判断是否达到了缓存的最大值 elseif (msg_len + buffer_size < flush_limit) then _write_buffer(msg) bytes = msg_len --判断是否到了丢弃的最大值 elseif (msg_len + buffer_size <= drop_limit) then _write_buffer(msg) _flush_buffer() bytes = msg_len else --增加速度过大,必须丢弃,否则会压垮缓存 _flush_buffer() if (debug) then ngx_log(DEBUG, "logger buffer is full, this log message will be " .. "dropped") end bytes = 0 --- this log message doesn't fit in buffer, drop it end if last_error then local err = last_error last_error = nil return bytes, err end return bytes end
总结:对bytes为0会有两种可能,一种是work正在退出,第二种,超过了丢弃的限制不加入缓存,并丢弃。缓存
local function _write_buffer(msg) log_buffer_index = log_buffer_index + 1 log_buffer_data[log_buffer_index] = msg buffer_size = buffer_size + #msg return buffer_size end
-- internal variables local buffer_size = 0 -- 2nd level buffer, it stores logs ready to be sent out local send_buffer = "" -- 1st level buffer, it stores incoming logs local log_buffer_data = new_tab(20000, 0) -- table.new(narr, nrec) local succ, new_tab = pcall(require, "table.new") if not succ then new_tab = function () return {} end end
local function _flush_buffer() local ok, err = timer_at(0, _flush) //使用这个的缘由还不清楚,将定时刷盘设置为false need_periodic_flush = false if not ok then _write_error(err) return nil, err end end
使用timer_at()回调_flush()函数,时间为0就不等待,之因此使用这个是为了突破nginx.socket的做用范围,简直就是一个神器。socket
local function _flush() local err --对没有上锁的状况进行上锁,并往下走 -- pre check if not _flush_lock() then if debug then ngx_log(DEBUG, "previous flush not finished") end -- do this later return true end --进入事后再判断缓存数据是否为空,毕竟哪一个线程拿到锁也说不清楚 if not _need_flush() then if debug then ngx_log(DEBUG, "no need to flush:", log_buffer_index) end _flush_unlock() return true end -- start flushing retry_send = 0--记录重试次数 if debug then ngx_log(DEBUG, "start flushing") end local bytes while retry_send <= max_retry_times do if log_buffer_index > 0 then _prepare_stream_buffer() end bytes, err = _do_flush()--发送 if bytes then--使用返回字节作判断 break end if debug then ngx_log(DEBUG, "resend log messages to the log server: ", err) end -- ngx.sleep time is in seconds if not exiting then ngx_sleep(retry_interval / 1000) end retry_send = retry_send + 1 end _flush_unlock()--除去锁住的状况 if not bytes then local err_msg = "try to send log messages to the log server " .. "failed after " .. max_retry_times .. " retries: " .. err _write_error(err_msg) return nil, err_msg else if debug then ngx_log(DEBUG, "send " .. bytes .. " bytes") end end buffer_size = buffer_size - #send_buffer--减去剩下的buffer大小 send_buffer = ""--对发送值制空 return bytes end
_prepare_stream_buffer处理发送数据函数(参数max_buffer_reuse为0能够确保没有脏数据)函数
--从新使用日志buffer的次数限制,不清楚为何会有一个复用限制,counter就是纯粹的计数,到达次数就开辟一个新的空间,用回收的方式应该会有一部分的脏数据我以为,发送间歇请求差距过大的状况下,但日志脏数据影响不啊 local function _prepare_stream_buffer() local packet = concat(log_buffer_data, "", 1, log_buffer_index) send_buffer = send_buffer .. packet--最新的buffer和之前加起来,怎么处理log_buffer_data比较好奇 log_buffer_index = 0 counter = counter + 1 if counter > max_buffer_reuse then log_buffer_data = new_tab(20000, 0) counter = 0 if debug then ngx_log(DEBUG, "log buffer reuse limit (" .. max_buffer_reuse .. ") reached, create a new \"log_buffer_data\"") end end end