教你如何把openfire的muc聊天室改造为群

openfire群聊与QQ群对比

应该是去年的时候开始接触openfire,当时在分析后发现基于xmpp协议的openfire已经具有了群聊的功能。也就没太当回事,以为加点功能就能够作成相似于QQ群的那种模式。后来仔细了解后才发现并非这么简单:web

  • muc其实聊天室的形式,房间建立后能够加入聊天,用户离开就退出聊天室了,并无一个用户固化的功能,因此要单独为这部分开发
  • muc由于没有固化的成员关系,因此并无1对1聊天的那种离线消息。并且考虑到消息量是群发的缘由,因此服务器对于加入聊天室的成员只会推送必定量的消息,固然这个能够经过策略来配置为所有推送。事实上考虑到群聊天的特性,推送指定条数多是更靠谱的。
  • 还有一些QQ特有的功能,好比邀请进群须要管理员审核之类的管理功能就更少了,这块都须要扩展实现

改造Openfire群聊天室为群

实际上对于openfire的muc改造来讲,持久化成员是第一个重要的工做。咱们指望的是这个房间里的人都是固定的成员,这些成员能够离开聊天室,但下次能够进来继续聊天。其实实现起来也挺简单的:redis

基于openfire的实现

  1. 创建数据表,用于保存成员列表
    在openfire里已经有一系列的表用于保存muc相关的数据:
  • ofMucRoom-这个是房间表,保存了聊天室的信息
  • ofMucAffiliation-这个是保存房间里管理员角色人员的表(owner(10)、admin(20)、outcast(40))
  • ofMucMember-这个是房间里成员的列表(对应的是member(30))

这里ofMucAffiliation+ofMucMember保存的数据实际上是用于记录的是用户权限的,固然会发现其实这已经对应上咱们需求上的群成员咯?确实是这样的。数据库

只不过有一个问题,就是ofMucAffiliation+ofMucMember里只能知道用户的jid,可是群的话可能每一个用户会修改本身的昵称,对不对?因此还要加一个表用于保存这种用户的个性化数据。固然这个表也挺简单的就不细写了。api

  1. 经过openfire的插件体系增长一个插件,在服务端实现加群、退群等功能
    毕竟xmpp协议里是没有得到群列表和房间成员的功能的,以及一些加群、退群的管理功能都没有,因此要本身开发。这里能够经过openfire的插件体系来作,这样比较独立,不影响openfire内核功能。

这块涉及到写插件的技术,网上有不少,我就很少说了。缓存

  1. 本身定义一套协议来完成客户端与服务端的通信
    由于要走openfire,因此仍是要定义xmpp协议,我用的是IQ。考虑到我使用的是smack作的,因此这部分就再也不写了。有兴趣或者须要的网上找找IQ协议的写法就好了。

其余方式

其实这些功能无非就是增删改查,并且咱们添加的功能完成能够独立于openfire以外,因此本身写一套也是能够的。好比用web的方式实现也是能够的。服务器

特别是能够设计成rest api,这样对于端来讲是比较友好通用的,兼顾PC、移动端就简单多了,特别是移动端走http协议总比走长连接方便吧。网络

分析openfire muc群聊历史消息的实现

简单的介绍了群的实现,另一个比较头痛的问题就是muc离线消息。在openfire里是有相似的支持的,这里就作一些简单的分析吧。ide

历史消息策略HistoryStrategy

由于在openfire里历史消息推送策略是这样的,咱们看一下它的策略类HistoryStrategy,里面设定了一个枚举:this

/**
 * Strategy type.
 */
public enum Type {
    defaulType, none, all, number;
}

能够看出,其实就是三种:none(不显示历史记录)、all(显示整个历史记录)、number(指定条数记录)。默认的是number。插件

策略类会维护一个内存列表,用于给新加入的用户发送历史记录用:

private ConcurrentLinkedQueue<Message> history = new ConcurrentLinkedQueue<>();

实际上本身也能够实现一套策略来代替它,好比将消息存在redis之类。只不过Openfire并无提供扩展,只能是修改openfire代码来实现咯。

历史消息的保存与维护

历史消息的保存是在openfire里的MultiUserChatServiceImpl里实现的,它会启动一个TimerTask,定时的将消息保存到历史消息表里。下面是定时任务的实现

