基于wokerman实现即时消息推送

目前网站有两个用到实时消息推送的功能,源码:https://github.com/wuzhc/teamjavascript

  • 最新动态,实时显示用户的操做行为
  • 消息推送,如重要消息通知,任务指派等等

考虑的问题

  • 既然要实现即时,那就少不了socketio。由于项目是PHP写的,因此服务端直接用phpsocket.io
  • 咱们应该保存离线消息,不然若是用户不在线,那就接受不到消息。这里我用mongodb来存储消息。php

    • 一是消息不须要关联表,一条消息一个文档
    • 二是mongodb适合作海量数据存储,而且分片也很简单
    • 三是消息不须要永久存储,随着时间推移,消息的价值性越低,能够用固定集合来存储消息,当数据量达到必定值时覆盖最久的消息
  • 一个页面链接一个socket,若用户同时打开了多个页面(即一个用户会对应多个socketID),那么消息应该推送到用户每个打开的页面。一个简单的作法就是把用户多个sockID加到group分组(socket.emit('group')),group分组名称能够是"uid:1",表示userID为1的用户,而后socket.broadcast.to('uid:1').emit('event_name', data);

就能够实现对多个页面推送消息html

实时动态

图片描述

实时消息推送

图片描述

客户端

// 初始化io对象
        var socket = io('http://' + document.domain + ':2120');
        // 当socket链接后发送登陆请求
        socket.on('connect', function () {
            socket.emit('login', {
                uid: "<?=Yii::$app->user->id?>",
                companyID: "<?=Yii::$app->user->identity->fdCompanyID?>"
            });
        });
        // 实时消息,当服务端推送来消息时触发
        socket.on('new_msg', function (msg) {
            if (msg.typeID == '<?= \common\config\Conf::MSG_HANDLE?>') {
                var html = '<li>' +
                    '<a href="' + msg.url + '">' +
                    '<div class="pull-left">' +
                    '<img src="<?= $directoryAsset ?>/img/user2-160x160.jpg" class="img-circle" alt="User Image"/>' +
                    '</div>' +
                    '<h4 style="overflow:hidden;text-overflow:ellipsis;">' + msg.title + '</h4>' +
                    '<p style="overflow:hidden;text-overflow:ellipsis;">' + msg.content + '</p>' +
                    '</a>' +
                    '</li>';
                $('#msg-handle').prepend(html);

                var num = $('.msg-handle-num').eq(1).text() * 1;
                $('.msg-handle-num').text(num + 1);
            }
        });
        
        // 实时动态,当服务端推送来消息时触发
        socket.on('update_dynamic', function (msg) {
            var html = '<li>' +
                '<i class="portrait">' +
                '<img class="img-circle img-bordered-sm portrait-img" src="' + msg.portrait + '" alt="User Image"></i>' +
                '<div class="timeline-item">' +
                '<span class="time">' +
                '<i class="fa fa-clock-o"></i>' + msg.date + '</span>' +
                '<h3 class="timeline-header">' +
                '<a href="' + msg.url + '">' + msg.operator + '</a>' +
                '<span class="desc">' + msg.title + '</span>' +
                '</h3>' +
                '<div class="timeline-body">' + msg.content + '</div>' +
                '</div>' +
                '</li>';
            $('#dynamic-list').prepend(html);
        });
        
        // 在线人数
        socket.on('update_online_count', function (msg) {
            $('#online-people').html(msg);
        });

服务端

