随便写写,可能有错的地方java
能够看出deliver对session的依赖很强,最后调用Connection的deliver方法进行信息的发送,实际上最下面用的是minaapi
信息的处理大约分为两种方式session
[1]:接收客户信息并做出响应(通常是在创建链接的时候),XMPPIoHandler的信息接收的回调函数中使用StanzaHandler中按照语法(Iq,Message,Presence)的类别进行分发到不一样类别的路由(Router)-->(Handler)-->Session-->Connection.deliverdom
[2]:对已经创建链接的用户进行消息的推送:NotificationManager.(sendBroadcast()|sendNotificationToUser())—>Session—>Connection.deliver函数
广播函数:链接创建以后,发送推送消息的广播接口spa
单个用户发送: 链接创建以后,根据用户名称进行推送的接口.net
分析:debug
1. 该方法为私有方法,那么确定是为了sendBroadcast和sendNotificationToUser两个方法所设立3d
2. 返回的是smack包中的IQ,使用IQ语法来发送信息code
下图显示了
IQ的set语法
自定义的命名空间NOTIFICATION,里面包含了自定的封装信息
数据包的截图:
private IQ createNotificationIQ(String apiKey, String title, String message, String uri) { Random random = new Random(); String id = Integer.toHexString(random.nextInt());//使用随机+hex的方法对id初始 // String id = String.valueOf(System.currentTimeMillis()); Element notification = DocumentHelper.createElement(QName.get( "notification", NOTIFICATION_NAMESPACE));//读取相应的namespace notification.addElement("id").setText(id); notification.addElement("apiKey").setText(apiKey); notification.addElement("title").setText(title); notification.addElement("message").setText(message); notification.addElement("uri").setText(uri); IQ iq = new IQ(); iq.setType(IQ.Type.set); iq.setChildElement(notification);//包装 return iq;
为XmppIoHandler信息接收器的直接引用
Xml信息的直接接收入口
对象属性 构造函数[可见实例化的StanzaHandler只为一个Connection进行服务,此实例在session创建的时候进行初始化] |
核心入口方法
public void process(String stanza, XMPPPacketReader reader) throws Exception { boolean initialStream = stanza.startsWith("<stream:stream");//判断是否为初始化状态 if (!sessionCreated || initialStream) { if (!initialStream) { return; // Ignore <?xml version="1.0"?>//若是session还未创建,而且initialStream并非初始化信息,直接弃掉 } if (!sessionCreated) {//若是session还未创建,不论stream是否为初始 sessionCreated = true;//是指session的标识符为创建 MXParser parser = reader.getXPPParser(); parser.setInput(new StringReader(stanza)); createSession(parser);//创建session } else if (startedTLS) {//若是session已经实例化,并且是xmpp的初始,判断是否须要进行tls握手 startedTLS = false; tlsNegotiated();//进行tls握手 } return; } // If end of stream was requested if (stanza.equals("</stream:stream>")) {//若是为结尾stanza session.close();//进行session的关闭操做 return; } // Ignore <?xml version="1.0"?> if (stanza.startsWith("<?xml")) { return; } // Create DOM object//这里排除了上述的可能性 Element doc = reader.read(new StringReader(stanza)).getRootElement();//得到根doc if (doc == null) { return; } String tag = doc.getName();//得到元素的name if ("starttls".equals(tag)) {//starttls关键字(重要)请求tls的通知节点 最后是利用Connection的startTLS(原理是在过滤器链的头就加入过滤器(这里用的是mina的基本方法)) if (negotiateTLS()) { // Negotiate TLS startedTLS = true; } else {//若是不成功,关闭链接 connection.close(); session = null; } } else if ("message".equals(tag)) {//message语法(按照图示的树形结构) processMessage(doc); } else if ("presence".equals(tag)) {//出席语法 log.debug("presence..."); processPresence(doc); } else if ("iq".equals(tag)) {//iq语法 log.debug("iq..."); processIQ(doc); } else { log.warn("Unexpected packet tag (not message, iq, presence)" + doc.asXML()); session.close(); } }
这个类把IQ packets分发到其相应的Handler之中
此类的核心方法
public void route(IQ packet) { if (packet == null) { throw new NullPointerException(); } JID sender = packet.getFrom();//获取JID ClientSession session = sessionManager.getSession(sender); if (session == null || session.getStatus() == Session.STATUS_AUTHENTICATED || ("jabber:iq:auth".equals(packet.getChildElement() .getNamespaceURI()) || "jabber:iq:register".equals(packet.getChildElement() .getNamespaceURI()) || "urn:ietf:params:xml:ns:xmpp-bind" .equals(packet.getChildElement().getNamespaceURI()))) { handle(packet);//在这里掉用私有方法进行回复 } else {//出现not_authorized 异常 IQ reply = IQ.createResultIQ(packet);//使用IQ自带的create方法对result进行建造 reply.setChildElement(packet.getChildElement().createCopy()); reply.setError(PacketError.Condition.not_authorized); session.process(reply);//直接包装成error的回复 } }
由route调用的私有方法
private void handle(IQ packet) { try { Element childElement = packet.getChildElement(); String namespace = null; if (childElement != null) { namespace = childElement.getNamespaceURI(); } if (namespace == null) { if (packet.getType() != IQ.Type.result && packet.getType() != IQ.Type.error) { log.warn("Unknown packet " + packet); } } else { IQHandler handler = getHandler(namespace);//根据namespace获得handler if (handler == null) { sendErrorPacket(packet, PacketError.Condition.service_unavailable); } else { handler.process(packet); //进行操做 } } } catch (Exception e) {//处理中出现异常 log.error("Could not route packet", e); Session session = sessionManager.getSession(packet.getFrom()); if (session != null) { IQ reply = IQ.createResultIQ(packet); reply.setError(PacketError.Condition.internal_server_error); session.process(reply); } } }
获得handler的私有方法
private IQHandler getHandler(String namespace) { IQHandler handler = namespace2Handlers.get(namespace);//在这里调查namespace2Handler if (handler == null) { for (IQHandler handlerCandidate : iqHandlers) { if (namespace.equalsIgnoreCase(handlerCandidate.getNamespace())) { handler = handlerCandidate; namespace2Handlers.put(namespace, handler); break; } } } return handler; }
public IQRouter() { sessionManager = SessionManager.getInstance(); iqHandlers.add(new IQAuthHandler()); iqHandlers.add(new IQRegisterHandler()); iqHandlers.add(new IQRosterHandler()); }
从上面的代码能够看出所分配的Handler就是IQAuthHandler, IQRegisterHandler, IQRosterHandler利用handlerCandidate作的分类路由 |
出席路由:专门为了Presence packet作分发的路由
public void route(Presence packet) { if (packet == null) { throw new NullPointerException(); } ClientSession session = sessionManager.getSession(packet.getFrom()); if (session == null || session.getStatus() != Session.STATUS_CONNECTED) { handle(packet); } else { packet.setTo(session.getAddress()); packet.setFrom((JID) null); packet.setError(PacketError.Condition.not_authorized); session.process(packet); } }
private void handle(Presence packet) { try { Presence.Type type = packet.getType(); // Presence updates (null == 'available') if (type == null || Presence.Type.unavailable == type) {//不管是否可用交给handler进行操做 presenceUpdateHandler.process(packet);//在出席发送树之中,只有presenceUpdateHandler,须要对presenceUpdateHandler进行分析 } else { log.warn("Unknown presence type"); } } catch (Exception e) { log.error("Could not route packet", e); Session session = sessionManager.getSession(packet.getFrom()); if (session != null) { session.close(); } } }
PresenceUpdateHandler.process public void process(Packet packet) { ClientSession session = sessionManager.getSession(packet.getFrom()); try { Presence presence = (Presence) packet; Presence.Type type = presence.getType(); if (type == null) { // null == available if (session != null && session.getStatus() == Session.STATUS_CLOSED) { log.warn("Rejected available presence: " + presence + " - " + session); return; } if (session != null) { session.setPresence(presence); if (!session.isInitialized()) { // initSession(session); session.setInitialized(true); } //添加离线发送 String username = null; try { username = session.getUsername(); } catch (UserNotFoundException e) { log.error("User Not Found!"); } if(username != null && session.getPresence().isAvailable()) { (new NotificationManager()).pushCacheMessage(username); } } } else if (Presence.Type.unavailable == type) { if (session != null) { session.setPresence(presence); } } else { presence = presence.createCopy(); if (session != null) { presence.setFrom(new JID(null, session.getServerName(), null, true)); presence.setTo(session.getAddress()); } else { JID sender = presence.getFrom(); presence.setFrom(presence.getTo()); presence.setTo(sender); } presence.setError(PacketError.Condition.bad_request); PacketDeliverer.deliver(presence); } } catch (Exception e) { log.error("Internal server error. Triggered by packet: " + packet, e); } }