/**
 * Logs the conversation of the rooms that have this feature enabled.
 */
private class LogConversationTask extends TimerTask {
    @Override
    public void run() {
        try {
            logConversation();
        }
        catch (Throwable e) {
            Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
        }
    }
}

private void logConversation() {
    ConversationLogEntry entry;
    boolean success;
    for (int index = 0; index <= log_batch_size && !logQueue.isEmpty(); index++) {
        entry = logQueue.poll();
        if (entry != null) {
            success = MUCPersistenceManager.saveConversationLogEntry(entry);
            if (!success) {
                logQueue.add(entry);
            }
        }
    }
}

这是具体的保存聊天历史的代码,能够看到消息是放在logQueue里的,而后定时任务从里面取必定的条数保存到数据库存中。MUCPersistenceManager就是数据库的访问类。

在start方法里启动

@Override
public void start() {
    XMPPServer.getInstance().addServerListener( this );

    // Run through the users every 5 minutes after a 5 minutes server startup delay (default
    // values)
    userTimeoutTask = new UserTimeoutTask();
    TaskEngine.getInstance().schedule(userTimeoutTask, user_timeout, user_timeout);
    // Log the room conversations every 5 minutes after a 5 minutes server startup delay
    // (default values)
    logConversationTask = new LogConversationTask();
    TaskEngine.getInstance().schedule(logConversationTask, log_timeout, log_timeout);
    // Remove unused rooms from memory
    cleanupTask = new CleanupTask();
    TaskEngine.getInstance().schedule(cleanupTask, CLEANUP_FREQUENCY, CLEANUP_FREQUENCY);

    // Set us up to answer disco item requests
    XMPPServer.getInstance().getIQDiscoItemsHandler().addServerItemsProvider(this);
    XMPPServer.getInstance().getIQDiscoInfoHandler().setServerNodeInfoProvider(this.getServiceDomain(), this);
    XMPPServer.getInstance().getServerItemsProviders().add(this);

    ArrayList<String> params = new ArrayList<>();
    params.clear();
    params.add(getServiceDomain());
    Log.info(LocaleUtils.getLocalizedString("startup.starting.muc", params));
    // Load all the persistent rooms to memory
    for (LocalMUCRoom room : MUCPersistenceManager.loadRoomsFromDB(this, this.getCleanupDate(), router)) {
        rooms.put(room.getName().toLowerCase(), room);
    }
}

这里是聊天室服务启动的过程,它会启动LogConversationTask用于按期将聊天记录保存到库里。并且这里最后几句会发现启动时会从库里读取数据(MUCPersistenceManager.loadRoomsFromDB),loadRoomsFromDB实现了读取Hitory数据到historyStrategy里。

具体的数据保存在ofMucConversationLog表中。

如何推送历史消息给客户端

有了历史消息推送策略和数据,那么怎么样推送给客户端呢?这里有一个history协议,在LocalMUCUser处理packet的时候,若是这个packet是Presence,而且带有history节说明是客户端发来要历史记录的。

在LocalMUCUser.process(Presence packet)里有history消息节的处理代码,由于代码太多,就截取关键的部分:

// Get or create the room
MUCRoom room = server.getChatRoom(group, packet.getFrom());
// User must support MUC in order to create a room
HistoryRequest historyRequest = null;
String password = null;
// Check for password & requested history if client supports MUC
if (mucInfo != null) {
    password = mucInfo.elementTextTrim("password");
    if (mucInfo.element("history") != null) {
        historyRequest = new HistoryRequest(mucInfo);
    }
}
// The user joins the room
role = room.joinRoom(recipient.getResource().trim(),
        password,
        historyRequest,
        this,
        packet.createCopy());

这里能够看到,先获取到historyRequest节的信息,而后调用room.joinRoom方法。这里的room.joinRoom就是用户加入聊天室的关键部分。在joinRoom里会发送历史消息给这个用户:

if (historyRequest == null) {
    Iterator<Message> history = roomHistory.getMessageHistory();
    while (history.hasNext()) {
        joinRole.send(history.next());
    }
}
else {
    historyRequest.sendHistory(joinRole, roomHistory);
}

这里会发现有两种状况,1种是historyRequest为空的状况时,服务端默认按照策略的设置向用户发送历史消息。若是不为空,则根据客户端的请求参数发送。那么这里咱们看看historyRequest的实现:

