读RILL SERVERnode
由于源码是前段时间下载的,最近才拿出来分析,今天发现已经更新了,好比删除了module中订阅那些代码。可是并不影响整体的思路。
他加入了behavior3 、 pl 、FSM,DDZ等等有空在分析。
-----session
有几个维度能够分析。app
启动,从service中的main启动。而后逐个启动service,
service启动的时候,加载了faci下的全部文件,这个是一个服务的各项基本功能,
包括,如何从skynet接受消息,并如何分发到几个不一样实体上,dispatch forward 或者event 和 watch等。less
从一次登陆消息来讲,首先来到gateway,而后从faci中的dispatch,根据cmd,转发到forward的具体函数。处理完再发回。函数
1 首先进入service下的main,好比开启了 login
传入了2个参数ui
local p = skynet.newservice("login", "login", i)
2 继续将参数带入到下个faci.servicelua
local name, id = ... local s = require "faci.service" s.init(name, id)
3 这个是一个公共模块。spa
而后进入faci.service,
把参数保存下来debug
function service.init(name, id) env.name = name or "nameless server" env.id = tonumber(id) or 0 end
11 而后咱们看看,这个文件加载的东西,包括dispatch event module 等等。
这些模块都是每一个服务的一部分(基础部分)。一个一个看。code
首先是dispatch,终于,咱们看到了他跟skynet接洽的部分,也就是,将"client"消息让
函数:client_dispatch 处理。这个函数是系统跟 skynet的入口处。
还有一个就是,"lua"消息,由lua_dispatch处理。
skynet.dispatch("lua", lua_dispatch) skynet.register_protocol{ name = "client", id = skynet.PTYPE_CLIENT, unpack = skynet.unpack, dispatch = client_dispatch, }
12 delicious,美味啊,来看看做者是怎么写客户端传入的消息的。
hmm。。彷佛多了一个check?先无论他。
get_queue_id(cmd) 获得 CMD的对应编号;
为每一个fd,创建了一个等待队列
该队列下,对应的命令也初始化下。env.waiting_queue[fd][queue_id] = {}
function client_dispatch(session, source, fd, cmd, check, msg) local queue_id = get_queue_id(cmd) -- get_queue_id(cmd) 获得 CMD的对应编号; if not queue_id then -- 若是指令不须要排队则直接执行 client_dispatch_help(cmd, check, msg, fd, source) return end if not env.waiting_queue[fd] then env.waiting_queue[fd] = {} --为每一个fd,创建了一个等待队列 end if not env.waiting_queue[fd][queue_id] then env.waiting_queue[fd][queue_id] = {} --该队列下,对应的命令也初始化下 end local queues = env.waiting_queue[fd][queue_id] if #queues > 0 then -- 若是有未完成的任务则插入后直接返回。 table.insert(queues, {cmd, check, msg, fd, source}) return end -- 若是没有则直接进入后执行 table.insert(queues, {cmd, check, msg, fd, source}) for i = 1, 100 do local queue = table.remove(queues) if not queue then return end client_dispatch_help(table.unpack(queue)) end if #queues > 0 then -- 若是执行到这,也就是队列超过100个则抛弃该FD的该指令队列 log.error("%s queue is full, queue_id: %d", fd, queue_id) end env.waiting_queue[fd][queue_id] = nil end
13 继续看指令的拆分和执行,拆分CMD中的
local function client_dispatch_help(cmd, check, msg, fd, source) msg._cmd = cmd msg._check = check --TODO check校验 local cmdlist = string.split(cmd, ".") local isok, ret --派发到本服 isok, ret = local_dispatch(cmdlist[1], cmdlist[2], fd, msg, source) --派发到远端 if not isok then isok, ret = romote_dispatch(cmdlist[1], cmdlist[2], fd, msg, source) end if ret then skynet.send(source, "lua", "send", fd, ret) end end
14 local_dispatch中cmd1应该是模块名,好比login/game,cmd2,是具体的指令,好比是msgLogin这种。
注意,本地是调用forward的上的名为cmd2方法。
远程的话须要额外知道,player.romote[cmd1],也就是玩家的远程服务的地址。
function local_dispatch(cmd1, cmd2, fd, msg, source) local module = env.module[cmd1] if type(module) ~= "table" then log.info("local_dispatch module is not table, cmd = %s.%s, msg = %s", cmd1, cmd2, tool.dump(msg)) return false end local forward = module.forward if type(forward) ~= "table" then log.info("local_dispatch forward is not table, cmd = %s.%s, msg = %s", cmd1, cmd2, tool.dump(msg)) return false end local cb = forward[cmd2] if type(cb) ~= "function" then log.info("local_dispatch cb is not function, cmd = %s.%s, str = %s", cmd1, cmd2, tool.dump(msg)) return false end --开始分发 local v = get_v(fd) local isok, ret = xpcall(cb, traceback, v, msg, source) if not isok then log.error("local_dispatch handle msg error, cmd = %s, msg = %s, err=%s", cmd1, tool.dump(msg), ret) return true --报错的状况也表示分发到位 end return true, ret end
15 在看内部的消息转发 lua_dispatch,可是接受3种保留类型,
其余的消息都经过dispatch来作,跟上面经过forward相似,各自处理一类消息。
(相似CMD和 request)
watch 和 sys 消息类型不太明白
local function lua_dispatch(session, addr, cmd, ...) local cmdlist = string.split(cmd, ".") local cmd1 = cmdlist[1] local cmd2 = cmdlist[2] --forward分发 if cmd1 == "client_forward" and not cmd2 then local isok, msg = local_dispatch(...) skynet.retpack(isok, msg) return true elseif cmd1 == "watch" and not cmd2 then local isok, msg, acm = watch(...) skynet.retpack(isok, msg, acm) return true elseif cmd1 == "sys" then local isok, msg = sys_dispatch(cmd2, ...) skynet.retpack(isok, msg) return true end --模块 local module = env.module[cmd1] if type(module) ~= "table" then log.info("lua_dispatch module is not table, cmd = %s.%s", cmd1, cmd2) skynet.ret() return false end local dispatch = module.dispatch if type(dispatch) ~= "table" then log.info("lua_dispatch dispatch is not table, cmd = %s.%s", cmd1, cmd2) skynet.ret() return false end local cb = dispatch[cmd2] if type(cb) ~= "function" then log.info("lua_dispatch cb is not function, cmd = %s.%s", cmd1, cmd2) skynet.ret() return false end --分发 local function skyret(ok, ...) if not ok then skynet.ret() else skynet.retpack(...) end end local ret = {xpcall(cb, traceback, ...)} local isok = ret[1] if not isok then log.info("lua_dispatch cb call fail, cmd = %s.%s, err = %s", cmd1, cmd2, ret) skynet.ret() return false end skyret(table.unpack(ret)) end
16 一个 watch 的例子,供其余模块查看该模块内部的状况
function module.watch(acm) --统计在线人数 local logined = 0 --成功登录 for i, v in pairs(env.players) do logined = logined + 1 end local ret = {logined = logined} --总统计 acm.logined = acm.logined and acm.logined + logined or logined return ret, acm end
17 sys的命令经过dispatch执行,热更新reload和stop。
18 再看看事件机制。假设有2个节点A 和B
A要订阅B,就调用module中的subscribe_event,
提供如下细信息:要订阅的事件,要订阅的节点,目标服务
function M.subscribe_event(event, nodename, service, key) local local_service = skynet.self() if nodename == local_nodename then return skynet.call(service, "sys.subscribe_event", event, local_nodename, local_service, key) end return cluster.call(nodename, service, "sys.subscribe_event", event, local_nodename, local_service, key) end
此时,B收到这个订阅消息,这样处理
local sys = { } function sys.subscribe_event(event, nodename, service, key) faci._subscribe_event(event, nodename, service, key) return true end --上在DISPATH文件,下在MODULE中 function M._subscribe_event(event, nodename, service, key) if not env.events[event] then env.events[event] = {} end if not env.events[event][nodename] then env.events[event][nodename] = {} end if not env.events[event][nodename][service] then env.events[event][nodename][service] = {} end if not env.events[event][nodename][service][key] then env.events[event][nodename][service][key] = {} end env.events[event][nodename][service][key] = true end
事件的触发
local event_cache = {} function M.fire_event(name, ...) --获取列表 local cache = event_cache[name] if not cache then event_cache[name] = {} for i, v in pairs(env.module) do if type(v.event[name]) == "function" then table.insert(event_cache[name], v.event[name]) end end end cache = event_cache[name] --执行 for _, fun in ipairs(cache) do log.info("fire event %s", name) xpcall(fun, function(err) log.error(tostring(err)) log.error(debug.traceback()) end, ...) end --远程事件 local events = env.events[name] if not events then return end for nodename, nodes in pairs(events) do for service, keys in pairs(nodes) do if nodename == localname then skynet.send(service, name, keys, ...) else cluster.send(nodename, service, name, keys, ...) end end end end
4 servive中,而后进入start
skynet.start(function() init() if env.init then env.init() end end)
5 看看init,将传递的name命名到该服务,设置他的全局变量。
初始化模块的相关东西。
启动了事件,唤醒和开始?
local function init() --名字和编号 local name = env.name local id = env.id if not name then return end --命名 local idstr = env.id > 0 and tostring(env.id) or "" local name = string.format("%s%s", name, idstr) skynet.name(name, skynet.self()) --设置 log.set_name(name) --全局变量 _G["env"] = env _G["log"] = log --模块 module.init_modules() module.fire_event("awake") module.fire_event("start") log.debug("start ok "..name.."...") end
6 看看这个module.init_modules()
function M.init_modules() require_modules() end
7 继续跟踪,牛皮,加载了根目录+mod+文件名(login)下的全部lua,直接require
在项目中,是2个文件login_forward.lua 和 login_mode_test.lua
local function require_modules() local path = skynet.getenv("app_root").."mod/"..env.name lfstool.attrdir(path, function(file) local file = string.match(file, ".*mod/(.*)%.lua") if file then log.info(string.format("%s%d require file:%s", env.name, env.id, file)) require(file) end end) end
8 回去继续看 module.fire_event("awake")
从上下文理解,这个应该是启动call event ,awake 不是解雇。
这里没太明白,大体意思应该是,凡是注册过该节点的awake的都会调用,包括本地和远程。
local event_cache = {} function M.fire_event(name, ...) --获取列表 local cache = event_cache[name] if not cache then event_cache[name] = {} for i, v in pairs(env.module) do if type(v.event[name]) == "function" then table.insert(event_cache[name], v.event[name]) end end end cache = event_cache[name] --执行 for _, fun in ipairs(cache) do log.info("fire event %s", name) xpcall(fun, function(err) log.error(tostring(err)) log.error(debug.traceback()) end, ...) end --远程事件 local events = env.events[name] if not events then return end for nodename, nodes in pairs(events) do for service, keys in pairs(nodes) do if nodename == localname then skynet.send(service, name, keys, ...) else cluster.send(nodename, service, name, keys, ...) end end end end
9 再回去看加载的那些,7.
这2个东西应该比较重要,一个是模块,一个是全局变量。
先看看这个函数get_module
local module, static = faci.get_module("Login")
10 在文件module中,这里给出了每一个模块的4个基本动做
dispatch , forward , event, watch
dispatch是本文件处理的,这个是skynet必备的。其余三个多是做者本身加的?
local module = {} function M.get_module(name) --模块处理函数 env.module[name] = env.module[name] or { dispatch = {}, forward = {}, event = {}, watch = nil, } --模块全局变量 env.static[name] = env.static[name] or { } return env.module[name], env.static[name] end