应该是去年的时候开始接触openfire,当时在分析后发现基于xmpp协议的openfire已经具有了群聊的功能。也就没太当回事,以为加点功能就能够作成相似于QQ群的那种模式。后来仔细了解后才发现并非这么简单:web
实际上对于openfire的muc改造来讲,持久化成员是第一个重要的工做。咱们指望的是这个房间里的人都是固定的成员,这些成员能够离开聊天室,但下次能够进来继续聊天。其实实现起来也挺简单的:redis
这里ofMucAffiliation+ofMucMember保存的数据实际上是用于记录的是用户权限的,固然会发现其实这已经对应上咱们需求上的群成员咯?确实是这样的。数据库
只不过有一个问题,就是ofMucAffiliation+ofMucMember里只能知道用户的jid,可是群的话可能每一个用户会修改本身的昵称,对不对?因此还要加一个表用于保存这种用户的个性化数据。固然这个表也挺简单的就不细写了。api
这块涉及到写插件的技术,网上有不少,我就很少说了。缓存
其实这些功能无非就是增删改查,并且咱们添加的功能完成能够独立于openfire以外,因此本身写一套也是能够的。好比用web的方式实现也是能够的。服务器
特别是能够设计成rest api,这样对于端来讲是比较友好通用的,兼顾PC、移动端就简单多了,特别是移动端走http协议总比走长连接方便吧。网络
简单的介绍了群的实现,另一个比较头痛的问题就是muc离线消息。在openfire里是有相似的支持的,这里就作一些简单的分析吧。ide
由于在openfire里历史消息推送策略是这样的,咱们看一下它的策略类HistoryStrategy,里面设定了一个枚举:this
/** * Strategy type. */ public enum Type { defaulType, none, all, number; }
能够看出,其实就是三种:none(不显示历史记录)、all(显示整个历史记录)、number(指定条数记录)。默认的是number。插件
策略类会维护一个内存列表,用于给新加入的用户发送历史记录用:
private ConcurrentLinkedQueue<Message> history = new ConcurrentLinkedQueue<>();
实际上本身也能够实现一套策略来代替它,好比将消息存在redis之类。只不过Openfire并无提供扩展,只能是修改openfire代码来实现咯。
历史消息的保存是在openfire里的MultiUserChatServiceImpl里实现的,它会启动一个TimerTask,定时的将消息保存到历史消息表里。下面是定时任务的实现
/** * Logs the conversation of the rooms that have this feature enabled. */ private class LogConversationTask extends TimerTask { @Override public void run() { try { logConversation(); } catch (Throwable e) { Log.error(LocaleUtils.getLocalizedString("admin.error"), e); } } } private void logConversation() { ConversationLogEntry entry; boolean success; for (int index = 0; index <= log_batch_size && !logQueue.isEmpty(); index++) { entry = logQueue.poll(); if (entry != null) { success = MUCPersistenceManager.saveConversationLogEntry(entry); if (!success) { logQueue.add(entry); } } } }
这是具体的保存聊天历史的代码,能够看到消息是放在logQueue里的,而后定时任务从里面取必定的条数保存到数据库存中。MUCPersistenceManager就是数据库的访问类。
在start方法里启动
@Override public void start() { XMPPServer.getInstance().addServerListener( this ); // Run through the users every 5 minutes after a 5 minutes server startup delay (default // values) userTimeoutTask = new UserTimeoutTask(); TaskEngine.getInstance().schedule(userTimeoutTask, user_timeout, user_timeout); // Log the room conversations every 5 minutes after a 5 minutes server startup delay // (default values) logConversationTask = new LogConversationTask(); TaskEngine.getInstance().schedule(logConversationTask, log_timeout, log_timeout); // Remove unused rooms from memory cleanupTask = new CleanupTask(); TaskEngine.getInstance().schedule(cleanupTask, CLEANUP_FREQUENCY, CLEANUP_FREQUENCY); // Set us up to answer disco item requests XMPPServer.getInstance().getIQDiscoItemsHandler().addServerItemsProvider(this); XMPPServer.getInstance().getIQDiscoInfoHandler().setServerNodeInfoProvider(this.getServiceDomain(), this); XMPPServer.getInstance().getServerItemsProviders().add(this); ArrayList<String> params = new ArrayList<>(); params.clear(); params.add(getServiceDomain()); Log.info(LocaleUtils.getLocalizedString("startup.starting.muc", params)); // Load all the persistent rooms to memory for (LocalMUCRoom room : MUCPersistenceManager.loadRoomsFromDB(this, this.getCleanupDate(), router)) { rooms.put(room.getName().toLowerCase(), room); } }
这里是聊天室服务启动的过程,它会启动LogConversationTask用于按期将聊天记录保存到库里。并且这里最后几句会发现启动时会从库里读取数据(MUCPersistenceManager.loadRoomsFromDB),loadRoomsFromDB实现了读取Hitory数据到historyStrategy里。
具体的数据保存在ofMucConversationLog表中。
有了历史消息推送策略和数据,那么怎么样推送给客户端呢?这里有一个history协议,在LocalMUCUser处理packet的时候,若是这个packet是Presence,而且带有history节说明是客户端发来要历史记录的。
在LocalMUCUser.process(Presence packet)里有history消息节的处理代码,由于代码太多,就截取关键的部分:
// Get or create the room MUCRoom room = server.getChatRoom(group, packet.getFrom()); // User must support MUC in order to create a room HistoryRequest historyRequest = null; String password = null; // Check for password & requested history if client supports MUC if (mucInfo != null) { password = mucInfo.elementTextTrim("password"); if (mucInfo.element("history") != null) { historyRequest = new HistoryRequest(mucInfo); } } // The user joins the room role = room.joinRoom(recipient.getResource().trim(), password, historyRequest, this, packet.createCopy());
这里能够看到,先获取到historyRequest节的信息,而后调用room.joinRoom方法。这里的room.joinRoom就是用户加入聊天室的关键部分。在joinRoom里会发送历史消息给这个用户:
if (historyRequest == null) { Iterator<Message> history = roomHistory.getMessageHistory(); while (history.hasNext()) { joinRole.send(history.next()); } } else { historyRequest.sendHistory(joinRole, roomHistory); }
这里会发现有两种状况,1种是historyRequest为空的状况时,服务端默认按照策略的设置向用户发送历史消息。若是不为空,则根据客户端的请求参数发送。那么这里咱们看看historyRequest的实现:
public class HistoryRequest { private static final Logger Log = LoggerFactory.getLogger(HistoryRequest.class); private static final XMPPDateTimeFormat xmppDateTime = new XMPPDateTimeFormat(); private int maxChars = -1; private int maxStanzas = -1; private int seconds = -1; private Date since; public HistoryRequest(Element userFragment) { Element history = userFragment.element("history"); if (history != null) { if (history.attribute("maxchars") != null) { this.maxChars = Integer.parseInt(history.attributeValue("maxchars")); } if (history.attribute("maxstanzas") != null) { this.maxStanzas = Integer.parseInt(history.attributeValue("maxstanzas")); } if (history.attribute("seconds") != null) { this.seconds = Integer.parseInt(history.attributeValue("seconds")); } if (history.attribute("since") != null) { try { // parse since String into Date this.since = xmppDateTime.parseString(history.attributeValue("since")); } catch(ParseException pe) { Log.error("Error parsing date from history management", pe); this.since = null; } } } } /** * Returns the total number of characters to receive in the history. * * @return total number of characters to receive in the history. */ public int getMaxChars() { return maxChars; } /** * Returns the total number of messages to receive in the history. * * @return the total number of messages to receive in the history. */ public int getMaxStanzas() { return maxStanzas; } /** * Returns the number of seconds to use to filter the messages received during that time. * In other words, only the messages received in the last "X" seconds will be included in * the history. * * @return the number of seconds to use to filter the messages received during that time. */ public int getSeconds() { return seconds; } /** * Returns the since date to use to filter the messages received during that time. * In other words, only the messages received since the datetime specified will be * included in the history. * * @return the since date to use to filter the messages received during that time. */ public Date getSince() { return since; } /** * Returns true if the history has been configured with some values. * * @return true if the history has been configured with some values. */ private boolean isConfigured() { return maxChars > -1 || maxStanzas > -1 || seconds > -1 || since != null; } /** * Sends the smallest amount of traffic that meets any combination of the requested criteria. * * @param joinRole the user that will receive the history. * @param roomHistory the history of the room. */ public void sendHistory(LocalMUCRole joinRole, MUCRoomHistory roomHistory) { if (!isConfigured()) { Iterator<Message> history = roomHistory.getMessageHistory(); while (history.hasNext()) { joinRole.send(history.next()); } } else { Message changedSubject = roomHistory.getChangedSubject(); boolean addChangedSubject = (changedSubject != null) ? true : false; if (getMaxChars() == 0) { // The user requested to receive no history if (addChangedSubject) { joinRole.send(changedSubject); } return; } int accumulatedChars = 0; int accumulatedStanzas = 0; Element delayInformation; LinkedList<Message> historyToSend = new LinkedList<>(); ListIterator<Message> iterator = roomHistory.getReverseMessageHistory(); while (iterator.hasPrevious()) { Message message = iterator.previous(); // Update number of characters to send String text = message.getBody() == null ? message.getSubject() : message.getBody(); if (text == null) { // Skip this message since it has no body and no subject continue; } accumulatedChars += text.length(); if (getMaxChars() > -1 && accumulatedChars > getMaxChars()) { // Stop collecting history since we have exceded a limit break; } // Update number of messages to send accumulatedStanzas ++; if (getMaxStanzas() > -1 && accumulatedStanzas > getMaxStanzas()) { // Stop collecting history since we have exceded a limit break; } if (getSeconds() > -1 || getSince() != null) { delayInformation = message.getChildElement("delay", "urn:xmpp:delay"); try { // Get the date when the historic message was sent Date delayedDate = xmppDateTime.parseString(delayInformation.attributeValue("stamp")); if (getSince() != null && delayedDate != null && delayedDate.before(getSince())) { // Stop collecting history since we have exceded a limit break; } if (getSeconds() > -1) { Date current = new Date(); long diff = (current.getTime() - delayedDate.getTime()) / 1000; if (getSeconds() <= diff) { // Stop collecting history since we have exceded a limit break; } } } catch (Exception e) { Log.error("Error parsing date from historic message", e); } } // Don't add the latest subject change if it's already in the history. if (addChangedSubject) { if (changedSubject != null && changedSubject.equals(message)) { addChangedSubject = false; } } historyToSend.addFirst(message); } // Check if we should add the latest subject change. if (addChangedSubject) { historyToSend.addFirst(changedSubject); } // Send the smallest amount of traffic to the user for (Object aHistoryToSend : historyToSend) { joinRole.send((Message) aHistoryToSend); } } } }
这里面主要是用于约定发送历史消息的一些参数:
private int maxChars = -1; private int maxStanzas = -1; private int seconds = -1; private Date since;
这是能够设定的几个参数,具体的对应关系以下面的表格所示
属性 | 数据类型 | 含义 |
---|---|---|
maxchars | int | 限制历史中的字符总数为"X" (这里的字符数量是所有 XML 节的字符数, 不仅是它们的 XML 字符数据). |
maxstanzas | int | 制历史中的消息总数为"X". |
seconds | int | 仅发送最后 "X" 秒收到的消息. |
since | datetime | 仅发送从指定日期时间 datetime 以后收到的消息 (这个datatime必须 MUST 符合XMPP Date and Time Profiles 13 定义的DateTime 规则,). |
固然这里还实现了一个sendHistory方法,也就是针对客户端提交了查询要求时的历史消息发送方法。具体的实现上面的代码吧。也就是根据历史管理属性里设定的几个参数进行针对性的发送。
可是这里有个关键点就是since属性,它表示客户端能够设定一个时间戳,服务端根据发送这个时间戳以后的增量数据给客户端。这个对于客户端而已仍是颇有做用的。
那么看完了openfire的历史消息的实现,再来实现离线消息是否是就简单的多了。群聊天历史消息有几个问题:
因此不用举太多问题,就这两个就够了,那么我以为openfire的这种历史消息策略中使用number(条数)是很重要的。好比服务器只缓存最近1000条聊天历史,这样总体的服务器缓存量就低了。这就解决了第一个问题。
若是群用户须要查询历史上的数据,应该是另开一个服务接口专门用于查询历史数据,这样就不用在刚上线进入群时接收一堆的离线消息。
前面分析HistoryRequest时提到了它能够设置一个时间戳参数,这个是告诉服务端从这个参数以后的历史消息推送过来。
好比,用户A昨天晚20:00下的线(最后消息时间戳是2017-06-07 20:00:00),今天早上8:00上线。在用户A离线的时间里有100条离心线消息记录。
那么用户A上线,客户端发送HistoryRequest(since=2017-06-07 20:00:00),服务器则只发送2017-06-07 20:00:00以后的聊天记录100条。这样就实现了增量的消息,对于服务端和客户端都是友好的。
固然,这里能发多少消息最终仍是要看服务端缓存了多少消息用于发送给客户端,毕竟就像问题2中提出的那样,用户可能一个月都不上线,这期间的历史消息要是都推送那确定崩了。因此上线时的历史消息推送这个功能仅适合推送少许的数据。这个在具体的系统设计时应该根据实际状况来设计。