/**
     * 消息推送
     */
    public function actionStart()
    {
        // PHPSocketIO服务
        $io = new SocketIO(2120);
        // 客户端发起链接事件时,设置链接socket的各类事件回调
        $io->on('connection', function ($socket) use ($io) {

            /** @var Connection $redis */
            $redis = Yii::$app->redis;

            // 当客户端发来登陆事件时触发
            $socket->on('login', function ($data) use ($socket, $redis, $io) {

                $uid = isset($data['uid']) ? $data['uid'] : null;
                $companyID = isset($data['companyID']) ? $data['companyID'] : null;

                // uid和companyID应该作下加密 by wuzhc 2018-01-28
                if (empty($uid) || empty($companyID)) {
                    return ;
                }
                $this->companyID = $companyID;

                // 已经登陆过了
                if (isset($socket->uid)) {
                    return;
                }

                $key = 'socketio:company:' . $companyID;
                $field = 'uid:' . $uid;

                // 用hash方便统计用户打开页面数量
                if (!$redis->hget($key, $field)) {
                    $redis->hset($key, $field, 0);
                }

                // 同个用户打开新页面时加1
                $redis->hincrby($key, $field, 1);

                // 加入uid分组,方便对同个用户的全部打开页面推送消息
                $socket->join('uid:'. $uid);

                // 加入companyID,方便对整个公司的全部用户推送消息
                $socket->join('company:'.$companyID);

                $socket->uid = $uid;
                $socket->companyID = $companyID;

                // 整个公司在线人数更新
                $io->to('company:'.$companyID)->emit('update_online_count', $redis->hlen($key));
            });

            // 当客户端断开链接是触发(通常是关闭网页或者跳转刷新致使)
            $socket->on('disconnect', function () use ($socket, $redis, $io) {
                if (!isset($socket->uid)) {
                    return;
                }

                $key = 'socketio:company:' . $socket->companyID;
                $field = 'uid:' . $socket->uid;
                $redis->hincrby($key, $field, -1);

                if ($redis->hget($key, $field) <= 0) {
                    $redis->hdel($key, $field);

                    // 某某下线了,刷新整个公司的在线人数
                    $io->to('company:'.$socket->companyID)->emit('update_online_count', $redis->hlen($key));
                }
            });
        });

        // 开始进程时,监听2121端口,用户数据推送
        $io->on('workerStart', function () use ($io) {
            /** @var Connection $redis */
            $redis = Yii::$app->redis;

            $httpWorker = new Worker('http://0.0.0.0:2121');
            // 当http客户端发来数据时触发
            $httpWorker->onMessage = function ($conn, $data) use ($io, $redis) {
                $_POST = $_POST ? $_POST : $_GET;
                switch (@$_POST['action']) {
                    case 'message':
                        $to = 'uid:' . @$_POST['receiverID'];
                        $_POST['content'] = htmlspecialchars(@$_POST['content']);
                        $_POST['title'] = htmlspecialchars(@$_POST['title']);

                        // 有指定uid则向uid所在socket组发送数据
                        if ($to) {
                            $io->to($to)->emit('new_msg', $_POST);
                        }

                        $companyID = @$_POST['companyID'];
                        $key = 'socketio:company:' . $companyID;
                        $field = 'uid:' . $to;

                        // http接口返回,若是用户离线socket返回fail
                        if ($to && $redis->hget($key, $field)) {
                            return $conn->send('offline');
                        } else {
                            return $conn->send('ok');
                        }
                        break;
                    case 'dynamic':
                        $to = 'company:' . @$_POST['companyID'];
                        $_POST['content'] = htmlspecialchars(@$_POST['content']);
                        $_POST['title'] = htmlspecialchars(@$_POST['title']);

                        // 有指定uid则向uid所在socket组发送数据
                        if ($to) {
                            $io->to($to)->emit('update_dynamic', $_POST);
                        }

                        $companyID = @$_POST['companyID'];
                        $key = 'socketio:company:' . $companyID;

                        // http接口返回,若是用户离线socket返回fail
                        if ($to && $redis->hlen($key)) {
                            return $conn->send('offline');
                        } else {
                            return $conn->send('ok');
                        }
                }

                return $conn->send('fail');
            };

            // 执行监听
            $httpWorker->listen();
        });

        // 运行全部的实例
        global $argv;
        array_shift($argv);

        if (isset($argv[2]) && $argv[2] == 'daemon') {
            $argv[2] = '-d';
        }

        Worker::runAll();
    }

例子