public class HistoryRequest {

    private static final Logger Log = LoggerFactory.getLogger(HistoryRequest.class);
    private static final XMPPDateTimeFormat xmppDateTime = new XMPPDateTimeFormat();

    private int maxChars = -1;
    private int maxStanzas = -1;
    private int seconds = -1;
    private Date since;

    public HistoryRequest(Element userFragment) {
        Element history = userFragment.element("history");
        if (history != null) {
            if (history.attribute("maxchars") != null) {
                this.maxChars = Integer.parseInt(history.attributeValue("maxchars"));
            }
            if (history.attribute("maxstanzas") != null) {
                this.maxStanzas = Integer.parseInt(history.attributeValue("maxstanzas"));
            }
            if (history.attribute("seconds") != null) {
                this.seconds = Integer.parseInt(history.attributeValue("seconds"));
            }
            if (history.attribute("since") != null) {
                try {
                    // parse since String into Date
                    this.since = xmppDateTime.parseString(history.attributeValue("since"));
                }
                catch(ParseException pe) {
                    Log.error("Error parsing date from history management", pe);
                    this.since = null;
                }
            }
        }
    }
    
    /**
     * Returns the total number of characters to receive in the history.
     * 
     * @return total number of characters to receive in the history.
     */
    public int getMaxChars() {
        return maxChars;
    }

    /**
     * Returns the total number of messages to receive in the history.
     * 
     * @return the total number of messages to receive in the history.
     */
    public int getMaxStanzas() {
        return maxStanzas;
    }

    /**
     * Returns the number of seconds to use to filter the messages received during that time. 
     * In other words, only the messages received in the last "X" seconds will be included in 
     * the history.
     * 
     * @return the number of seconds to use to filter the messages received during that time.
     */
    public int getSeconds() {
        return seconds;
    }

    /**
     * Returns the since date to use to filter the messages received during that time. 
     * In other words, only the messages received since the datetime specified will be 
     * included in the history.
     * 
     * @return the since date to use to filter the messages received during that time.
     */
    public Date getSince() {
        return since;
    }

    /**
     * Returns true if the history has been configured with some values.
     * 
     * @return true if the history has been configured with some values.
     */
    private boolean isConfigured() {
        return maxChars > -1 || maxStanzas > -1 || seconds > -1 || since != null;
    }

    /**
     * Sends the smallest amount of traffic that meets any combination of the requested criteria.
     * 
     * @param joinRole the user that will receive the history.
     * @param roomHistory the history of the room.
     */
    public void sendHistory(LocalMUCRole joinRole, MUCRoomHistory roomHistory) {
        if (!isConfigured()) {
            Iterator<Message> history = roomHistory.getMessageHistory();
            while (history.hasNext()) {
                joinRole.send(history.next());
            }
        }
        else {
            Message changedSubject = roomHistory.getChangedSubject();
            boolean addChangedSubject = (changedSubject != null) ? true : false;
            if (getMaxChars() == 0) {
                // The user requested to receive no history
                if (addChangedSubject) {
                    joinRole.send(changedSubject);
                }
                return;
            }
            int accumulatedChars = 0;
            int accumulatedStanzas = 0;
            Element delayInformation;
            LinkedList<Message> historyToSend = new LinkedList<>();
            ListIterator<Message> iterator = roomHistory.getReverseMessageHistory();
            while (iterator.hasPrevious()) {
                Message message = iterator.previous();
                // Update number of characters to send
                String text = message.getBody() == null ? message.getSubject() : message.getBody();
                if (text == null) {
                    // Skip this message since it has no body and no subject  
                    continue;
                }
                accumulatedChars += text.length();
                if (getMaxChars() > -1 && accumulatedChars > getMaxChars()) {
                    // Stop collecting history since we have exceded a limit
                    break;
                }
                // Update number of messages to send
                accumulatedStanzas ++;
                if (getMaxStanzas() > -1 && accumulatedStanzas > getMaxStanzas()) {
                    // Stop collecting history since we have exceded a limit
                    break;
                }

                if (getSeconds() > -1 || getSince() != null) {
                    delayInformation = message.getChildElement("delay", "urn:xmpp:delay");
                    try {
                        // Get the date when the historic message was sent
                        Date delayedDate = xmppDateTime.parseString(delayInformation.attributeValue("stamp"));
                        if (getSince() != null && delayedDate != null && delayedDate.before(getSince())) {
                            // Stop collecting history since we have exceded a limit
                            break;
                        }
                        if (getSeconds() > -1) {
                            Date current = new Date();
                            long diff = (current.getTime() - delayedDate.getTime()) / 1000;
                            if (getSeconds() <= diff) {
                                // Stop collecting history since we have exceded a limit
                                break;
                            }
                        }
                    }
                    catch (Exception e) {
                        Log.error("Error parsing date from historic message", e);
                    }

                }

                // Don't add the latest subject change if it's already in the history.
                if (addChangedSubject) {
                    if (changedSubject != null && changedSubject.equals(message)) {
                        addChangedSubject = false;
                    }
                }

                historyToSend.addFirst(message);
            }
            // Check if we should add the latest subject change.
            if (addChangedSubject) {
                historyToSend.addFirst(changedSubject);
            }
            // Send the smallest amount of traffic to the user
            for (Object aHistoryToSend : historyToSend) {
                joinRole.send((Message) aHistoryToSend);
            }
        }
    }
}

