上一章,Session通过预建立、认证以后,才正常可用。认证时,最重要的操做,就是将Session加入到路由表,使之拥用了通讯功能。html
添加到至路由表的操做,是在SessionManager中操做的,以下:node
SessionManager.addSession(LocalClientSession session):算法
public void addSession(LocalClientSession session) { // Add session to the routing table (routing table will know session is not available yet) routingTable.addClientRoute(session.getAddress(), session); // Remove the pre-Authenticated session but remember to use the temporary ID as the key localSessionManager.getPreAuthenticatedSessions().remove(session.getStreamID().toString()); SessionEventDispatcher.EventType event = session.getAuthToken().isAnonymous() ? SessionEventDispatcher.EventType.anonymous_session_created : SessionEventDispatcher.EventType.session_created; // Fire session created event. SessionEventDispatcher.dispatchEvent(session, event); if (ClusterManager.isClusteringStarted()) { // Track information about the session and share it with other cluster nodes sessionInfoCache.put(session.getAddress().toString(), new ClientSessionInfo(session)); } }
进入路由表模块, RoutingTableImpl.addClientRoute(session.getAddress(), session)方法:数据库
public boolean addClientRoute(JID route, LocalClientSession destination) { boolean added; boolean available = destination.getPresence().isAvailable(); localRoutingTable.addRoute(route.toString(), destination); ...... return added; }
从这里能够看出,路由表的底层,是借助LocalRoutingTable类来实现。缓存
LocalRoutingTable类的成员构成,很是的简单:服务器
Map<String, RoutableChannelHandler> routes = new ConcurrentHashMap<>();
也就是说,路由表的实质,就是一个Map的数据结构,其Key为JID地址,Velue为RoutableChannelHandler类型报文处理器。session
查看路由表RoutingTableImpl模块中的路由添加方法,能够看到表中存储的是以RoutableChannelHandler衍生出来的几个Session类型,总共提供了三种:数据结构
LocalOutgoingServerSession(用于存储链接本机的远程服务端)、LocalClientSession(用于存储链接到本机的客户端)、RoutableChannelHandler(用于存储组件),类结构以下:dom
|-- RoutableChannelHandler |-- Session |-- LocalSession |-- LocalClientSession |-- LocalServerSession |-- LocalOutgoingServerSession
而LocalRoutingTable内的全部方法,就是一系列对这个Map结构的操做函数,核心的以下几个:ide
添加路由:
boolean addRoute(String address, RoutableChannelHandler route) { return routes.put(address, route) != route; }
获取路由:
RoutableChannelHandler getRoute(String address) { return routes.get(address); }
获取客户端的Session列表:
Collection<LocalClientSession> getClientRoutes() { List<LocalClientSession> sessions = new ArrayList<>(); for (RoutableChannelHandler route : routes.values()) { if (route instanceof LocalClientSession) { sessions.add((LocalClientSession) route); } } return sessions; }
移除路由
void removeRoute(String address) { routes.remove(address); }
还有一个每3分钟一次的定时任务,查询并关闭被闲置了的远程服务器Session,在路由表中启动该任务
public void start() { int period = 3 * 60 * 1000; TaskEngine.getInstance().scheduleAtFixedRate(new ServerCleanupTask(), period, period); }
路由表是Openfire的核心module之一,RoutingTable接口定义了一系列操做标准,主要围绕路由表进行,提供添加,删除,查询,消息路由等操做,而RoutingTableImpl负责具体实现。
/** * 缓存外部远程服务器session * Key: server domain, Value: nodeID */ private Cache<String, byte[]> serversCache; /** * 缓存服务器的组件 * Key: component domain, Value: list of nodeIDs hosting the component */ private Cache<String, Set<NodeID>> componentsCache; /** * 缓存已认证的客户端session * Key: full JID, Value: {nodeID, available/unavailable} */ private Cache<String, ClientRoute> usersCache; /** * 缓存已认证匿名的客户端session * Key: full JID, Value: {nodeID, available/unavailable} */ private Cache<String, ClientRoute> anonymousUsersCache; /** * 缓存已认证(包括匿名)的客户端Resource,一个用户,在每一端登陆,都会有一个resource * Key: bare JID, Value: list of full JIDs of the user */ private Cache<String, Collection<String>> usersSessions; private String serverName; // 服务器的域名 private XMPPServer server; // XMPP服务 private LocalRoutingTable localRoutingTable; // 路由表底层 private RemotePacketRouter remotePacketRouter; // 远程包路由器 private IQRouter iqRouter; // IQ包路由器 private MessageRouter messageRouter; // Message包路由器 private PresenceRouter presenceRouter; // Presence包路由器 private PresenceUpdateHandler presenceUpdateHandler; // 在线状态更新处理器
成员列表中,除了LocalRoutingTable以外,还定义了一堆的缓存。这些缓存干吗用?
Openfire支持集群机制,即在多台服务器上分别运行一个Openfire实例,并使各个实例的数据同步。算法一致,数据一致,用户无论链接到任意一台服务器,效果就都同样。
集群中的数据同步,除了数据库以外,其余的都是用经过缓存来处理,而上面的这些缓存正是集群同步的一部分,用于同步用户路由信息,每一个服务器都会有缓存的副本。
总的来讲,LocalRoutingTable用于存储本机的路由数据,而Cache中是存储了整个集群的路由数据。
可是,须要注意的一点,LocalRoutingTable与Cache,这二者的数据结构并不相同:
(1)LocalRoutingTable中记录了本机中全部的Session实例,能够用来通讯
(2)Cache中只存储了用户路由节点信息,须要经过集群管理组件来获取Session实例
路由表的操做,实际上就是在会话管理中,对会话实例的操做。为免与上面混淆,这一节的功能说明,以会话代称。
添加路由(会话)
代码以下:
@Override public boolean addClientRoute(JID route, LocalClientSession destination) { boolean added; boolean available = destination.getPresence().isAvailable(); // 加入到路由表 localRoutingTable.addRoute(route.toString(), destination); // 若为匿名客户端,添加到anonymousUsersCache、usersSessions缓存队列中 if (destination.getAuthToken().isAnonymous()) { Lock lockAn = CacheFactory.getLock(route.toString(), anonymousUsersCache); try { lockAn.lock(); added = anonymousUsersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null; } finally { lockAn.unlock(); } // Add the session to the list of user sessions if (route.getResource() != null && (!available || added)) { Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions); try { lock.lock(); usersSessions.put(route.toBareJID(), Arrays.asList(route.toString())); } finally { lock.unlock(); } } } // 非匿名客户端,添加到usersCache、usersSessions缓存队列中 else { Lock lockU = CacheFactory.getLock(route.toString(), usersCache); try { lockU.lock(); added = usersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null; } finally { lockU.unlock(); } // Add the session to the list of user sessions if (route.getResource() != null && (!available || added)) { Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions); try { lock.lock(); Collection<String> jids = usersSessions.get(route.toBareJID()); if (jids == null) { // Optimization - use different class depending on current setup if (ClusterManager.isClusteringStarted()) { jids = new HashSet<>(); } else { jids = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); } } jids.add(route.toString()); usersSessions.put(route.toBareJID(), jids); } finally { lock.unlock(); } } } return added; }
主要两步:
(1)添加到路由表
(2)添加到对应的缓存中
移除路由(会话)
代码以下:
@Override public boolean removeClientRoute(JID route) { boolean anonymous = false; String address = route.toString(); ClientRoute clientRoute = null; // 从缓存中移除客户端的Session信息 Lock lockU = CacheFactory.getLock(address, usersCache); try { lockU.lock(); clientRoute = usersCache.remove(address); } finally { lockU.unlock(); } if (clientRoute == null) { Lock lockA = CacheFactory.getLock(address, anonymousUsersCache); try { lockA.lock(); clientRoute = anonymousUsersCache.remove(address); anonymous = true; } finally { lockA.unlock(); } } if (clientRoute != null && route.getResource() != null) { Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions); try { lock.lock(); if (anonymous) { usersSessions.remove(route.toBareJID()); } else { Collection<String> jids = usersSessions.get(route.toBareJID()); if (jids != null) { jids.remove(route.toString()); if (!jids.isEmpty()) { usersSessions.put(route.toBareJID(), jids); } else { usersSessions.remove(route.toBareJID()); } } } } finally { lock.unlock(); } } // 将对应客户端的Session信息,移出路由表 localRoutingTable.removeRoute(address); return clientRoute != null; }
操做与添加相似:
(1)移除缓存里的路由信息
(2)移除路由表中的信息
获取路由(会话)
@Override public ClientSession getClientRoute(JID jid) { // Check if this session is hosted by this cluster node ClientSession session = (ClientSession) localRoutingTable.getRoute(jid.toString()); if (session == null) { // The session is not in this JVM so assume remote RemoteSessionLocator locator = server.getRemoteSessionLocator(); if (locator != null) { // Check if the session is hosted by other cluster node ClientRoute route = usersCache.get(jid.toString()); if (route == null) { route = anonymousUsersCache.get(jid.toString()); } if (route != null) { session = locator.getClientSession(route.getNodeID().toByteArray(), jid); } } } return session; }
从上面的方法代码中能够看到,获取路由的方法是:先查找本地路由表,若获取不到对应Session时,则经过集群获取。RemoteSessionLocator是用于适配不一样的集群组件所抽象的接口,为不一样集群组件提供了透明处理。
至于如何从集群中获取Session,主要就在于sersCache和anonymousUsersCache这两个cache,它们记录了每一个客户端的路由节点信息,经过它能够取得对应的Session实例。详见第八章《集群管理》
根据发送的形式,分为两种:一是广播、二是单点路由
一、以广播的形式,向全部在线的客户端发送消息
@Override public void broadcastPacket(Message packet, boolean onlyLocal) { // Send the message to client sessions connected to this JVM for(ClientSession session : localRoutingTable.getClientRoutes()) { session.process(packet); } // Check if we need to broadcast the message to client sessions connected to remote cluter nodes if (!onlyLocal && remotePacketRouter != null) { remotePacketRouter.broadcastPacket(packet); } }
二、单点发送的形式,向某个指定的客户端发送消息
@Override public void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException { boolean routed = false; try { if (serverName.equals(jid.getDomain())) { // Packet sent to our domain. routed = routeToLocalDomain(jid, packet, fromServer); Log.info("routeToLocalDomain"); } else if (jid.getDomain().endsWith(serverName) && hasComponentRoute(jid)) { // Packet sent to component hosted in this server routed = routeToComponent(jid, packet, routed); Log.info("routeToComponent"); } else { // Packet sent to remote server routed = routeToRemoteDomain(jid, packet, routed); Log.info("routeToRemoteDomain"); } } catch (Exception ex) { // Catch here to ensure that all packets get handled, despite various processing // exceptions, rather than letting any fall through the cracks. For example, // an IAE could be thrown when running in a cluster if a remote member becomes // unavailable before the routing caches are updated to remove the defunct node. // We have also occasionally seen various flavors of NPE and other oddities, // typically due to unexpected environment or logic breakdowns. Log.error("Primary packet routing failed", ex); } if (!routed) { if (Log.isDebugEnabled()) { Log.debug("Failed to route packet to JID: {} packet: {}", jid, packet.toXML()); } if (packet instanceof IQ) { iqRouter.routingFailed(jid, packet); } else if (packet instanceof Message) { messageRouter.routingFailed(jid, packet); } else if (packet instanceof Presence) { presenceRouter.routingFailed(jid, packet); } } }
路由表中的功能,最后由SessionManager集中处理,详见上一章的分析,这里再也不赘述。
特色提一点,比较有用:路由表作为一个module已经在Openfire主服务启动时完成实例化,因此,在自定义的插件、或者其余任何须要发送消息的地方,只需选择调用以下两个方法中之一,便可完成消息发送:
XMPPServer.getInstance().getRoutingTable().routePacket(jid, packet, fromServer);
XMPPServer.getInstance().getRoutingTable().broadcastPacket(packet, onlyLocal);
而消息发送中,最后消息如何送到网卡实现发送,在第三章《消息路由》中已经详细分析,一样再也不赘述。
本章就到此结束,OVER!