这是关于 Swoole 学习的第七篇文章:Swoole RPC 的实现。html
有位读者说 “上篇文章,下载代码后直接运行成功,代码简洁明了,简直是 Swoole 入门最好的 Demo ”。react
“哈哈哈...”json
还有读者说 “有一块儿学习的组织群吗,能够在里面进行疑难答疑?”微信
这个还真没有,总以为维护一个微信群不容易,由于本身自己就不爱在群里说话,另外,本身也在不少微信群中,开始氛围挺好的,你们都聊聊技术,后来技术聊的少了改为聊八卦啦,再后来慢慢就安静了,还有在群里起冲突的...负载均衡
固然我也知道维护一个微信群的好处是很是大的,若是有这方面经验的同窗,我们一块儿交流交流 ~框架
还有出版社找我写书的.curl
他们也真是放心,我本身肚子里几滴墨水仍是知道的,目前确定是不行,之后嘛,再说。异步
还有一些大佬加了微信,多是出于对晚辈的提携吧,偷偷告诉你,从大佬的朋友圈能学到不少东西。tcp
我真诚的建议,作技术的应该本身多总结总结,将本身会的东西写出来分享给你们,先不说给别人带来太多的价值,反正对本身的帮助是很是很是大的,这方面想交流的同窗,能够加我,我能够给你无私分享。
可能都会说时间少,时间只要挤,总会有的,每一个人都 24 小时,时间对每一个人是最公平的。说到这推荐你们读一下《暗时间》这本书,这是我这本书的 读书笔记,你们能够瞅瞅。
开始今天的文章吧,这篇文章实现了一个简单的 RPC 远程调用,在实现以前须要先了解什么是 RPC,不清楚的能够看下以前发的这篇文章 《我眼中的 RPC》。
下面的演示代码主要使用了 Swoole 的 Task 任务池,经过 OnRequest/OnReceive 得到信息交给 Task 去处理。
举个工做中的例子吧,在电商系统中的两个模块,我的中心模块和订单管理模块,这两个模块是独立部署的,可能不在一个机房,可能不是一个域名,如今我的中心须要经过 用户ID 和 订单类型 获取订单数据。
//代码片断 <?php $demo = [ 'type' => 'SW', 'token' => 'Bb1R3YLipbkTp5p0', 'param' => [ 'class' => 'Order', 'method' => 'get_list', 'param' => [ 'uid' => 1, 'type' => 2, ], ], ]; $ch = curl_init(); $options = [ CURLOPT_URL => 'http://10.211.55.4:9509/', CURLOPT_POST => 1, CURLOPT_POSTFIELDS => json_encode($demo), ]; curl_setopt_array($ch, $options); curl_exec($ch); curl_close($ch);
//代码片断 $demo = [ 'type' => 'SW', 'token' => 'Bb1R3YLipbkTp5p0', 'param' => [ 'class' => 'Order', 'method' => 'get_list', 'param' => [ 'uid' => 1, 'type' => 2, ], ], ]; $this->client->send(json_encode($demo));
SW 单个请求,等待结果
发出请求后,分配给 Task ,并等待 Task 执行完成后,再返回。
SN 单个请求,不等待结果
发出请求后,分配给 Task 以后,就直接返回。
$demo = [ 'type' => 'SW', 'token' => 'Bb1R3YLipbkTp5p0', 'param' => [ 'class' => 'Order', 'method' => 'get_list', 'param' => [ 'uid' => 1, 'type' => 2, ], ], ];
<?php if (!defined('SERVER_PATH')) exit("No Access"); class OnRequest { private static $query; private static $code; private static $msg; private static $data; public static function run($serv, $request, $response) { try { $data = decrypt($request->rawContent()); self::$query = $data; if (empty($data)) { self::$code = '-1'; self::$msg = '非法请求'; self::end($request, $response); } //TODO 验证Token switch ($data['type']) { case 'SW': //单个请求,等待结果 $task = [ 'request' => $data, 'server' => 'http' ]; $rs = $serv->task(json_encode($task), -1, function ($serv, $task_id, $rs_data) use ($request, $response) { self::$code = '1'; self::$msg = '成功'; self::$data = $rs_data['response']; self::end($request, $response); }); if ($rs === false) { self::$code = '-1'; self::$msg = '失败'; self::end($request, $response); } break; case 'SN': //单个请求,不等待结果 $task = [ 'request' => $data, 'server' => 'http' ]; $rs = $serv->task(json_encode($task)); if ($rs === false) { self::$code = '-1'; self::$msg = '失败'; } else { self::$code = '1'; self::$msg = '成功'; } self::end($request, $response); break; default: self::$code = '-1'; self::$msg = '非法请求'; self::end($request, $response); } } catch(Exception $e) { } } private static function end($request = null, $response = null) { $rs['request_method'] = $request->server['request_method']; $rs['request_time'] = $request->server['request_time']; $rs['response_time'] = time(); $rs['code'] = self::$code; $rs['msg'] = self::$msg; $rs['data'] = self::$data; $rs['query'] = self::$query; $response->end(json_encode($rs)); self::$data = []; return; } }
<?php if (!defined('SERVER_PATH')) exit("No Access"); class OnReceive { private static $request_time; private static $query; private static $code; private static $msg; private static $data; public static function run($serv, $fd, $reactor_id, $data) { try { self::$request_time = time(); $data = decrypt($data); self::$query = $data; //TODO 验证Token switch ($data['type']) { case 'SW': //单个请求,等待结果 $task = [ 'fd' => $fd, 'request' => $data, 'server' => 'tcp', 'request_time' => self::$request_time, ]; $rs = $serv->task(json_encode($task)); if ($rs === false) { self::$code = '-1'; self::$msg = '失败'; self::handlerTask($serv, $fd); } break; case 'SN': //单个请求,不等待结果 $task = [ 'fd' => $fd, 'request' => $data, 'server' => 'tcp', 'request_time' => self::$request_time, ]; $rs = $serv->task(json_encode($task)); if ($rs === false) { self::$code = '-1'; self::$msg = '失败'; } else { self::$code = '1'; self::$msg = '成功'; } self::handlerTask($serv, $fd); break; default: self::$code = '-1'; self::$msg = '非法请求'; self::handlerTask($serv, $fd); } } catch(Exception $e) { } } private static function handlerTask($serv, $fd) { $rs['request_method'] = 'TCP'; $rs['request_time'] = self::$request_time; $rs['response_time'] = time(); $rs['code'] = self::$code; $rs['msg'] = self::$msg; $rs['data'] = self::$data; $rs['query'] = self::$query; $serv->send($fd, json_encode($rs)); } }
Demo 代码仅供参考,里面有不少不严谨的地方!
服务的调用方与提供方中间须要有一个服务注册中心,很显然上面的代码中没有,须要本身去实现。
服务注册中心,负责管理 IP、Port 信息,提供给调用方使用,还要能负载均衡和故障切换。
根据本身的状况,服务注册中心实现可容易可复杂,用 Redis 也行,用 Zookeeper、Consul 也行。
感兴趣的也能够了解下网关 Kong ,包括 身份认证、权限认证、流量控制、监控预警...
再推荐一个 Swoole RPC 框架 Hprose,支持多语言。
就到这了,上面的 Demo 须要源码的,加我微信。(菜单-> 加我微信-> 扫我)
本文欢迎转发,转发请注明做者和出处,谢谢!