/**
     * 新建任务
     * @param $projectID
     * @param $categoryID
     * @return string
     * @throws ForbiddenHttpException
     * @since 2018-01-26
     */
    public function actionCreate($projectID, $categoryID)
    {
        $userID = Yii::$app->user->id;

        if (!Yii::$app->user->can('createTask')) {
            throw new ForbiddenHttpException(ResponseUtil::$msg[1]);
        }

        if ($data = Yii::$app->request->post()) {
            if (empty($data['name'])) {
                ResponseUtil::jsonCORS(['status' => Conf::FAILED, 'msg' => '任务标题不能为空']);
            }

            // 保存任务
            $taskID = TaskService::factory()->save([
                'name'       => $data['name'],
                'creatorID'  => $userID,
                'companyID'  => $this->companyID,
                'level'      => $data['level'],
                'categoryID' => $categoryID,
                'projectID'  => $projectID,
                'content'    => $data['content']
            ]);

            if ($taskID) {
                $url = Url::to(['task/view', 'taskID' => $taskID]);
                $portrait = UserService::factory()->getUserPortrait($userID);
                $username = UserService::factory()->getUserName($userID);
                $title = '建立了新任务';
                $content = $data['name'];

                // 保存操做日志
                LogService::factory()->saveHandleLog([
                    'objectID'   => $taskID,
                    'companyID'  => $this->companyID,
                    'operatorID' => $userID,
                    'objectType' => Conf::OBJECT_TASK,
                    'content'    => $content,
                    'url'        => $url,
                    'title'      => $title,
                    'portrait'   => $portrait,
                    'operator'   => $username
                ]);

                // 动态推送
                MsgService::factory()->push('dynamic', [
                    'companyID'  => $this->companyID,
                    'operatorID' => $userID,
                    'operator'   => $username,
                    'portrait'   => $portrait,
                    'title'      => $title,
                    'content'    => $content,
                    'url'        => $url,
                    'date'       => date('Y-m-d H:i:s'),
                ]);

                ResponseUtil::jsonCORS(null, Conf::SUCCESS, '建立成功');
            } else {
                ResponseUtil::jsonCORS(null, Conf::FAILED, '建立失败');
            }
        } else {
            return $this->render('create', [
                'projectID' => $projectID,
                'category'  => TaskCategory::findOne(['id' => $categoryID]),
            ]);
        }
    }

调用保存操做日志底层接口

LogService::factory()->saveHandleLog($args)

参数说明

参数 说明
operatorID 操做者ID,用于用户跳转连接
companyID 公司ID,用于该公司下的动态
operator 操做者姓名
portrait 操做者头像
receiver 接受者,可选
title 动态标题,例如建立任务
content 动态内容,例如建立任务的名称
date 动态时间
url 动态跳转连接
token 验证
objectType 对象类型,例如任务,文档等等
objectID 对象ID,例如任务ID,文档ID等等
  • companyID 用于查询该公司下的动态
  • objectType和objectID组合能够查询某个对象的操做日志,例如一个任务被操做的日志

保存消息

参数 说明
senderID 发送者ID,用于用户跳转连接
companyID 公司ID,用于该公司下的动态
sender 发送者姓名
portrait 发送者头像
receiverID 接受者ID
title 动态标题,例如建立任务
content 动态内容,例如建立任务的名称
date 动态时间
url 动态跳转连接
typeID 消息类型,1系统通知,2管理员通知,3操做通知
  • receiverID和typeID和isRead组合能够查询用户未读消息

请求接口

MsgService::factory()->push($action, $args)

即时动态

参数 说明
operatorID 操做者ID,用于用户跳转连接
companyID 公司ID,用于给该公司的全部socket推送
operator 操做者姓名
portrait 操做者头像
receiver 接受者,可选
title 动态标题,例如建立任务
content 动态内容,例如建立任务的名称
date 动态时间
url 动态跳转连接

即时消息

参数 说明
senderID 发送者ID,用于用户跳转连接
sender 发送者姓名
receiverID 接受者ID,用于给接受者的全部socket推送消息
typeID 消息类型,1系统通知,2管理员通知,3操做通知
portrait 发送者头像
title 消息标题
content 消息内容
url 消息跳转连接

完整代码参考:https://github.com/wuzhc/team

相关文章
相关标签/搜索