这里面主要是用于约定发送历史消息的一些参数:

private int maxChars = -1;
private int maxStanzas = -1;
private int seconds = -1;
private Date since;

这是能够设定的几个参数,具体的对应关系以下面的表格所示

历史管理属性

属性 数据类型 含义
maxchars int 限制历史中的字符总数为"X" (这里的字符数量是所有 XML 节的字符数, 不仅是它们的 XML 字符数据).
maxstanzas int 制历史中的消息总数为"X".
seconds int 仅发送最后 "X" 秒收到的消息.
since datetime 仅发送从指定日期时间 datetime 以后收到的消息 (这个datatime必须 MUST 符合XMPP Date and Time Profiles 13 定义的DateTime 规则,).

还有sendHistory

固然这里还实现了一个sendHistory方法,也就是针对客户端提交了查询要求时的历史消息发送方法。具体的实现上面的代码吧。也就是根据历史管理属性里设定的几个参数进行针对性的发送。

可是这里有个关键点就是since属性,它表示客户端能够设定一个时间戳,服务端根据发送这个时间戳以后的增量数据给客户端。这个对于客户端而已仍是颇有做用的。

实现群离线消息的方法

那么看完了openfire的历史消息的实现,再来实现离线消息是否是就简单的多了。群聊天历史消息有几个问题:

  • 问题1:群人员庞大历史消息巨大服务端如何缓存这些历史数据?好比一个群1000人,一人一天发10条,就有10000条/天,一个月就是30万,这还只是一个聊天群的,100个群就是3000万。
  • 问题2:对于群成员而言,可能一个月未登陆,那么可能就要接收这一个月的离线消息,客户端基本就崩了,网络流量也很巨大,怎么处理?

利用HistoryStrategy限制服务端推送条数

因此不用举太多问题,就这两个就够了,那么我以为openfire的这种历史消息策略中使用number(条数)是很重要的。好比服务器只缓存最近1000条聊天历史,这样总体的服务器缓存量就低了。这就解决了第一个问题。

若是群用户须要查询历史上的数据,应该是另开一个服务接口专门用于查询历史数据,这样就不用在刚上线进入群时接收一堆的离线消息。

利用HistoryRequest来获取增量数据

前面分析HistoryRequest时提到了它能够设置一个时间戳参数,这个是告诉服务端从这个参数以后的历史消息推送过来。

好比,用户A昨天晚20:00下的线(最后消息时间戳是2017-06-07 20:00:00),今天早上8:00上线。在用户A离线的时间里有100条离心线消息记录。

那么用户A上线,客户端发送HistoryRequest(since=2017-06-07 20:00:00),服务器则只发送2017-06-07 20:00:00以后的聊天记录100条。这样就实现了增量的消息,对于服务端和客户端都是友好的。

固然,这里能发多少消息最终仍是要看服务端缓存了多少消息用于发送给客户端,毕竟就像问题2中提出的那样,用户可能一个月都不上线,这期间的历史消息要是都推送那确定崩了。因此上线时的历史消息推送这个功能仅适合推送少许的数据。这个在具体的系统设计时应该根据实际状况来设计。

相关文章
相关标签/搜索