chuck-lua使用的是单线程模型,依赖于底层高效率的事件回调框架.从前文介绍过的使用示例中能够看出,基本接口与node.js相似,大量依赖方法回调.node
对于lua这种支持coroutine的语言,使用coroutine来将异步回调转换成同步接口是很方便的.git
chuck-lua提供了两种使用coroutine解决异步问题的方式:github
首先来看一个例子,看下如何将异步的redis事件转换成同步接口:redis
redis.lua服务器
local chuck = require("chuck") local engine = require("distri.engine") local Sche = require("distri.uthread.sche") local LinkQue = require("distri.linkque") local map = {} local client = {} function client:new(c) local o = {} o.__index = client o.__gc = function() print("redis_client gc") end setmetatable(o,o) o.c = c o.pending = LinkQue:New() return o end function client:Close(err) local co while true do co = self.pending:Pop() if co then co = co[1] Sche.WakeUp(co,false,err) else map[self.c] = nil self.c:Close() self.c = nil return end end end function client:Do(cmd) local co = Sche.Running() if not co then return "should call in a coroutine context " end if not self.c then return "client close" end if "ok" == self.c:Execute(cmd,function (_,reply) Sche.WakeUp(co,true,reply) end) then local node = {co} self.pending:Push(node) --[[ 若是succ == true,则reply是操做结果, 不然,reply是传递给Close的err值 ]]-- local succ,reply = Sche.Wait() self.pending:Remove(node) if succ then return nil,reply else return reply end end return "error" end local redis = {} function redis.Connect(ip,port,on_error) local err,c = chuck.redis.Connect(engine,ip,port,function (_,err) local c = map[_] if c then on_error(c,err) end end) if c then return err,client:new(c) else return err end end return redis
redis.lua封装了异步事件接口,向用户提供了同步的调用方式,惟一的使用约束是redis:Do必须在coroutine上下文中才能被使用.网络
咱们首先看下如何在第一种方式下使用这个接口:并发
local Distri = require("distri.distri") local Redis = require("distri.redis") local err,client = Redis.Connect("127.0.0.1",6379) if client then local function co_fun(i) local cmd = string.format("hmget chaid:%d chainfo skills",i) local err,reply = client:Do(cmd) if reply then for k,v in pairs(reply) do print(k,v) end end end for i = 1,1000 do Sche.Spawn(co_fun,i) end Distri.Run() end
在这种模式下,每一个redis任务都由一个单独的coroutine直接执行.框架
接下来再看下如何利用pool和任务队列完成一样的效果:异步
local Distri = require("distri.distri") local Redis = require("distri.redis") local Task = require("distri.uthread.task") local err,client = Redis.Connect("127.0.0.1",6379) if client then for i = 1,1000 do Task.New(function () local cmd = string.format("hmget chaid:%d chainfo skills",i) local err,reply = client:Do(cmd) if reply then for k,v in pairs(reply) do print(k,v) end end end) end Distri.Run() end
对于每一个任务,使用Task.New建立一个任务,任务被建立以后会被添加到任务队列尾部,预先建立的pool中的coroutine将会被唤醒,从队列中提取任务并执行.socket
最后,再来看一下对于网络消息,如何利用Task处理任务.
local Task = require("distri.uthread.task") local Distri = require("distri.distri") local Redis = require("distri.redis") local Socket = require("distri.socket") local Packet = require("chuck").packet local clone = Packet.clone local err,client = Redis.Connect("127.0.0.1",6379) if client then local server = Socket.stream.listen("127.0.0.1",8010,function (s,errno) if s then if s:Ok(4096,Socket.stream.rawdecoder,Task.Wrap(function (_,msg,errno) if msg then local cmd = string.format("hmget chaid:%d chainfo skills",1) local err,reply = client:Do(cmd) local result = "" if reply then for k,v in pairs(reply) do result = result .. v .. "\n" end else result = "error\n" end s:Send(Packet.rawpacket(result)) else s:Close() s = nil end end),client) then s:SetRecvTimeout(5000) end end end) if server then Distri.Run() end end
这是一个简单的echo服务,当用户链接上服务器发送消息,服务器收到消息以后提交一个redis请求,并将结果返回给客户.
这里的关键点是使用Task.Wrap封装了事件回调函数.也就是说,对网络消息的处理也是在coroutine上下中执行的.所以在事件回调中的阻塞并不会致使没法响应其它到来的并发请求.