公司原有的业务消息推送是靠前端 ajax 轮眉请求后端接口完成的。而后我新来的,让我改为 websocket 双向通讯的来作消息推送。php
业务场景 => PC 端浏览器打开后台系统后,若是有业务订单,而后时时推送到 PC 端上,PC 进行时时的语音播报。
安装框架前端
composer create-project hyperf/hyperf-skeleton
【注意要符合框架的使用环境,在安装】安装 websocket 服务端的 composer 安装包node
1. `composer require hyperf/websocket-server`
安装 websocket 的客户端,安装的缘由是 经过 http 请求后,websocket 客户端直接向 websocket 服务端创建链接,而后推送消息。react
1. `composer require hyperf/websocket-client`
修改 config 文件中的 server.php 配置文件,有时候,配置文件不存在,须要本身手动建立。git
1. server.php 配置文件代码:
<?php declare(strict\_types=1); /\*\* \* This file is part of Hyperf. \* \* @link https://www.hyperf.io \* @document https://doc.hyperf.io \* @contact group@hyperf.io \* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE \*/ use Hyperf\\Server\\Server; use Hyperf\\Server\\SwooleEvent; return [ 'mode' => SWOOLE_PROCESS, 'servers' => [ [ 'name' => 'http', 'type' => Server::SERVER_HTTP, 'host' => '0.0.0.0', 'port' => 8098, 'sock_type' => SWOOLE_SOCK_TCP, 'callbacks' => [ SwooleEvent::ON_REQUEST => [Hyperf\\HttpServer\\Server::class, 'onRequest'], ], ], [ 'name' => 'ws', 'type' => Server::SERVER\_WEBSOCKET, 'host' => '0.0.0.0', 'port' => 8099, 'sock_type' => SWOOLE_SOCK_TCP, 'callbacks' => [ SwooleEvent::ON_HAND_SHAKE => [Hyperf\\WebSocketServer\\Server::class, 'onHandShake'], SwooleEvent::ON_MESSAGE => [Hyperf\\WebSocketServer\\Server::class, 'onMessage'], SwooleEvent::ON_CLOSE => [Hyperf\\WebSocketServer\\Server::class, 'onClose'], ], ], ], 'settings' => [ 'enable_coroutine' => true, 'worker_num' => swoole_cpu_num(), 'pid_file' => BASE_PATH . '/runtime/hyperf.pid', 'open_tcp_nodelay' => true, 'max_coroutine' => 100000, 'open_http2\_protocol' => true, 'max_request' => 100000, 'socket_buffer_size' => 2 * 1024 * 1024, ], 'callbacks' => [ SwooleEvent::ON_BEFORE_START => [Hyperf\\Framework\\Bootstrap\\ServerStartCallback::class, 'beforeStart'], SwooleEvent::ON_WORKER_START => [Hyperf\\Framework\\Bootstrap\\WorkerStartCallback::class, 'onWorkerStart'], SwooleEvent::ON_PIPE_MESSAGE => [Hyperf\\Framework\\Bootstrap\\PipeMessageCallback::class, 'onPipeMessage'], ], ];
换成本身的路由对应的控制器和方法
<?php declare(strict\_types=1); namespace App\\Controller\\WebSocket; use App\\Service\\SendWebSocketQueueService; use Hyperf\\Contract\\OnCloseInterface; use Hyperf\\Contract\\OnMessageInterface; use Hyperf\\Contract\\OnOpenInterface; use Hyperf\\Validation\\ValidationException; use Swoole\\Exception; use Swoole\\Http\\Request; use Swoole\\Server; use Swoole\\Websocket\\Frame; use Swoole\\WebSocket\\Server as WebSocketServer; use App\\Exception\\WebSocketException; use Hyperf\\Utils\\ApplicationContext; use Hyperf\\Logger\\LoggerFactory; use Hyperf\\Di\\Annotation\\Inject; class VoiceBroadcastWebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface { protected $redis; /** * @Inject * @var SendWebSocketQueueService */ protected $service; /** * @var \\Psr\\Log\\LoggerInterface */ protected $logger; // 初始化 public function __construct(LoggerFactory $loggerFactory) { $container = ApplicationContext::getContainer(); $this->redis = $container->get(\Redis::class); $this->logger = $loggerFactory->get('log','default'); } // onmessage 方法接收 客户端或者服务端消息 public function onMessage(WebSocketServer $server, Frame $frame): void { $recvData = json_decode($frame->data); if(!is_object($recvData)) { $this->checkData($frame->data,$frame->fd); } else { $this->sendData($server,$frame->data,$frame->fd); } } /** * 校验数据 * @param $string * @param $fd * @return string */ function checkData ($string, $fd) { if (!is_string($string)) { $this->logger->error('字符串类型错误'); return '字符串类型错误'; } $strArray = explode('_',$string); $shopIds = json_decode($string[1],true); if(!is_array($shopIds) || empty($shopIds)) { $this->logger->error('参数错误'); return '参数有误'; } echo "所有映射成功" } /** * 发送消息到 PC 端 */ public function sendData($server,$sendData,$fd) { $recvData = json_encode($sendData,true); $uid = $recvData['uid']; $data = $recvData['data']; $fdsArr = $this->redis->sMembers('jiayouwa:websocket:voiceSet_'.$uid); echo 'voiceSet_'.$uid; $data = [ 'result' => true, 'code'=>0, 'msg'=>'操做成功', 'data'=>$data, ]; if(count($fdsArr)) { foreach ($fdsArr as $key=>$value) { try { $server->push(intval($value),json_encode($data)); echo "线程:$fd 向线程 $value 发送信息\\n"; } catch (\\Throwable $e) { // 增长 重试次数 $this->service->push($recvData,1); // 把数据删除 $this->redis->sRem('jiayouwa:websocket:voiceSet_'.$uid,$value); continue; } }} } public function onClose(Server $server, int $fd, int $reactorId): void { echo "$fd\-closed\\n"; } public function onOpen(WebSocketServer $server, Request $request): void { echo "线程:$request\->fd\-打开\\n"; } }
/** * 向指定浏览器发送数据 * * @param VoiceBroadcastRequest $request * @param ResponseInterface $response * @return \\Psr\\Http\\Message\\ResponseInterface */ public function voiceBroadcast(VoiceBroadcastRequest $request, ResponseInterface $response) { $serviceType = $request->input('service_type',0); $name = $request->input('name',''); $shopNameId = $request->input('shop_name_id',0); // 判断是在redis 中存在 $fdsArr = $this->redis->sMembers('jiayouwa:websocket:voiceSet_'.$shopNameId); if(!count($fdsArr)) { return $this->returnSuccess([],true,0,'shop_name_id 不存在'); } $data = [ 'uid'=>$shopNameId, 'service'=>'voiceBroadcast', 'data'=> array( array( 'service_type'=>$serviceType, 'name'=>$name, ), ), ]; // 发送数据到 webSocket $this->connectWebSocket($data); return $this->returnSuccess(); } /** * 连接websocket 并发送数据 * @param $data * @return array */ public function connectWebSocket($data) { $client = $this->clientFactory->create($this->webSocketIp); $client->push(json_encode($data)); }
<script> //这里的ip地址改成本身服务器的ip地址 var ws = new WebSocket('ws://192.168.0.0:9502/voiceBroadcast'); ws.onopen = function(){ var uid = 'jiayouwa_[1,2,3]'; ws.send(uid); }; ws.onmessage = function(e){ var message_info =e.data console.log(message_info); }; </script>
用 hyperf 框架实现太简单了。只需下载 websocket 的客户端 和 服务端 composer 包。
若是要实现点对点的 消息推送,只须要将你的 uid 和 fd 进程 redis 的 key=> value 的映射便可
若是要实现 一个推送到 多个 pc 端,将 uid => [1,2,3,4,5] 用 redis 的集合类型就好
代码仅供参考,有什么问题评论就好
写文章今年才开始,写的很差,请多多包涵
hyperf 框架地址: https://doc.hyperf.io/#/