github: http://github.com/brewlin/im-...
im-cloud分布式中间件分析(四)-logic节点实现
logic 节点 做为生产者和client端,做为业务节点,提供push推送resetapi接口,能够扩容多个节点作nginx负载均衡
默认启动10个消息队列链接池,在task进程为每一个人物建立协程异步生产任务
直接调用组件task接口进行投递到task进程执行,task投递为非阻塞操做,执行完毕会直接返回,大大的提高了worker处理并发请求的能力,惟一的影响是,若是多个task进程的消费能力更不上worker的投递速度也会影响worker的处理能力,因此须要作取舍nginx
use Task\Task; /** * @var LogicPush */ Task::deliver(LogicPush::class,"pushMids",[(int)$arg["op"],$arg["mids"],$arg["msg"]]);
App\Task
下单个请求流程执行的生命周期会调用生成多个对象,多达10多个。并发大的状况下GC 几乎会首先挂掉,并且会耗时等待,因此new对象也有优化的空间git
项目在初始化也就是主进程启动期间就扫描相关代码,有注解的就进行收集,而后实例化到容器container中,之后再屡次使用的时候直接复用代码,而无需屡次new对象,大大节省空间和时间,以下图为建立一个协程去执行任务,相关的对象都从容器中获取github
Co::create(function ()use($op,$mids,$msg){ /** @var RedisDao $servers */ $servers = \container()->get(RedisDao::class)->getKeysByMids($mids); $keys = []; foreach($servers as $key => $server){ $keys[$server][] = $key; } foreach($keys as $server => $key){ //丢到队列里去操作,让job去处理 \container()->get(QueueDao::class)->pushMsg($op,$server,$key,$msg); } },true); //第二个参数为true 表示使用Context::waitGroup() 等待任务执行完成
如上图所示能够调用组件提供的多个方法获取容器对象json
container()->get(class)
bean(class)
即便将主要的耗时任务放到task进程中执行,worker进程中依然会有少许的等待时间,如今采起的方式,是请求到来时获取数据后,直接回复结束当前链接,而后在继续执行任务,这样就不用等到投递task任务后再结束当前链接,大大提升并发能力,虽然可能耗时性能没有发生太大的改变,可是并发能力大大的提高。以下所示:segmentfault
/** * @return \Core\Http\Response\Response|static */ public function mids() { Context::get()->getResponse()->end(); $post = Context::get()->getRequest()->input(); if(empty($post["operation"]) || empty($post["mids"]) ||empty($post["msg"])){ return $this->error("缺乏参数"); } $arg = [ "op" => $post["operation"], "mids" => is_array($post["mids"])?$post["mids"]:[$post["mids"]], "msg" => $post["msg"] ]; Log::debug("push mids post data:".json_encode($arg)); /** * @var LogicPush */ Task::deliver(LogicPush::class,"pushMids",[(int)$arg["op"],$arg["mids"],$arg["msg"]]); }
Context::get()->getResponse()->end();
经过协程上下文获取reponse对象直接结束当前链接,而后在继续执行当前任务,并释放内存