在团队协同工具worktile的使用过程当中,你会发现不管是右上角的消息通知,仍是在任务面板中拖动任务,还有用户的在线状态,都是实时刷新。Worktile中的推送服务是采用的是基于xmpp协议、erlang语言实现的ejabberd,并在其源码基础上,结合咱们的业务,对源码做了修改以适配咱们自身的需求。另外,基于amqp协议也能够做为实时消息推送的一种选择,踢踢网就是采用rabbitmq+stomp协议实现的消息推送服务。本文将结合我在worktile和踢踢网的项目实践,介绍下消息推送服务的具体实现。html
相较于手机端的消息推送(通常都是以socket方式实现),web端是基于http协议,很难像tcp同样保持长链接。但随着技术的发展,出现了websocket, comet等新的技术能够达到相似长链接的效果,这些技术大致可分为如下几类:java
短轮询。页面端经过js定时异步刷新,这种方式实时效果较差。node
长轮询。页面端经过js异步请求服务端,服务端在接收到请求后,若是该次请求没有数据,则挂起此次请求,直到有数据到达或时间片(服务端设定)到,则返回本次请求,客户端接着下一次请求。示例以下:mysql
服务端的实现中,不管采用ejabberd仍是rabbitmq,都是基于erlang语言开发的,因此必须安装erlang运行时环境。Erlang是一种函数式语言,具备容错、高并发的特色,借助OTP的函数库,很容易构建一个健壮的分布式系统。目前,基于erlang开发的产品有,数据库方面:Riak(Dynamo实现)、CouchDB, Webserver方面:Cowboy、Mochiweb, 消息中间件有rabbitmq等。对于服务端程序员来讲,erlang提供的高并发、容错、热部署等特性是其余语言没法达到的。不管在实时通讯仍是在游戏程序中,用erlang能够很容易为每个上线用户建立一个对应的process,对一台4核8个G的服务器来讲,承载上百万个这样的process是很是轻松的事。下图是erlang程序发起process的通常性示意图:nginx
如图所示,Session manager(or gateway)负责为每一个用户(uid)建立相对应的process, 并把这个对应关系(map)存放到数据表中。每一个process则对应用户数据,而且他们之间能够相互发送消息。Erlang的优点就是在内存足够的状况下建立上百万个这样的process,并且它的建立和销毁比java的thread要轻量的多,二者不是一个数量级的。git
好了,咱们如今开始着手erlang环境的搭建(实验的系统为ubuntu12.04, 4核8个G内存):程序员
一、依赖库安装github
sudo apt-get install build-essential sudo apt-get install libncurses5-dev sudo apt-get install libssl-dev libyaml-dev sudo apt-get install m4 sudo apt-get install unixodbc unixodbc-dev sudo apt-get install freeglut3-dev libwxgtk2.8-dev sudo apt-get install xsltproc sudo apt-get install fop tk8.5 libxml2-utils
二、官网下载otp源码包(http://www.erlang.org/download.html), 解压并安装:web
\>\> tar zxvf otpsrcR16B01.tar.gz \>\> cd otpsrcR16B01 \>\> configure \>\> make & make install
至此,erlang运行环境就完成了。下面将分别介绍rabbitmq和ejabberd构建实时消息服务。redis
RabbitMQ是在业界普遍应用的消息中间件,也是对AMQP协议实现最好的一种中间件。AMQP协议中定义了Producer、 Consumer、MessageQueue、Exchange、Binding、Virtual Host等实体,他们的关系以下图所示:
消息发布者(Producer)链接交换器(Exchange), 交换器和消息队列(Message Queue)经过key进行Binding,Binding是根据Exchange的类型(分为fanout、direct、topic、header)分别对消息做不一样形式的派发。Message Queue又分为durable、temporary、auto-delete三种类型,durable queue是持久化队列,不会由于服务shutdown而消失,temporary queue则服务重启后会消失,auto-delete则是在没有consumer链接时自动删除。另外RabbitMQ有不少第三方插件,能够基于AMQP协议基础之上作出不少扩展的应用。下面咱们将介绍web stomp插件构建基于AMQP之上的stomp文本协议,经过浏览器websocket达到实时的消息传输。系统的结构如图:
如图所示,web端咱们使用stomp.js和sockjs.js与rabbitmq的web stomp plugin通讯,手机端能够用stompj, gozirra(Android)或者objc-stomp(IOS)经过stomp协议与rabbitmq收发消息。由于咱们是实时消息系统一般都是要与已有的用户系统结合,rabbitmq能够经过第三方插件rabbitmq-auth-backend-http来适配已有的用户系统,这个插件能够经过http接口完成用户链接时的认证过程。固然,认证方式还有ldap等其余方式。下面介绍具体步骤:
\>\> tar zxf rabbitmq-server-x.x.x.tar.gz \>\> cd rabbitmq-server-x.x.x \>\> make & make install
\>\> cd /path/to/your/rabbitmq \>\> ./sbin/rabbitmq-plugins enable rabbitmq_web_stomp \>\> ./sbin/rabbitmq-plugins enable rabbitmq_web_stomp_examples \>\> ./sbin/rabbitmqctl stop \>\> ./sbin/rabbitmqctl start \>\> ./sbin/rabbitmqctl status
将会显示下图所示的运行的插件列表
\>\> cd /path/to/your/rabbitmq/plugins \>\>wget http://www.rabbitmq.com/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez \>\> cd .. \>\> ./sbin/rabbitmq-plugins enable rabbitmq_auth_backend_http
编辑rabbitmq.config文件(默认存放于/etc/rabbitmq/下),添加:
[ ... {rabbit, [{auth_backends, [rabbit_auth_backend_http]}]}, ... {rabbitmq_auth_backend_http, [{user_path, “http://your-server/auth/user”}, {vhost_path, “http://your-server/auth/vhost”}, {resource_path, “http://your-server/auth/resource”} ]} ... ].
其中,user_path是根据用户名密码进行校验,vhost_path是校验是否有权限访问vhost, resource_path是校验用户对传入的exchange、queue是否有权限。我下面的代码是用nodejs实现的这三个接口的示例:
var express = require('express'); var app = express(); app.get('/auth/user', function(req, res){ var name = req.query.username; var pass = req.query.password; console.log("name : " + name + ", pass : " + pass); if(name === 'guest' && pass === "guest"){ console.log("allow"); res.send("allow"); }else{ res.send('deny'); } }); app.get('/auth/vhost', function(req, res){ console.log("/auth/vhost"); res.send("allow"); }); app.get('/auth/resource', function(req, res){ console.log("/auth/resource"); res.send("allow"); }); app.listen(3000);
...... var ws = new SockJS('http://' + window.location.hostname + ':15674/stomp'); var client = Stomp.over(ws); // SockJS does not support heart-beat: disable heart-beats client.heartbeat.outgoing = 0; client.heartbeat.incoming = 0; client.debug = pipe('#second'); var print_first = pipe('#first', function(data) { client.send('/exchange/feed/user_x', {"content-type":"text/plain"}, data); }); var on_connect = function(x) { id = client.subscribe("/exchange/feed/user_x", function(d) { print_first(d.body); }); }; var on_error = function() { console.log('error'); }; client.connect('guest1', 'guest1', on_connect, on_error, '/'); ......
须要说明的时,在这里咱们首先要在rabbitmq实例中建立feed这个exchange,咱们用stomp.js链接成功后,根据当前登录用户的id(user_x)
绑定到这个exchange,即 subscribe("/exchange/feed/user_x", ...)
这个操做的行为,这样在向rabbitmq中feed exchange发送消息并指定用户id(user_x)为key,页面端就会经过websocket实时接收到这条消息。
到目前为止,基于rabbitmq+stomp实现web端消息推送就已经完成,其中不少的细节须要小伙伴们亲自去实践了,这里就很少说了。实践过程当中能够参照官方文档:
以上的实现是我本人在踢踢网时采用的方式,下面接着介绍一下如今在Worktile中如何经过ejabberd实现消息推送。
与rabbitmq不一样,ejabberd是xmpp协议的一种实现,与amqp相比,xmpp普遍应用于即时通讯领域。Xmpp协议的实现有不少种,好比java的openfire,但相较其余实现,ejabberd的并发性能无疑使最优秀的。Xmpp协议的前身是jabber协议,早期的jabber协议主要包括在线状态(presence)、好友花名册(roster)、IQ(Info/Query)几个部分。如今jabber已经成为rfc的官方标准,如rfc2799, rfc4622, rfc6121,以及xmpp的扩展协议(xep)。Worktile Web端的消息提醒功能就是基于XEP-012四、XEP-0206定义的BOSH扩展协议。
因为自身业务的须要,咱们对ejabberd的用户认证和好友列表模块的源码进行修改,经过redis保存用户的在线状态,而不是mnesia和mysql。另外好友这块咱们是从已有的数据库中(mongodb)中获取项目或团队的成员。Web端经过strophe.js来链接(http-bind),strophe.js能够以长轮询和websocket两种方式来链接,因为ejabberd尚未好的websocket的实现,就采用了BOSH的方式模拟长链接。整个系统的结构以下:
xmpp-framwork
链接, Android能够用smack直接连ejabberd服务器集群。这些都是现有的库,无需对client进行开发。用户认证直接修改了ejabberd_auth_internal.erl文件,经过mongodb驱动链接用户库,在线状态等功能是新加了模块,其部分代码以下:
-module(wt_mod_proj). -behaviour(gen_mod). -behaviour(gen_server). -include("ejabberd.hrl"). -include("logger.hrl"). -include("jlib.hrl"). -define(SUPERVISOR, ejabberd_sup). ... -define(ONLINE, 1). -define(OFFLINE, 0). -define(BUSY, 2). -define(LEAVE, 3). ... %% API -export([start_link/2, get_proj_online_users/2]). %% gen_mod callbacks -export([start/2, stop/1]). %% gen_server callbacks -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]). %% Hook callbacks -export([user_available/1, unset_presence/3, set_presence/4]). -export([get_redis/1, remove_online_user/3, append_online_user/3]). ... -record(state,{host = <<"">>, server_host, rconn, mconn}). start_link(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []). user_available(New) -> LUser = New#jid.luser, LServer = New#jid.lserver, Proc = gen_mod:get_module_proc(LServer, ?MODULE), gen_server:cast(Proc, {user_available, LUser, LServer}). append_online_user(Uid, Proj, Host) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:call(Proc, {append_online_user, Uid, Proj}). remove_online_user(Uid, Proj, Host) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:call(Proc, {remove_online_user, Uid, Proj}). ... set_presence(User, Server, Resource, Packet) -> Proc = gen_mod:get_module_proc(Server, ?MODULE), gen_server:cast(Proc, {set_presence, User, Server, Resource, Packet}). ... start(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]}, transient, 2000, worker, [?MODULE]}, supervisor:start_child(?SUPERVISOR, ChildSpec). stop(Host) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:call(Proc, stop), supervisor:delete_child(?SUPERVISOR, Proc). init([Host, Opts]) -> MyHost = gen_mod:get_opt_host(Host, Opts, <<"wtmuc.@HOST@">>), RedisHost = gen_mod:get_opt(redis_host, Opts, fun(B) -> B end,?REDIS_HOST), RedisPort = gen_mod:get_opt(redis_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?REDIS_PORT), ejabberd_hooks:add(set_presence_hook, Host, ?MODULE, set_presence, 100), ejabberd_hooks:add(user_available_hook, Host, ?MODULE, user_available, 50), ejabberd_hooks:add(sm_remove_connection_hook, Host, ?MODULE, unset_presence, 50), MongoHost = gen_mod:get_opt(mongo_host, Opts, fun(B) -> binary_to_list(B) end, ?MONGO_HOST), MongoPort = gen_mod:get_opt(mongo_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?MONGO_PORT), {ok, Mongo} = mongo_connection:start_link({MongoHost, MongoPort}), C = c(RedisHost, RedisPort), ejabberd_router:register_route(MyHost), {ok, #state{host = Host, server_host = MyHost, rconn = C, mconn = Mongo}}. terminate(_Reason, #state{host = Host, rconn = C, mconn = Mongo}) -> ejabberd_hooks:delete(set_presence_hook, Host, ?MODULE, set_presence, 100), ejabberd_hooks:delete(user_available_hook, Host, ?MODULE, user_available, 50), ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE, unset_presence, 50), eredis:stop(C), ok. ... handle_call({append_online_user, Uid, ProjId}, _From, State) -> C = State#state.rconn, Key = <<?PRE_RPOJ_ONLINE_USERS/binary, ProjId/binary>>, Resp = eredis:q(C, ["SADD", Key, Uid]), {reply, Resp, State}; handle_call({remove_online_user, Uid, ProjId}, _From, State) -> ... handle_call({get_proj_online_users, ProjId}, _From, State) -> ... handle_cast({set_presence, User, Server, Resource, Packet}, #state{mconn = Mongo} = State) -> C = State#state.rconn, Key = <<?USER_PRESENCE/binary, User/binary>>, Pids = get_user_projs(User, Mongo), Cmd = get_proj_key(Pids, ["SUNION"]), case xml:get_subtag_cdata(Packet, <<"show">>) of <<"away">> -> eredis:q(C, ["SET", Key, ?LEAVE]); <<"offline">> -> ... handle_cast(_Msg, State) -> {noreply, State}. handle_info({route, From, To, Packet}, #state{host = Host, server_host = MyHost, rconn = RedisConn, mconn = Mongo} = State) -> case catch do_route(Host, MyHost, From, To, Packet, RedisConn, Mongo) of {'EXIT', Reason} -> ?ERROR_MSG("~p", [Reason]); _ -> ok end, {noreply, State}; handle_info(_Info, State) -> {noreply, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}. ...
其中,user\_available\_hook和sm\_remove\_connection\_hook
就是用户上线和用户断开链接触发的事件,ejabberd 中正是因为这些hook,才能很容易扩展功能。
在用tsung对ejabberd进行压力测试,测试机器为4核心8G内存的普通PC,以3台客户机模拟用户登陆、设置在线状态、发送一条文本消息、关闭链接操做,在同时在线达到30w时,CPU占用不到3%,内存大概到3个G左右,随着用户数增多,主要内存的损耗较大。因为压力测试比较耗时,再等到有时间的时候,会在作一些更深刻的测试。
对于ejabberd的安装与集群的搭建,你们能够参照官方文档,这里再也不赘述。若是在使用过程当中有什么问题,能够加入worktile官方群(110257147),进行讨论。