在研究如何实现Pushing功能期间,收集了不少关于Pushing的资料,其中有一个androidnp开源项目用的人比较多,可是因为长时间没有什么人去维护,据说bug的概率挺多的,为了之后本身的产品稳定些,因此就打算本身研究一下asmack的源码,本身作一个插件,androidnp移动端的源码中包含了一个叫作asmack的jar。html
Reader和Writerandroid
在asmack中有两个很是重要的对象PacketReader和PacketWriter,那么从类名上看Packet + (Reader/Wirter),而TCP/IP传输的数据,叫作Packet(包),asmack使用的是XMPP协议,XMPP简单讲就是使用TCP/IP协议 + XML流协议的组合。因此这个了对象的做用从字面上看应该是,写包与读包,做用为从服务端读写数据。设计模式
PacketWriter中必定含有一个Writer对象,这个Writer是一个输出流,一样的PacketReader对象中有一个Reader,而这个Reader是一个输入流,Writer和Reader对象就是一个简单的读写器,他们是从socket对象中获取出来后,通过装饰变成如今这个样子。数组
1 reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")); 2 writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF-8"));
没有什么神奇的地方,主要看PacketWriter/Reader,这两个对象分别把对应的Writer和Reader引用到本身的内部进行操做,下面就先看一个PacketWriter。服务器
/** * Creates a new packet writer with the specified connection. * * @param connection the connection. */ protected PacketWriter(XMPPConnection connection) { this.queue = new ArrayBlockingQueue<Packet>(500, true); this.connection = connection; init(); }
还有就是PacketWriter初始化的时候将XMPPConnection对象传了进来,由于在init方法中使用到了XMPPConnection对象的writer成员,我想说的是,为何不直接传递writer成员?而是将整个对象XMPPConnection传了过来?其实这就是设计模式的好处,咱们若是每次都传递的是本身的成员,那么若是后期有改动,实现一个新的XMPPConnection与PacketWriter关联,那么老的代码维护起来是很巨大的,若是这里XMPPConnection和他的同事类PacketWriter都有相对应的接口,(XMPPConnection的接口是Connection)那就更完美了,而这里用到的模式应该是中介者,不是绝对意义的中介者,因为造成中介者的条件比较高,因此实际开发中可能是变形使用。PacketWriter对象在XMPPConnection中的connect方法中被初始化,它的最大做用是在其自身的内部建立了两个消息循环,其中一个用30s的heartbeats向服务器发送空白字符,保持长链接。而第二个循环则时刻从队列中主动取消息并发往服务器,而向外部提供的sendPacket方法则是向queue中添加消息,前面提到的循环机制都是在线程中工做,而消息的队列用的是ArrayBlockingQueue,这个无边界阻塞队列能够存听任何对象,这里存放的是Packet对象。并发
1 public void sendPacket(Packet packet) { 2 if (!done) { 3 try { 4 queue.put(packet); 5 } 6 catch (InterruptedException ie) { 7 ie.printStackTrace(); 8 return; 9 } 10 synchronized (queue) { 11 queue.notifyAll(); 12 } 13 } 14 }
while (!done && (writerThread == thisThread)) { Packet packet = nextPacket(); if (packet != null) { synchronized (writer) { writer.write(packet.toXML()); writer.flush(); // Keep track of the last time a stanza was sent to the server lastActive = System.currentTimeMillis(); } } }
消息循环则是一个经过各类成员变量控制的while loop,第一行的nextPacket方法是向queue中获取Packet消息,而且经过weiter将包发出去,这样生产/消费的模型就搭建好了,这里须要注意的是,我删减了不少影响阅读的代码,并无所有贴上。关于heartbeats循环其实也是一个在线程中运行的while loop,也是经过一些成员控制。wirter向服务端写了写什么?看下面的这个方法app
void openStream() throws IOException { StringBuilder stream = new StringBuilder(); stream.append("<stream:stream"); stream.append(" to=\"").append(connection.getServiceName()).append("\""); stream.append(" xmlns=\"jabber:client\""); stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\""); stream.append(" version=\"1.0\">"); writer.write(stream.toString()); writer.flush(); }
XML,没错,这也是符合XMPP协议规范的一种表现吧,至于更多XMPP协议的好处,因为本人的经验有限,就很少作点评,但愿后续会对其深刻了解。socket
下面看一个PacketReader这个类都包含了什么职责。工具
PacketReaderoop
PacketReader全部的核心逻辑都在一个线程中完成的,PacketReader的工做很专一,一样的在一个while loop中 不停的解析、刷新reader对象、同时做为事件源发送解析事后的各类Packet,解析这里用的是Android独特的Pull解析,Pull解析的特色事件驱动,在这里被彻底的利用了起来,随着不一样的标签,PacketReader都会作出不一样的处理,处理完这些数据用不一样Pocket对象封装,最后,分发出去,由监听者作最后的业务处理。
readerThread = new Thread() {
public void run() {
parsePackets(this);
}
};
因为解析过程的代码量过于多,我写到什么地方就分解什么地方,你们有时间最好本身看源码。
1、初始化/重置解析器
private void resetParser() {
try {
//用的是Pull解析
parser = XmlPullParserFactory.newInstance().newPullParser();
parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
parser.setInput(connection.reader);
}
catch (XmlPullParserException xppe) {
xppe.printStackTrace();
}
}
上面这个resetParser方法还会在解析的过程当中碰到不一样的业务需求会不断的被调用,有用和业务逻辑比较紧密,没什么技术含量,关键是要看解析的方式和同时做为事件源发送解析事后的各类Packet,这两部分的设计,是很是的迷人的。
2、解析
do {
if (eventType == XmlPullParser.START_TAG) {
if (parser.getName().equals("message")) {
processPacket(PacketParserUtils.parseMessage(parser));
}
else if (parser.getName().equals("iq")) {
processPacket(PacketParserUtils.parseIQ(parser, connection));
}
else if (parser.getName().equals("presence")) {
processPacket(PacketParserUtils.parsePresence(parser));
}
PacketParserUtils是一个工具类,各个静态方法传入的仍是Parser对象,内部一样的使用Pull的方式进行解析,可是因为Pull是驱动解析,不会无端的浪费资源只会加载感兴趣的内容,试想一下,若是这里用Dom解析……PacketParserUtils的这些静态解析方法返回的实例对象也不同,从方法名能够看出有IQ、message、presence等,他们的父类为Packet,这些对象又被执行processPacket方法的时候传入
private void processPacket(Packet packet) {
if (packet == null) {
return;
}
// Loop through all collectors and notify the appropriate ones.
for (PacketCollector collector: connection.getPacketCollectors()) {
collector.processPacket(packet);
}
// Deliver the incoming packet to listeners.
listenerExecutor.submit(new ListenerNotification(packet));
}
processPacket方法内部有一个循环来转调collector.processPacket(packet);方法,前提是connection.getPacketCollectors()内部有货,到目前位置都没有涉及到PacketCollector这个接口的内容,他的做用实际上是一个观察者模式中的执行者的做用,也就是传说中的监听器,凡是注册了它的对象,均可以经过processPacket这个抽象方法,监听packet的变化。但是到如今任何对象都没有注册它,因此这个Loop尚未做用,由于目前咱们还处在链接的步骤(还没绕出来)。
listenerExecutor.submit(new ListenerNotification(packet));其中ListenerNotification是个Runnable
/**
* A runnable to notify all listeners of a packet.
*/
private class ListenerNotification implements Runnable {
private Packet packet;
public ListenerNotification(Packet packet) {
this.packet = packet;
}
public void run() {
for (ListenerWrapper listenerWrapper : connection.recvListeners.values()) {
listenerWrapper.notifyListener(packet);
}
}
}
咱们上面看到listenerExecutor是一个线程池,在线程池中执行了一个凡是注册了ListenerWrapper的对象,都将接收到packet,一样的,到目前为止没有对象注册,(在RegisterTask过程当中ListenerWrapper被注册)
else if (eventType == XmlPullParser.END_TAG) {
if (parser.getName().equals("stream")) {
// Disconnect the connection
connection.disconnect();
}
}
当文档读取结束是将断开链接
void cleanup() {
connection.recvListeners.clear();
connection.collectors.clear();
}
看到了吗,只是将监听器接口集合清空而已,并无断开链接,或者取消消息循环
PacketReader对象的startup方法比较复杂,大致上执行了读取流,并将解析好的Packet对象发送给观察者,由观察者继续后续操做,目前观察者尚未出现,还有就是使用了线程池和令牌来操做执行线程,并且维护了一个connectionID成员,这个成员的做用还须要再看,这就很少说了。
关于Packet对象,packet对象有不少子类,上面举例了3个,其实还有不少,都是在parser时封装的
AuthMechanism\Challenge\Failure\IQ\Message\Presence\Response\Success
还有就是Pull解析的优势体现了出来,能够一个parser对象包含了不少信息,但可能没到一个时刻咱们须要的信息只是一小部分,这样用Pull解析的驱动式就大大减小了冗余的过程,PacketReader对象使用了2个监听器集合对象,PacketCollector、listenerWrapper,仍是那句话,还没看到观察者,因此还不知道什么状况下须要注册这两个监听。
到目前位置packetReader.startup()方法终于告一个段落了。
register过程分析
RegisterTask这个task在运行中,添加了一个监听,上面说道的PacketReader中有一个消息机制,在不停的解析服务器返回的结果,而后将解析事后的包分发给各个监听器(观察者),而register中就注册了一个监听器,比较有意思的是,监听器被注册时还加了一个过滤器,这个过滤器的目的是监听器只接收本身感兴趣的内容,这个设计真的很赞。这样就没必要在数据源头PacketReader中对数据进行过滤了,只要后期扩展本身Packet和本身的过滤器,就能达到排除本身不关心的信息的功能。
Registration registration = new Registration();
PacketFilter packetFilter = new AndFilter(new PacketIDFilter(registration.getPacketID()), new PacketTypeFilter(IQ.class));
其中Registration的类型其实一个IQ的子类,IQ是Packet的子类。
AndFilter是PacketFilter的子类,PacketFilter的种类型有不少,也能够本身扩展,AndFilter就是其中一个、PacketTypeFilter也是、PacketIDFilter也是,
其中PacketTypeFilter的构造方法传入一个IQ.class,其实就是经过这个类文件来过滤packet,这个PacketTypeFilter就是要设置关心的Packet,这里面它告诉监听器,只接收类型为IQ的Packet,这些Filter中都有一个关键方法,accept(Packet packet).这个accept方法每一个Filter的实现方式都不同,咱们可能够扩展本身的Filter而且重写这个方法,最有意思的是AndFilter这个类,他的构造方法传入的是一个动态数组,类型为PacketFilter,你能够传入你须要的过滤器,将他们当成组合条件使用来过滤Packet,这个就是典型的装饰设计模式和职责链模式的组合使用。
注册监听器
1 PacketListener packetListener = new PacketListener() { 2 //这一部分就是监听器接收到Packet后执行的后续操做 3 public void processPacket(Packet packet) { 4 Log.d("RegisterTask.PacketListener", "processPacket()....."); 5 Log.d("RegisterTask.PacketListener", "packet=" + packet.toXML()); 6 7 if (packet instanceof IQ) { 8 IQ response = (IQ) packet; 9 if (response.getType() == IQ.Type.ERROR) { 10 if (!response.getError().toString().contains("409")) { 11 Log.e(LOGTAG, 12 "Unknown error while registering XMPP account! " 13 + response.getError() 14 .getCondition()); 15 } 16 } else if (response.getType() == IQ.Type.RESULT) { 17 xmppManager.setUsername(newUsername); 18 xmppManager.setPassword(newPassword); 19 Log.d(LOGTAG, "username=" + newUsername); 20 Log.d(LOGTAG, "password=" + newPassword); 21 22 Editor editor = sharedPrefs.edit(); 23 editor.putString(Constants.XMPP_USERNAME, 24 newUsername); 25 editor.putString(Constants.XMPP_PASSWORD, 26 newPassword); 27 editor.commit(); 28 Log 29 .i(LOGTAG, 30 "Account registered successfully"); 31 //执行task 32 xmppManager.runTask(); 33 } 34 } 35 } 36 };
addPacketListener方法传入一个监听器和过滤器,看一下内部
/** * Registers a packet listener with this connection. A packet filter determines * which packets will be delivered to the listener. If the same packet listener * is added again with a different filter, only the new filter will be used. * * @param packetListener the packet listener to notify of new received packets. * @param packetFilter the packet filter to use. */ public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) { if (packetListener == null) { throw new NullPointerException("Packet listener is null."); } ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); recvListeners.put(packetListener, wrapper); }
能够看到,监听器和过滤器被 ListenerWrapper 再次封装,后续的recvListeners这个集合将ListenerWrapper收入囊中,好整个注册过程完毕,就等待接收信息了,那么发送信息的地方在什么地方呢?分析connect过程时,上面的PacketReader中已经开始循环发送了,代码以下
listenerExecutor.submit(new ListenerNotification(packet));其中ListenerNotification是个Runnable
/** * A runnable to notify all listeners of a packet. */ private class ListenerNotification implements Runnable { private Packet packet; public ListenerNotification(Packet packet) { this.packet = packet; } public void run() { for (ListenerWrapper listenerWrapper : connection.recvListeners.values()) { listenerWrapper.notifyListener(packet); } } }
而listenerWrapper的notifyListener(packet)内部,使用了传入的过滤器对Packet进行了过滤
/** * Notify and process the packet listener if the filter matches the packet. * * @param packet the packet which was sent or received. */ public void notifyListener(Packet packet) { if (packetFilter == null || packetFilter.accept(packet)) { packetListener.processPacket(packet); }
而具体的过滤机制仍是转调了传入的过滤器自己的过滤方式accept,很是的灵活。过滤完的Packet将被发送出去
这个方法connection.sendPacket(registration);将一个Registration对象发了出去,
public void sendPacket(Packet packet) { if (!isConnected()) { throw new IllegalStateException("Not connected to server."); } if (packet == null) { throw new NullPointerException("Packet is null."); } packetWriter.sendPacket(packet); }
内部转调的是 packetWriter.sendPacket(packet);之前提到过PacketWirter中有两个循环机制,其中一个就是在不停的访问队列来获取Packet,而这个sendPacket方法就是将消息写入队列中供消费者使用。
/** * Sends the specified packet to the server. * * @param packet the packet to send. */ public void sendPacket(Packet packet) { if (!done) { // Invoke interceptors for the new packet that is about to be sent. Interceptors // may modify the content of the packet. //内部执行了一个发送数据源的动做,也是为某些监听器对象服务的interceptorWrapper.notifyListener(packet); connection.firePacketInterceptors(packet); try { //将一个Packet对象放入到阻塞队列中,在上面的witerPacket方法中的wile循环中发送出去 queue.put(packet); } catch (InterruptedException ie) { ie.printStackTrace(); return; } synchronized (queue) { queue.notifyAll(); } // Process packet writer listeners. Note that we're using the sending // thread so it's expected that listeners are fast. connection.firePacketSendingListeners(packet); } }
其实,注册的过程就是在注册监听,这样在有消息发出时,才能够根据业务需求对消息进行接收和处理。
http://www.cnblogs.com/rioder/archive/2013/01/23/2873176.html